In [1]:
!pip install pyspark



In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

# Settings passed to the Kafka consumer.
consumer_conf = {
    # Address of the Kafka broker to connect to.
    'bootstrap.servers': 'localhost:9092',
    # Consumer group this client joins.
    'group.id': 'joke-consumer',
    # Begin reading from the earliest available message.
    'auto.offset.reset': 'earliest',
}

# Build the consumer and subscribe it to the topic (replace the name with
# your own Kafka topic).
consumer = Consumer(consumer_conf)
topic = 'test1-topic'
consumer.subscribe([topic])

# Spark session used later to turn the collected rows into a DataFrame.
spark = SparkSession.builder.appName("JokeConsumer").getOrCreate()

# Accumulates one (type, setup length, punchline length, id) tuple per
# successfully parsed message.
data_list = []

def process_message(message, sink=None):
    """Parse one JSON-encoded joke and record its summary row.

    The payload is expected to be a JSON object with the keys ``type``,
    ``setup``, ``punchline`` and ``id``. On success the tuple
    ``(type, len(setup), len(punchline), id)`` is appended to the target list.
    Any payload that cannot be turned into such a row is reported and skipped,
    so a single bad message never propagates an exception to the caller.

    Args:
        message: JSON text (``str``/``bytes``) of a single message.
        sink: Optional list to append the row to. Defaults to the
            module-level ``data_list``.
    """
    rows = data_list if sink is None else sink
    try:
        data = json.loads(message)
        row = (
            data['type'],
            len(data['setup']),
            len(data['punchline']),
            data['id'],
        )
    except (KeyError, TypeError, ValueError):
        # KeyError:   a required field is missing.
        # TypeError:  the payload is valid JSON but not an object (list,
        #             string, number, null), a field has no length (null,
        #             number), or the message itself is None.
        # ValueError: malformed JSON (json.JSONDecodeError) or undecodable
        #             bytes (UnicodeDecodeError); both are ValueError subclasses.
        print("Error processing message: Invalid JSON structure")
        return
    rows.append(row)

def _decode_payload(msg):
    """Return the message value decoded as UTF-8 text, or None if unusable."""
    raw = msg.value()
    if raw is None:
        # A Kafka record may carry no value at all; there is nothing to parse.
        return None
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return None


def _report_statistics():
    """Print the collected rows and their summary statistics using Spark."""
    if not data_list:
        print("No data to analyze.")
        return

    df = spark.createDataFrame(data_list, ["type", "setup_length", "punchline_length", "id"])
    df.show(truncate=False)

    print("\nDescriptive Statistics:")
    df.describe("setup_length", "punchline_length", "id").show()

    print("\nAdditional Statistics:")
    stats = df.agg(
        F.avg("setup_length").alias("avg_setup_length"),
        F.avg("punchline_length").alias("avg_punchline_length"),
        F.stddev("setup_length").alias("stddev_setup_length"),
        F.stddev("punchline_length").alias("stddev_punchline_length")
    )
    stats.show()


def consume_messages():
    """Poll the subscribed topic until interrupted, then summarise the data.

    Each received message is echoed and handed to ``process_message``.
    Messages flagged with an error are reported and skipped, as are messages
    whose value is missing or is not valid UTF-8 (previously these raised and
    terminated the loop). On ``KeyboardInterrupt`` the rows gathered in
    ``data_list`` are loaded into a Spark DataFrame and descriptive statistics
    are printed. The consumer is always closed on exit.
    """
    print("Listening for messages...")
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the poll timeout; try again.
                continue

            err = msg.error()
            if err:
                if err.code() == KafkaError._PARTITION_EOF:
                    print(f"End of partition reached: {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
                else:
                    print(f"Error: {err}")
                continue

            # Decode once and reuse the text for both the echo and the parser.
            text = _decode_payload(msg)
            if text is None:
                print("Skipping message: empty or non-UTF-8 payload")
                continue

            # NOTE: echoing every payload is very chatty; on a busy topic it
            # can exceed Jupyter's IOPub output rate limit.
            print(f"Received message: {text}")
            process_message(text)
    except KeyboardInterrupt:
        print("\nConsumer interrupted. Shutting down...")
        _report_statistics()
    finally:
        # Runs on every exit path so the consumer is never left open.
        consumer.close()
        print("Consumer closed.")

# Start the polling loop; it runs until interrupted (KeyboardInterrupt), then
# prints the statistics and closes the consumer.
consume_messages()

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



Received message: {"type": "general", "setup": "What do you call a monkey in a mine field?", "punchline": "A babooooom!", "id": 210}
Received message: {"type": "general", "setup": "What biscuit does a short person like?", "punchline": "Shortbread. ", "id": 155}
Received message: {"type": "general", "setup": "Why do crabs never give to charity?", "punchline": "Because they\u2019re shellfish.", "id": 346}
Received message: {"type": "general", "setup": "Did you hear about the runner who was criticized?", "punchline": "He just took it in stride", "id": 93}
Received message: {"type": "general", "setup": "What did Michael Jackson name his denim store?", "punchline": "Billy Jeans!", "id": 159}
Received message: {"type": "general", "setup": "How good are you at Power Point?", "punchline": "I Excel at it.", "id": 128}
Received message: {"type": "general", "setup": "What did the pirate say on his 80th birthday?", "punchline": "Aye Matey!", "id": 181}
Received message: {"type": "programming", "se