In [1]:
# %pip install kafka-python pyspark==3.3.2 findspark

In [2]:
from kafka import KafkaConsumer, KafkaProducer
from kafka.consumer.fetcher import ConsumerRecord
import json
import asyncio
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import findspark
from pyspark.sql.functions import from_csv, col, avg, round as _round, countDistinct

In [3]:
findspark.init(os.environ["SPARK_HOME"])

In [4]:
TOPIC = "KafkaMusicStream"

In [5]:
consumer = KafkaConsumer(TOPIC, bootstrap_servers="localhost:9092", consumer_timeout_ms=5000)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

In [6]:
async def run_consumer(consumer: KafkaConsumer):
    for msg in consumer:
        await asyncio.sleep(1)
        assert isinstance(msg, ConsumerRecord), "Invalid Type"
        print(json.loads(msg.value))
        
def run_producer(producer: KafkaProducer):
    with open("data/SpotifyFeatures.csv") as f:
        f.readline() # first line
        
        while((line := f.readline()) != ""):
            producer.send(TOPIC, line.encode())
        

In [7]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("KafkaSqlStream")
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
         .config("spark.sql.warehouse.dir", "spark-warehouse")  
         .config("spark.driver.memory", "4g")
         .enableHiveSupport()
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")
print("Spark started:", spark)


your 131072x1 screen size is bogus. expect trouble


25/11/06 13:36:01 WARN Utils: Your hostname, LAPTOP-IVV27U6L resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/06 13:36:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/omnissiah/SPARK/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/omnissiah/.ivy2/cache
The jars for the packages stored in: /home/omnissiah/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-81ea3652-5d13-4f42-8bc8-474fe682347d;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 687ms :: artifacts dl 32

25/11/06 13:36:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark started: <pyspark.sql.session.SparkSession object at 0x749e3f36cc50>


In [8]:
# Read from Kafka
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", TOPIC) \
    .load()
    
