In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_json
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Lab9_Ex3")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

dataSchema = StructType(
    [StructField("username", StringType(), True),
     StructField("teamname", StringType(), True),
     StructField("score", IntegerType(), True),
     StructField("timestamp_in_ms", LongType(), True),
     StructField("readable_time", StringType(), True)
     ])

# Read the whole dataset as a batch
kafkaStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "game") \
    .option("startingOffsets", "latest") \
    .load()

df = kafkaStream.selectExpr("CAST(value AS STRING)")

df1 = df.select(from_json(df.value, dataSchema.simpleString()))

df1.printSchema()

sdf = df1.select(col("from_json(value).*"))

sdf.printSchema()

# create the event time column 
withEventTimedf = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time")

withEventTimedf.printSchema()

avgscoredf = withEventTimedf \
    .groupBy(window(col("event_time"), "10 seconds"), "username", "teamname") \
    .agg(avg("score").alias("value"))

resultdf = avgscoredf.select(concat(col("username"), lit(" "), col("teamname")).alias("key"), col("value").cast("string"))

resultdf.printSchema()

query = resultdf \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("checkpointLocation", "/home/jovyan/checkpoint/gamescore") \
    .option("topic", "avg_score") \
    .outputMode("complete") \
    .start()
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- from_json(value): struct (nullable = true)
 |    |-- username: string (nullable = true)
 |    |-- teamname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


Stoped the streaming query and the spark context


In [3]:
# Stop the spark context
spark.stop()