#Read Streaming Data from Kafka#

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Configure Kafka bootstrap servers
kafka_bootstrap_servers = "bore.pub:9093"

# Read Kafka data in streaming mode with additional timeout configurations
streaming_df = (spark.readStream
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
               .option("subscribe", "music_streaming_events")
               .option("startingOffsets", "earliest")
               .option("kafka.metadata.fetch.timeout.ms", "30000")  # Increase metadata fetch timeout to 30 seconds
               .option("kafka.request.timeout.ms", "60000")         # Increase request timeout to 60 seconds
               .option("kafka.session.timeout.ms", "30000")         # Increase session timeout to 30 seconds
               .load())

# Cast the Kafka message value to a string
raw_streaming_df = streaming_df.select(col("value").cast("STRING").alias("message"))

# Write the raw stream to the console for debugging
query_raw = (raw_streaming_df.writeStream
             .format("console")
             .outputMode("append")
             .trigger(processingTime="10 seconds")  # Process every 10 seconds
             .start())

# Wait for the query to terminate (for debugging)
query_raw.awaitTermination(60)  # Wait up to 60 seconds

False

#Parse the JSON Messages#

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Define the schema for the JSON data
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("artist", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("listen_timestamp", StringType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("sales", DoubleType(), True),
    StructField("streams", DoubleType(), True),
    StructField("downloads", DoubleType(), True),
    StructField("radio_plays", DoubleType(), True),
    StructField("rating", DoubleType(), True)
])

# Parse the JSON data into structured columns
parsed_streaming_df = raw_streaming_df.withColumn("data", from_json(col("message"), schema)).select("data.*")

# Filter out records with null user_id
parsed_streaming_df = parsed_streaming_df.filter(col("user_id").isNotNull())

# Scale the streams column (multiply by 100 million)
parsed_streaming_df = parsed_streaming_df.withColumn("streams", col("streams") * 1e8)

#Compute Average Streams per Genre#

In [0]:
from pyspark.sql.functions import window, current_timestamp

# Compute average streams per genre over a 1-minute tumbling window
windowed_avg_streams = (parsed_streaming_df
                        .groupBy(
                            window(current_timestamp(), "1 minute"),
                            "genre"
                        )
                        .agg({"streams": "avg"})
                        .withColumnRenamed("avg(streams)", "avg_streams"))

# Write the results to an in-memory table for real-time querying
query_avg = (windowed_avg_streams
             .writeStream
             .format("memory")
             .queryName("avg_streams_table")
             .outputMode("complete")
             .start())

# For debugging, display the in-memory table (run this in a separate cell or loop)
spark.sql("SELECT genre, avg_streams FROM avg_streams_table ORDER BY avg_streams DESC LIMIT 20").show()

#Stopping Query#

In [0]:
# Stop the streaming query
query.stop()



[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1719398306228774>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Stop the streaming query[39;00m
[0;32m----> 2[0m [43mquery[49m[38;5;241;43m.[39;49m[43mstop[49m[43m([49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/sql/streaming/query.py:372[0m, in [0;36mStreamingQuery.stop[0;34m(self)[0m
[1;32m    349[0m [38;5;28;01mdef[39;00m [38;5;21mstop[39m([38;5;28mself[39m) [38;5;241m-[39m[38;5;241m>[39m [38;5;28;01mNone[39;00m:
[1;32m    350[0m     [38;5;124;03m"""[39;00m
[1;32m    351[0m [38;5;124;03m    Stop this streaming query.[39;00m
[1;32m    352[0m 
[0;32m   (...)[0m
[1;32m    370[0m [38;5;124;03m    False[39;00m
[1;32m    371[0m [38;5;124;03m    """[39;00m
[0;32m--> 372[0m     [38;5;28;43mself[39;49m[38;5;241;43m.[39;49m[43m_jsq