# Consumer

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

import matplotlib.pyplot as plt
import seaborn as sns
from time import sleep
from IPython.display import clear_output
import pandas as pd

# Initialize Spark session with Kafka package
spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .getOrCreate()

# Define schema for the data
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("value", StringType(), True)
])

# Create DataFrame representing the stream of input lines from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:29092") \
    .option("subscribe", "kfz_bestand") \
    .option("startingOffsets", "earliest") \
    .load()

# Cast the value column to STRING
df = df.selectExpr("CAST(value AS STRING)")

# Parse JSON data
df = df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Write the streaming DataFrame to an in-memory table
queryStream = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("kfz_bestand_table") \
    .start()

# Initialize Seaborn
sns.set(style="whitegrid")
plt.rc('font', family='DejaVu Sans')

# Wait for the streaming query to be ready
sleep(10)  # Adjust the sleep time as needed to ensure the stream starts

try:
    i = 1
    while True:
        # Clear output
        clear_output(wait=True)
        print("**********************")
        print("General Info")
        print("**********************")
        print("Run:{}".format(i))
        if len(queryStream.recentProgress) > 0:
            print(queryStream.lastProgress)
            print("Stream timestamp:{}".format(queryStream.lastProgress.get("timestamp", "N/A")))
            event_time = queryStream.lastProgress.get("eventTime", {})
            if "watermark" in event_time:
                print("Watermark:{}".format(event_time["watermark"]))
            state_operators = queryStream.lastProgress.get("stateOperators", [])
            if state_operators:
                print("Total Rows:{}".format(state_operators[0].get("numRowsTotal", "N/A")))
                print("Updated Rows:{}".format(state_operators[0].get("numRowsUpdated", "N/A")))
                print("Memory used MB:{}".format((state_operators[0].get("memoryUsedBytes", 0)) * 0.000001))

        # Fetch data from the in-memory table
        df_pandas = spark.sql("SELECT * FROM kfz_bestand_table").toPandas()

        # Plot the data
        plt.figure(figsize=(10, 6))
        sns.barplot(x='year', y='value', data=df_pandas)
        plt.xlabel('Year')
        plt.ylabel('Kfz-Bestand Value')
        plt.title('Real-time Kfz-Bestand over Years')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

        # Display DataFrame
        print("**********************")
        print("Table - Kfz-Bestand Data")
        print("**********************")
        display(df_pandas)

        # Sleep before the next update
        sleep(3)
        i += 1
except KeyboardInterrupt:
    print("Process interrupted.")
finally:
    queryStream.stop()
    spark.stop()


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [8]:
spark.stop()