In [1]:
from IPython.display import display, clear_output
from datetime import datetime
import time
from pathlib import Path

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DataType
import pyspark.sql.functions as sf
from pyspark.sql.functions import lit, date_format, col, udf

In [2]:
# Constant: location of the saved car park zone records. The polling loop
# further down appends to it with Spark's parquet writer and checks it with
# Path(...).is_dir(), so it is treated as a directory rather than a single file.
CAR_PARK_ZONE_FILEPATH = "data/car_park_zone.parquet"

In [3]:
# Get (or create) the SparkSession used by every cell below,
# registered under the application name 'kafka'.
spark = (
    SparkSession.builder
    .appName('kafka')
    .getOrCreate()
)

In [4]:
## Schema of the JSON document carried in the Kafka value field for Car Park Zone.
## Every attribute is declared as a nullable string; the field order below is
## the column order of the parsed struct.
schema_car_park_zone_struct = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "zone_id",
        "facility_id",
        "message_date",
        "zone_name",
        "spots",
        "parent_zone_id",
        "zone_occupancy_loop",
        "zone_occupancy_total",
        "zone_occupancy_monthlies",
        "zone_occupancy_open_gate",
        "zone_occupancy_transients",
    )
])

In [5]:
### Subscribe to the topic "nsw_car_park_zone" on the Kafka broker, starting
### from the earliest offset, and shape the records into car_park_zone_stream_df
### in one pipeline:
###   1. cast the key and value columns to strings
###   2. parse the JSON text in value with schema_car_park_zone_struct
###   3. flatten the parsed struct and rename key, topic, timestamp to
###      event_key, event_topic, event_timestamp respectively
###   4. drop duplicate records per (facility_id, zone_id, message_date)
car_park_zone_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("startingOffsets", "earliest")
    .option("subscribe", "nsw_car_park_zone")
    .load()
    .withColumn("key", F.col("key").cast(StringType()))
    .withColumn("value", F.col("value").cast(StringType()))
    .withColumn("value", F.from_json("value", schema_car_park_zone_struct))
    .select(
        F.col("key").alias("event_key"),
        F.col("topic").alias("event_topic"),
        F.col("timestamp").alias("event_timestamp"),
        # One "value.<field>" column per schema field, in schema order.
        *[f"value.{field_name}" for field_name in schema_car_park_zone_struct.fieldNames()],
    )
    .dropDuplicates(["facility_id", "zone_id", "message_date"])
)

### Print the schema of car_park_zone_stream_df
car_park_zone_stream_df.printSchema()

root
 |-- event_key: string (nullable = true)
 |-- event_topic: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- zone_id: string (nullable = true)
 |-- facility_id: string (nullable = true)
 |-- message_date: string (nullable = true)
 |-- zone_name: string (nullable = true)
 |-- spots: string (nullable = true)
 |-- parent_zone_id: string (nullable = true)
 |-- zone_occupancy_loop: string (nullable = true)
 |-- zone_occupancy_total: string (nullable = true)
 |-- zone_occupancy_monthlies: string (nullable = true)
 |-- zone_occupancy_open_gate: string (nullable = true)
 |-- zone_occupancy_transients: string (nullable = true)



In [6]:
# Start the streaming query. Its rows go to the in-memory sink under the query
# name "nsw_car_park_zone_view", which later cells read through spark.sql.
car_park_zone_stream = (
    car_park_zone_stream_df.writeStream
    .format("memory")
    .queryName("nsw_car_park_zone_view")
    .start()
)

## Loop for saving streaming data to parquet

In [None]:
# Poll the in-memory stream view at a fixed interval and append the records
# that are not stored yet to the parquet dataset at CAR_PARK_ZONE_FILEPATH.
# The loop runs until the cell is interrupted (KeyboardInterrupt).

# Offset used for a (facility, zone) pair that has no saved record yet.
DEFAULT_OFFSET = "1970-01-01 00:00:00"
# Pause between two polls of the stream view.
POLL_INTERVAL_SECONDS = 60


