Before running this example you need to perform the following steps:

 - Inside the kafka container, create the traffic sensor topic by running:
 
   `kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor`
   

 - Recreate the file work/data/AGOSTO_PARQUET_FINAL.zip by running:
 
   `cat work/data/AGOSTO_PARQUET_FINAL.zip-a* > work/data/AGOSTO_PARQUET_FINAL.zip`
   
   
 - Unzip work/data/AGOSTO_PARQUET_FINAL.zip. The following path must exist after unzipping: work/data/AGOSTO_PARQUET_FINAL
   

 - Load the data to kafka by running the spark job at work/src/main/python/insert_traffic_topic.sh


In [1]:
#spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # when set, DataFrames are eagerly evaluated, no need to call show
data = spark.read.load('/work/data/AGOSTO_2022_PARQUET_FINAL')
#data # it will display the DataFrame's contents when spark.sql.repl.eagerEval.enabled is True 

                                                                                

In [2]:
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "traffic_sensor"
SCHEMA = data.schema

In [3]:
data.printSchema()

root
 |-- EQP_ID: long (nullable = true)
 |-- DATE_TIME: timestamp (nullable = true)
 |-- MILLISECOND: long (nullable = true)
 |-- CLASSIFICATION: string (nullable = true)
 |-- ROAD_LANE: long (nullable = true)
 |-- ADDRESS_ID: long (nullable = true)
 |-- ROAD_SPEED: string (nullable = true)
 |-- VEHICLE_SPEED: string (nullable = true)
 |-- VEHICLE_LENGTH: string (nullable = true)
 |-- SERIAL_NUMBER: long (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- DIRECTION: string (nullable = true)



In [4]:
df_traffic_stream = spark\
    .readStream.format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option("subscribe", KAFKA_TOPIC)\
    .option("startingOffsets", "earliest")\
    .load()

In [5]:
import pyspark.sql.functions as F

df_traffic_stream = df_traffic_stream\
    .select(
        F.from_json(
            # decode string as iso-8859-1
            F.decode(F.col("value"), "iso-8859-1"),
            SCHEMA
        ).alias("value")
    )\
    .select("value.*")

## Count each vehicle type

In [6]:
vehicle_type_stream = df_traffic_stream.select(F.col("classification").alias("vehicle_type"))

In [7]:
vehicle_type_stream.groupBy("vehicle_type")\
    .count()\
    .writeStream\
    .queryName("vehicle_type_count")\
    .outputMode("complete")\
    .format("memory")\
    .start()

22/11/28 04:18:02 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c34ca563-3613-4f26-9b10-dd1967f06b9d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff7e8359750>

In [8]:
from time import sleep
from IPython.display import display, clear_output

In [9]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [11]:
for x in range(10):
    clear_output(wait=True)
    spark.sql("SELECT * FROM vehicle_type_count").show()
    sleep(5)

+------------------+-----+
|      vehicle_type|count|
+------------------+-----+
|         AUTOMÓVEL|80507|
|        INDEFINIDO|    2|
|              MOTO|13609|
|CAMINHÃO / ÔNIBUS | 5882|
+------------------+-----+



## Tumbling Windows

Count records for each 5-minutes window

In [12]:
df_traffic_stream\
    .groupBy(
        F.window("DATE_TIME", "5 minutes", )
    )\
    .count()\
    .writeStream\
    .option("truncate", "false")\
    .outputMode("update")\
    .format("memory")\
    .queryName("tumbling_windows")\
    .start()

22/11/28 04:20:38 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e487794b-1f77-43d3-8a42-4832c5c242e4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff7e8030190>

In [13]:
for x in range(10):
    clear_output(wait=True)
    spark.sql("SELECT * FROM tumbling_windows limit 10").show(truncate=False)
    sleep(5)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2022-08-04 11:15:00, 2022-08-04 11:20:00]|10669|
|[2022-08-04 11:45:00, 2022-08-04 11:50:00]|4767 |
|[2022-08-04 11:10:00, 2022-08-04 11:15:00]|10730|
|[2022-08-04 11:00:00, 2022-08-04 11:05:00]|9578 |
|[2022-08-04 11:25:00, 2022-08-04 11:30:00]|10625|
|[2022-08-04 11:05:00, 2022-08-04 11:10:00]|10747|
|[2022-08-04 11:20:00, 2022-08-04 11:25:00]|10786|
|[2022-08-04 11:40:00, 2022-08-04 11:45:00]|10705|
|[2022-08-04 11:35:00, 2022-08-04 11:40:00]|10957|
|[2022-08-04 11:30:00, 2022-08-04 11:35:00]|10436|
+------------------------------------------+-----+



## Slinding Windows
Count records for each 10-minutes window each 5 minutes

In [14]:
df_traffic_stream\
    .groupBy(
        F.window("DATE_TIME", "10 minutes", "5 minutes")
    )\
    .count()\
    .sort('window')\
    .writeStream\
    .option("truncate", "false")\
    .outputMode("complete")\
    .format("memory")\
    .queryName("sliding_window")\
    .start()

22/11/28 04:21:44 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-02af6434-8857-49d0-969b-63d622b3a7ee. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff7e7fe4c90>

In [15]:
for x in range(10):
    clear_output(wait=True)
    spark.sql("SELECT * FROM sliding_window limit 10").show(truncate=False)
    sleep(5)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|21477|
|[2022-08-04 11:10:00, 2022-08-04 11:20:00]|21399|
|[2022-08-04 11:15:00, 2022-08-04 11:25:00]|21455|
|[2022-08-04 11:35:00, 2022-08-04 11:45:00]|21662|
|[2022-08-04 11:40:00, 2022-08-04 11:50:00]|15472|
|[2022-08-04 11:45:00, 2022-08-04 11:55:00]|4767 |
|[2022-08-04 10:55:00, 2022-08-04 11:05:00]|9578 |
|[2022-08-04 11:00:00, 2022-08-04 11:10:00]|20325|
|[2022-08-04 11:20:00, 2022-08-04 11:30:00]|21411|
|[2022-08-04 11:25:00, 2022-08-04 11:35:00]|21061|
+------------------------------------------+-----+



## Average Speed Per Address

In [16]:
df_traffic_stream\
    .withColumn('speed', F.col('vehicle_speed').cast('double'))\
    .groupBy(
        F.window("DATE_TIME", "10 minutes", "5 minutes"),
        F.col('ADDRESS_ID')
    )\
    .avg('speed')\
    .sort('window', 'address_id')\
    .writeStream\
    .option("truncate", "false")\
    .outputMode("complete")\
    .format("memory")\
    .queryName("avg_speed_per_addr")\
    .start()

22/11/28 04:22:48 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c62db688-e577-4c7a-aedc-cd1dfdccb16d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff7e8357e90>

In [17]:
for x in range(10):
    clear_output(wait=True)
    # remember to limit number of output rows
    spark.sql("SELECT * FROM avg_speed_per_addr limit 20").show(truncate=False)
    sleep(5)

+------------------------------------------+----------+------------------+
|window                                    |ADDRESS_ID|avg(speed)        |
+------------------------------------------+----------+------------------+
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|374       |51.8252427184466  |
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|375       |36.703539823008846|
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|380       |50.104166666666664|
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|381       |44.356521739130436|
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|383       |52.02229299363057 |
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|385       |49.11013215859031 |
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|386       |45.54455445544554 |
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|389       |46.425            |
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|390       |42.301587301587304|
|[2022-08-04 11:05:00, 2022-08-04 11:15:00]|391       |37.67914438502674 |
|[2022-08-04 11:05:00, 20