## Kafka Consumer

Here, the Kafka consumer reads from the topic `incidents`, to which the producer was pushing the data. Upon consumption, the data is written to a new directory, `newDir`, as CSV files.

In [3]:
from confluent_kafka import Consumer, KafkaError
import csv
import os

# Topic the producer publishes to; this consumer reads from the same one.
topic = 'incidents'

# Settings handed to the confluent_kafka Consumer: broker address, consumer
# group, and the offset-reset policy ('earliest' is the librdkafka setting
# for starting at the oldest available message when the group has no
# committed offset).
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'gp1',
    'auto.offset.reset': 'earliest',
}

# Build the consumer from the settings above and register interest in the
# topic; messages are fetched later by the polling loop.
consumer = Consumer(conf)
consumer.subscribe([topic])

# Directory that receives one CSV file per consumed batch.
output_directory = 'newDir/'

# Create the directory (and any missing parents). exist_ok=True makes this a
# no-op when it already exists and avoids the check-then-create race of a
# separate os.path.exists() test.
os.makedirs(output_directory, exist_ok=True)

# Number of batch files written so far; used to name the next file.
batch_count = 0

# Main loop: repeatedly fetch a batch of messages from Kafka and write the
# usable ones to a numbered CSV file in output_directory. Runs until the user
# interrupts it (Ctrl+C / kernel interrupt); the consumer is always closed.
try:
    while True:
        # Fetch up to 100 messages, waiting at most 1 second. An empty result
        # means nothing arrived within the timeout, so just poll again.
        batch = consumer.consume(num_messages=100, timeout=1.0)  # Adjust batch size and timeout as needed
        if not batch:
            continue

        # Collect the rows first so that a file is only created (and the
        # batch counter only advanced) when there is something to write.
        rows = []
        for msg in batch:
            err = msg.error()
            if err:
                # End-of-partition events are informational; any other error
                # is reported. Either way this entry carries no data, so skip
                # just this entry -- the remaining messages in the batch have
                # already been fetched and must not be dropped.
                if err.code() != KafkaError._PARTITION_EOF:
                    print(err)
                continue

            payload = msg.value()
            if payload is None:
                # Message without a value: nothing to decode or write.
                continue

            # Decode message value from bytes to string and split by
            # semicolon to get the CSV fields of one row.
            rows.append(payload.decode('utf-8').split(';'))

        if not rows:
            continue

        batch_count += 1

        # One file per batch. The encoding is given explicitly so the decoded
        # UTF-8 text is written back losslessly regardless of the platform's
        # default encoding.
        batch_file_path = os.path.join(output_directory, f'batch_{batch_count}.csv')
        with open(batch_file_path, 'a', newline='', encoding='utf-8') as batch_file:
            csv.writer(batch_file).writerows(rows)

        print(f"Batch {batch_count} written to {batch_file_path}")

except KeyboardInterrupt:
    # A manual interrupt is the intended way to stop the loop.
    pass

finally:
    # Close Kafka consumer
    consumer.close()