schema = StructType([
    StructField("genre", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("track_name", StringType(), True),
    StructField("track_id", StringType(), True),
    StructField("popularity", IntegerType(), True),
    StructField("acousticness", DoubleType(), True),
    StructField("danceability", DoubleType(), True),
    StructField("duration_ms", IntegerType(), True),
    StructField("energy", DoubleType(), True),
    StructField("instrumentalness", DoubleType(), True),
    StructField("key", StringType(), True),
    StructField("liveness", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("mode", StringType(), True),
    StructField("speechiness", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", StringType(), True),
    StructField("valence", DoubleType(), True)
])

In [9]:
try:
    csv_df = kafka_df.selectExpr("CAST(value AS STRING) as csv_value")
    parsed_df = csv_df.select(from_csv("csv_value", schema.simpleString()).alias("data")).select("data.*")

    parsed_df.createOrReplaceTempView("music_stream")

    sql_df = spark.sql("""
        SELECT genre, AVG(popularity) AS avg_popularity, COUNT(*) AS count
        FROM music_stream
        GROUP BY genre
    """)

    query = sql_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", False) \
        .start()

    query.awaitTermination(30)
except:
    pass

25/11/06 13:36:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d4685111-a4c0-4427-865a-933f4f154757. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/06 13:36:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------------+-----+
|genre|avg_popularity|count|
+-----+--------------+-----+
+-----+--------------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+--------------+-----+
|genre      |avg_popularity|count|
+-----------+--------------+-----+
|Alternative|46.74         |1600 |
+-----------+--------------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+------------------+-----+
|genre      |avg_popularity    |count|
+-----------+------------------+-----+
|Alternative|46.599444444444444|1800 |
+-----------+------------------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+-----------------+-----+
|genre      |avg_popularity   |count|
+-----------+-----------------+-----+
|Alternative|46.09291666666667|2400 |
+-----------+-----------------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+-----------------+-----+
|genre      |avg_popularity   |count|
+-----------+-----------------+-----+
|Alternative|45.52199874292898|3182 |
|Dance      |80.83333333333333|18   |
+-----------+-----------------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+-----------------+-----+
|genre      |avg_popularity   |count|
+-----------+-----------------+-----+
|Alternative|45.52199874292898|3182 |
|Dance      |71.29095354523227|818  |
+-----------+-----------------+-----+



In [None]:
try:

    with spark as sp:
        top_genres = (
            parsed_df.groupBy("genre")
                    .agg(_round(avg("popularity"), 2).alias("avg_popularity"))
                    .orderBy(col("avg_popularity").desc())
        )

        query1 = (
            top_genres.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )
        
        energy_dance = (
            parsed_df.groupBy("genre")
                    .agg(
                        _round(avg("energy"), 3).alias("avg_energy"),
                        _round(avg("danceability"), 3).alias("avg_danceability")
                    )
        )
        
        query2 = (
            energy_dance.writeStream
                        .outputMode("complete")
                        .format("console")
                        .option("truncate", False)
                        .start()
        )
        
        top_artists = (
            parsed_df.groupBy("artist_name")
                    .count()
                    .withColumnRenamed("count", "total_songs")
                    .orderBy(col("total_songs").desc())
        )

        query3 = (
            top_artists.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )

        # --- New Queries ---

        # 4. Average valence and energy per genre (mood analysis)
        mood_stats = (
            parsed_df.groupBy("genre")
                    .agg(
                        _round(avg("valence"), 3).alias("avg_valence"),
                        _round(avg("energy"), 3).alias("avg_energy")
                    )
                    .orderBy(col("avg_valence").desc())
        )

        query4 = (
            mood_stats.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )

        # 5. Average loudness and acousticness per genre
        sound_profile = (
            parsed_df.groupBy("genre")
                    .agg(
                        _round(avg("loudness"), 2).alias("avg_loudness"),
                        _round(avg("acousticness"), 3).alias("avg_acousticness")
                    )
                    .orderBy(col("avg_loudness").desc())
        )

        query5 = (
            sound_profile.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )

        # 6. Average tempo and time signature distribution
        tempo_stats = (
            parsed_df.groupBy("genre")
                    .agg(
                        _round(avg("tempo"), 2).alias("avg_tempo"),
                        countDistinct("time_signature").alias("unique_time_sigs")
                    )
                    .orderBy(col("avg_tempo").desc())
        )

        query6 = (
            tempo_stats.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )

        # 7. Top tracks by popularity
        top_tracks = (
            parsed_df.select("track_name", "artist_name", "popularity")
                    .orderBy(col("popularity").desc())
                    .limit(10)
        )

        query7 = (
            top_tracks.writeStream
                    .outputMode("complete")
                    .format("console")
                    .option("truncate", False)
                    .start()
        )

        sp.streams.awaitAnyTermination()

except:
    pass

25/11/06 13:36:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-81f276ba-ed5d-4659-95ec-78755ec1bb59. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/06 13:36:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/11/06 13:36:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-83291732-8643-47ba-9848-faccc939129d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/06 13:36:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not support

25/11/06 13:36:45 ERROR MicroBatchExecution: Query [id = e9c233d5-5476-4b08-9e13-b93dbe92250b, runId = e7bf861b-b4e7-4480-9d33-b487a0569f26] terminated with error
org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecuti

Exception in thread "stream execution thread for [id = e9c233d5-5476-4b08-9e13-b93dbe92250b, runId = e7bf861b-b4e7-4480-9d33-b487a0569f26]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(Uninterrup

25/11/06 13:36:45 ERROR MicroBatchExecution: Query [id = 1911559c-fa4d-489a-9a1b-273721861905, runId = 1c750ad8-f972-43fb-8ad6-90c58d85e3e9] terminated with error
org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecuti

Exception in thread "stream execution thread for [id = 1911559c-fa4d-489a-9a1b-273721861905, runId = 1c750ad8-f972-43fb-8ad6-90c58d85e3e9]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(Uninterrup

25/11/06 13:37:17 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:596)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:582)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:442)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.

Exception in thread "stream execution thread for [id = 3de4960a-4cbb-47a6-8a2a-2a9455012ee4, runId = ce36e69e-6378-49b8-a2b5-acfdbfe92d1d]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(Uninterrup