# Spark Kafka Consumer
This notebook implements a Kafka consumer that reads file paths from a Kafka topic and loads each referenced JSON file into a Spark DataFrame, which it then displays.

In [None]:
from confluent_kafka import Consumer, KafkaError
import json
import pyspark
from pyspark.sql import SparkSession

In [None]:
class SparkKafkaConsumer:
    """Consume file paths from a Kafka topic and load each referenced file with Spark.

    Each Kafka message value is treated as a UTF-8 encoded path to a JSON
    file. The file is loaded with ``spark.read.json`` and the resulting
    DataFrame is printed with ``DataFrame.show()``.
    """

    def __init__(self, bootstrap_servers, topic, group_id='spark-consumer-group'):
        """Store the Kafka consumer configuration and obtain a Spark session.

        Args:
            bootstrap_servers: Value for the Kafka ``bootstrap.servers`` setting
                (e.g. ``'localhost:29092'``).
            topic: Name of the Kafka topic to subscribe to.
            group_id: Kafka consumer group id.
        """
        self.consumer_config = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            # When the group has no committed offset, start from the oldest message.
            'auto.offset.reset': 'earliest',
        }
        self.topic = topic

        # getOrCreate() reuses an already-active session instead of starting a new one.
        self.spark = SparkSession.builder \
            .appName("KafkaFileProcessor") \
            .getOrCreate()

    @staticmethod
    def _decode_file_path(raw_value):
        """Decode a raw Kafka message value into a file path.

        Returns the UTF-8 decoded string. Returns None when the value is None
        (e.g. a tombstone message) or is not valid UTF-8.
        """
        if raw_value is None:
            return None
        try:
            return raw_value.decode('utf-8')
        except UnicodeDecodeError:
            return None

    def consume_and_process(self, poll_timeout=1.0):
        """Poll the topic forever, loading each referenced JSON file with Spark.

        Runs until interrupted with KeyboardInterrupt (or until an error
        escapes the loop). On exit, both the Kafka consumer and this
        instance's Spark session are closed. The instance therefore cannot
        be reused for a second run.

        Args:
            poll_timeout: Seconds to block in each ``consumer.poll`` call.
        """
        consumer = Consumer(self.consumer_config)

        try:
            # Subscribe inside the try so a failure here still runs the cleanup below.
            consumer.subscribe([self.topic])

            while True:
                msg = consumer.poll(poll_timeout)

                if msg is None:
                    # Poll timed out with no message; keep waiting.
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print('Reached end of partition')
                    else:
                        print(f'Error: {msg.error()}')
                    continue

                # Decode defensively: a None or non-UTF-8 value must not crash the loop.
                file_path = self._decode_file_path(msg.value())
                if file_path is None:
                    print(
                        f'Skipping message at {msg.topic()}[{msg.partition()}]@{msg.offset()}: '
                        'value is empty or not valid UTF-8'
                    )
                    continue

                try:
                    # Let Spark read the path directly, so non-local URIs
                    # (e.g. hdfs://) also work. The driver never reads the file itself.
                    df = self.spark.read.json(file_path)
                    df.show()
                except Exception as e:
                    # Per-message boundary: report the failure and move on to the next message.
                    print(f"Error processing file {file_path}: {e}")

        except KeyboardInterrupt:
            print("Stopping consumer")

        finally:
            consumer.close()
            self.spark.stop()

In [None]:
# Connect to the local broker and process file paths published on the 'Comments' topic.
# consume_and_process() loops until interrupted, so this cell keeps running until stopped.
consumer = SparkKafkaConsumer(
    bootstrap_servers='localhost:29092',
    topic='Comments',
)
consumer.consume_and_process()