def _sql_string_literal(value):
    """Return ``value`` as a single-quoted Spark SQL string literal.

    Backslashes and single quotes are escaped so that ids coming from the
    Kafka topic cannot terminate the literal early in the generated WHERE
    clause. Values without those characters are rendered unchanged.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


try:
    while True:
        clear_output(wait=True)

        # Latest saved message time per (facility_id, zone_id).
        # Keys are tuples so that ids containing "_" cannot collide.
        zone_offset_dict = {}
        if Path(CAR_PARK_ZONE_FILEPATH).is_dir():
            existed_car_park_zone_df = spark.read.parquet(CAR_PARK_ZONE_FILEPATH)
            # createOrReplaceTempView replaces the deprecated registerTempTable.
            existed_car_park_zone_df.createOrReplaceTempView("existed_car_park_zone_table")
            # The offset is formatted inside Spark so that it is rendered by the
            # same session that later parses it back in the WHERE clause.
            zone_offset_df = spark.sql("""
                SELECT
                    facility_id,
                    zone_id,
                    date_format(MAX(message_datetime), 'yyyy-MM-dd HH:mm:ss') as offset
                FROM existed_car_park_zone_table
                group by facility_id, zone_id
            """)
            for row in zone_offset_df.collect():
                # A group holding only NULL datetimes has no usable offset.
                zone_offset_dict[(row['facility_id'], row['zone_id'])] = row['offset'] or DEFAULT_OFFSET

        print(zone_offset_dict)

        # All (facility, zone) pairs currently present in the stream view.
        car_park_facility_zone_list_df = spark.sql(
            "SELECT facility_id, zone_id FROM nsw_car_park_zone_view group by facility_id, zone_id"
        )
        facility_zone_list = [
            {"facility_id": row['facility_id'], "zone_id": row['zone_id']}
            for row in car_park_facility_zone_list_df.collect()
        ]

        # Build one predicate per pair that keeps only records newer than the
        # latest one already saved for that pair.
        filter_list = []
        for facility_zone_info in facility_zone_list:
            facility_id = facility_zone_info["facility_id"]
            zone_id = facility_zone_info["zone_id"]
            offset = zone_offset_dict.get((facility_id, zone_id), DEFAULT_OFFSET)
            filter_list.append(
                f"(facility_id = {_sql_string_literal(facility_id)}"
                f" and zone_id = {_sql_string_literal(zone_id)}"
                f" and message_datetime > {_sql_string_literal(offset)})"
            )

        filter_string = "WHERE " + " or ".join(filter_list) if filter_list else ""
        print(filter_string)

        # Query the new records without duplicate records.
        # The result is cached because it is used for both the write and the
        # display: without the cache each action would re-read the live memory
        # sink, so the displayed rows could differ from the saved rows.
        new_car_park_zone_df = spark.sql(f"""
            SELECT
                *,
                year(message_datetime) as year,
                quarter(message_datetime) as quarter,
                month(message_datetime) as month,
                dayofweek(message_datetime) as dayofweek,
                dayofmonth(message_datetime) as day,
                hour(message_datetime) as hour,
                minute(message_datetime) as minute
            FROM (
                SELECT 
                    *, to_timestamp(message_date, "yyyy-MM-dd'T'HH:mm:ss") as message_datetime 
                FROM (
                    SELECT 
                        *,
                        row_number() over (partition by facility_id, zone_id, message_date order by message_date desc) as rn
                    FROM nsw_car_park_zone_view
                ) t1
                WHERE rn = 1
            ) t2
            {filter_string}
            ORDER BY message_datetime 
        """).cache()

        try:
            # count() materialises the cached snapshot used by the steps below.
            new_record_count = new_car_park_zone_df.count()

            # save records; skip the append when nothing is new so that an
            # idle stream does not add an empty write every interval
            if new_record_count > 0:
                new_car_park_zone_df.write.format("parquet").mode("append").save(CAR_PARK_ZONE_FILEPATH)

            # display save records for checking
            new_car_park_zone_pd_df = new_car_park_zone_df.toPandas()
        finally:
            # Always release the snapshot so the next iteration recomputes the
            # query against the current content of the memory sink.
            new_car_park_zone_df.unpersist()

        display(new_car_park_zone_pd_df)

        time.sleep(POLL_INTERVAL_SECONDS)

except KeyboardInterrupt:
    # Interrupting the cell is the intended way to stop the polling loop.
    pass

{}
WHERE (facility_id = '4' and zone_id = 'CPS-SHW' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '2' and zone_id = 'CPS-KVE1' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '1' and zone_id = 'CPS-CUD2' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '7' and zone_id = '1' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '1' and zone_id = 'CPS-CUD1' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '486' and zone_id = '1' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '5' and zone_id = 'CPS-CHE' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '488' and zone_id = '4' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '3' and zone_id = 'CPS-BLV' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '1' and zone_id = 'CPS-CUD3' and message_datetime > '1970-01-01 00:00:00') or (facility_id = '488' and zone_id = '5' and message_datetime > '1970-01-01 00:00:0

Unnamed: 0,event_key,event_topic,event_timestamp,zone_id,facility_id,message_date,zone_name,spots,parent_zone_id,zone_occupancy_loop,...,zone_occupancy_transients,rn,message_datetime,year,quarter,month,dayofweek,day,hour,minute
0,5,nsw_car_park_zone,2021-06-13 12:43:01.790,5,488,2021-06-13T22:35:08,on grade,739,0,7.0,...,6.0,1,2021-06-13 22:35:08,2021,2,6,1,13,22,35
1,4,nsw_car_park_zone,2021-06-13 12:43:01.789,4,488,2021-06-13T22:35:08,Multi Level,874,0,19.0,...,8.0,1,2021-06-13 22:35:08,2021,2,6,1,13,22,35
2,1,nsw_car_park_zone,2021-06-13 12:43:03.458,1,489,2021-06-13T22:38:07,SYD326 Manly Vale Park and Ride,142,0,32024.0,...,0.0,1,2021-06-13 22:38:07,2021,2,6,1,13,22,38
3,2,nsw_car_park_zone,2021-06-13 12:43:00.574,2,487,2021-06-13T22:40:52,SYD319 Kogarah Park and Ride,259,0,140.0,...,0.0,1,2021-06-13 22:40:52,2021,2,6,1,13,22,40
4,1,nsw_car_park_zone,2021-06-13 12:42:59.478,1,486,2021-06-13T22:41:29,SYD318 Ashfield Park and Ride,180,0,175953.0,...,6.0,1,2021-06-13 22:41:29,2021,2,6,1,13,22,41
5,CPS-SHW,nsw_car_park_zone,2021-06-13 12:42:58.384,CPS-SHW,4,2021-06-13T22:42:12,Hills Showground Station Multi-Level Car Park,600,0,,...,,1,2021-06-13 22:42:12,2021,2,6,1,13,22,42
6,CPS-BLV,nsw_car_park_zone,2021-06-13 12:42:57.234,CPS-BLV,3,2021-06-13T22:42:12,Bella Vista Station Multi-Level Car Park,800,0,,...,,1,2021-06-13 22:42:12,2021,2,6,1,13,22,42
7,CPS-CUD2,nsw_car_park_zone,2021-06-13 12:42:54.927,CPS-CUD2,1,2021-06-13T22:42:12,Tallawong Station At-Grade B Car Park,455,0,,...,,1,2021-06-13 22:42:12,2021,2,6,1,13,22,42
8,CPS-CUD1,nsw_car_park_zone,2021-06-13 12:42:54.925,CPS-CUD1,1,2021-06-13T22:42:12,Tallawong Station At-Grade A Car Park,152,0,,...,,1,2021-06-13 22:42:12,2021,2,6,1,13,22,42
9,CPS-CUD3,nsw_car_park_zone,2021-06-13 12:42:54.930,CPS-CUD3,1,2021-06-13T22:42:12,Tallawong Station At-Grade D Car Park,397,0,,...,,1,2021-06-13 22:42:12,2021,2,6,1,13,22,42


In [19]:
# Stop the streaming query that writes to the in-memory table
# "nsw_car_park_zone_view".
car_park_zone_stream.stop()

## Check saved records

In [None]:
# Load every saved record from the parquet dataset, convert it to pandas
# and preview the first two rows
saved_car_park_zone_df = spark.read.format("parquet").load(CAR_PARK_ZONE_FILEPATH)
saved_car_park_zone_pd_df = saved_car_park_zone_df.toPandas()
saved_car_park_zone_pd_df.iloc[:2]

In [None]:
# Saved records of zone 'CPS-SHW', ordered by message time
(
    saved_car_park_zone_pd_df
    .query("zone_id == 'CPS-SHW'")
    .sort_values(by="message_datetime")
)

In [12]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()