In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F

# Build (or reuse) the Spark session that every later cell refers to as `spark`.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Streaming from Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Extra package coordinate: the Spark SQL Kafka integration.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    # Extra driver class-path entry: the MySQL Connector/J jar.
    .config("spark.driver.extraClassPath", "./jdbc/mysql-connector-j-8.4.0.jar")
    .config("spark.sql.shuffle.partitions", 8)
    .getOrCreate()
)

In [2]:
# Streaming DataFrame over the Kafka topic "user"; startingOffsets is "latest".
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:19091,kafka2:19092,kafka3:19093")
    .option("subscribe", "user")
    .option("startingOffsets", "latest")
    .load()
)

In [3]:
# Schema used to parse the JSON message value: eight string fields followed by
# the `timestamp` field that the windowing cells aggregate on.
schema = T.StructType(
    [
        T.StructField(field_name, T.StringType())
        for field_name in (
            "birthdate",
            "blood_group",
            "job",
            "name",
            "residence",
            "sex",
            "ssn",
            "uuid",
        )
    ]
    + [T.StructField("timestamp", T.TimestampType())]
)

In [4]:
# Cast the Kafka `value` column to a string and parse it as JSON with `schema`;
# the parsed struct keeps the column name "value".
value_df = kafka_df.select(
    F.from_json(
        F.col("value").cast("string"),
        schema,
    ).alias("value")
)

# Promote every field of the parsed struct to a top-level column.
processed_df = value_df.selectExpr(
    "value.birthdate", "value.blood_group", "value.job",
    "value.name", "value.residence", "value.sex",
    "value.ssn", "value.uuid", "value.timestamp",
)

In [19]:
# Count events per (5-second tumbling window, uuid) on the stream.
# The watermark on `timestamp` is declared before the aggregation so the result
# can be written with the "append" output mode used by the console sink below.
df_agg = (
    processed_df
    .withWatermark("timestamp", "5 seconds")
    .groupBy(F.window("timestamp", "5 seconds"), "uuid")
    .count()
)

# Flatten the window struct into explicit start/end columns.
# - No orderBy here: Spark only allows sorting a streaming aggregate in
#   Complete output mode, and the sink below uses "append", so a sort would make
#   start() raise an AnalysisException.
# - No second groupBy("window").count() either: on this setup it failed with
#   "Multiple streaming aggregations are not supported". The per-window roll-up
#   is done later on a static snapshot instead.
df_final = df_agg.selectExpr(
    "window.start as start_time",
    "window.end as end_time",
    "uuid",
    "count",
)


In [6]:
# Start a streaming query that prints `df_final` to the console, in append
# mode, with a 5-second processing-time trigger.
df_console = (
    df_final.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir/05_window_operations_and_watermarks")
    .trigger(processingTime="5 seconds")
    .start()
)

# Block this cell until the streaming query terminates.
df_console.awaitTermination()

AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;
Project [window#41-T5000ms.start AS start_time#64, window#41-T5000ms.end AS end_time#65, count#61L]
+- Sort [window#41-T5000ms ASC NULLS FIRST], true
   +- Aggregate [window#41-T5000ms], [window#41-T5000ms, count(1) AS count#61L]
      +- Aggregate [window#53-T5000ms, uuid#30], [window#53-T5000ms AS window#41-T5000ms, uuid#30, count(1) AS count#52L]
         +- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(timestamp#31-T5000ms, TimestampType, LongType) - (((precisetimestampconversion(timestamp#31-T5000ms, TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(timestamp#31-T5000ms, TimestampType, LongType) - (((precisetimestampconversion(timestamp#31-T5000ms, TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 5000000), LongType, TimestampType)) AS window#53-T5000ms, birthdate#23, blood_group#24, job#25, name#26, residence#27, sex#28, ssn#29, uuid#30, timestamp#31-T5000ms]
            +- Filter isnotnull(timestamp#31-T5000ms)
               +- EventTimeWatermark timestamp#31: timestamp, 5 seconds
                  +- Project [value#21.birthdate AS birthdate#23, value#21.blood_group AS blood_group#24, value#21.job AS job#25, value#21.name AS name#26, value#21.residence AS residence#27, value#21.sex AS sex#28, value#21.ssn AS ssn#29, value#21.uuid AS uuid#30, value#21.timestamp AS timestamp#31]
                     +- Project [from_json(StructField(birthdate,StringType,true), StructField(blood_group,StringType,true), StructField(job,StringType,true), StructField(name,StringType,true), StructField(residence,StringType,true), StructField(sex,StringType,true), StructField(ssn,StringType,true), StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), cast(value#8 as string), Some(Etc/UTC)) AS value#21]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@51b1f651, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@76042dd7, [startingOffsets=latest, kafka.bootstrap.servers=kafka1:19091,kafka2:19092,kafka3:19093, subscribe=user], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3db85c46,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> kafka1:19091,kafka2:19092,kafka3:19093, subscribe -> user, startingOffsets -> latest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]


In [8]:
# Disabled alternative sink: streams `processed_df` into an in-memory table
# registered under the query name "kafka_memory".
# NOTE(review): the later cell that runs `SELECT * FROM kafka_memory` needs this
# table; with this query commented out it will not exist on a fresh kernel.
# Confirm how the table is meant to be populated.
# NOTE(review): this uses the same checkpointLocation string as the console
# query above. Confirm the two queries are never meant to share that directory.
# df_memory = processed_df.writeStream \
#     .format("memory") \
#     .queryName("kafka_memory") \
#     .outputMode("append") \
#     .option("checkpointLocation", "checkpoint_dir/05_window_operations_and_watermarks") \
#     .trigger(processingTime="5 seconds") \
#     .start()

# df_memory.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [10]:
# Query the `kafka_memory` table through Spark SQL so it can be explored with
# ordinary DataFrame calls.
df = spark.sql(
    "SELECT * FROM kafka_memory"
)

# Print the leading rows with show()'s default settings.
df.show()

+---------+-----------+--------------------+-----------------+--------------------+---+-----------+--------------------+-------------------+
|birthdate|blood_group|                 job|             name|           residence|sex|        ssn|                uuid|          timestamp|
+---------+-----------+--------------------+-----------------+--------------------+---+-----------+--------------------+-------------------+
| 19670613|         O+|  Analytical chemist| Michael Erickson|80800 Pamela Cany...|  M|254-75-7053|LJoSb6NWRWW27Zkpn...|2024-05-12 15:32:33|
| 20211103|         A-|Estate manager/la...|     Andrea Lopez|440 Wise Burgs\nP...|  F|822-34-7088|f9i6J3xv2FojTszsh...|2024-05-12 15:32:34|
| 20040509|         B+|Human resources o...|      Lisa Tanner|USNS Walker\nFPO ...|  F|764-39-7261|jnVioyWFLrei7qRMF...|2024-05-12 15:32:35|
| 19980121|         O+|        Retail buyer|     Karen Bryant|929 Natalie Mount...|  F|192-41-8135|SEcJmNfmbh3bjixAs...|2024-05-12 15:32:36|
| 19290418|  

In [16]:
# Same (5-second window, uuid) count as the streaming cell, applied to `df`.
df_agg = (
    df
    .withWatermark("timestamp", "5 seconds")
    .groupBy(F.window("timestamp", "5 seconds"), "uuid")
    .count()
)

In [21]:
# Print the per-(window, uuid) counts without truncating cell values
# (n=20 is show()'s default row limit, spelled out here).
df_agg.show(n=20, truncate=False)

+------------------------------------------+----------------------+-----+
|window                                    |uuid                  |count|
+------------------------------------------+----------------------+-----+
|{2024-05-12 15:32:35, 2024-05-12 15:32:40}|jnVioyWFLrei7qRMFLNs3r|1    |
|{2024-05-12 15:32:30, 2024-05-12 15:32:35}|f9i6J3xv2FojTszshGHQx5|1    |
|{2024-05-12 15:32:35, 2024-05-12 15:32:40}|SEcJmNfmbh3bjixAscKdpp|1    |
|{2024-05-12 15:32:30, 2024-05-12 15:32:35}|LJoSb6NWRWW27ZkpnFYjvq|1    |
|{2024-05-12 15:32:40, 2024-05-12 15:32:45}|eHHXURBAujxKjiVydguqsm|1    |
|{2024-05-12 15:32:35, 2024-05-12 15:32:40}|ZjaTjVETGfA9KUmHkHweEf|1    |
|{2024-05-12 15:32:35, 2024-05-12 15:32:40}|3tjTJMzvNK2inXjH2ii84Y|1    |
|{2024-05-12 15:32:35, 2024-05-12 15:32:40}|drHoMCsYPHbZkGFezqStS5|1    |
|{2024-05-12 15:32:40, 2024-05-12 15:32:45}|5HuyHRHwMmL7EQSdEnPSBk|1    |
|{2024-05-12 15:32:40, 2024-05-12 15:32:45}|E5GxZkgKm6Y6LVLBLm3Y8n|1    |
|{2024-05-12 15:32:40, 2024-05-12 15:3

In [26]:
# Roll the per-(window, uuid) rows up to one row per window (the count is the
# number of such rows in that window), sorted by window.
df_order = (
    df_agg
    .groupBy("window")
    .count()
    .orderBy('window')
)

In [27]:
# Split the window struct into start/end columns next to the per-window count.
df_final = df_order.selectExpr(
    "window.start as start_time",
    "window.end as end_time",
    "count",
)

# Print the result without truncating the timestamps.
df_final.show(truncate=False)

+-------------------+-------------------+-----+
|start_time         |end_time           |count|
+-------------------+-------------------+-----+
|2024-05-12 15:32:30|2024-05-12 15:32:35|2    |
|2024-05-12 15:32:35|2024-05-12 15:32:40|5    |
|2024-05-12 15:32:40|2024-05-12 15:32:45|4    |
|2024-05-12 15:32:45|2024-05-12 15:32:50|5    |
+-------------------+-------------------+-----+

