In [27]:
import os
from pyspark.sql import SparkSession
from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
from pyspark.sql.functions import from_json, col, expr, to_timestamp, window, sum as _sum

In [2]:
# Local Spark session for this streaming job, using every local core ("local[*]").
# NOTE(review): "spark.streaming.stopGracefullyOnShutdown" is documented for the
# legacy DStream StreamingContext; it likely has no effect on Structured Streaming
# queries — confirm before relying on it.
# NOTE(review): the Kafka connector coordinate (Scala 2.12, version 3.3.2) presumably
# has to match the installed Spark/Scala build — verify.
# NOTE: getOrCreate() returns an already-running session on re-run, so changes to
# startup-only settings such as spark.jars.packages need a kernel restart.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Assignment_25")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
    .config("spark.sql.shuffle.partitions", 4)   # small local run: few shuffle partitions
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

In [3]:
# Show the active SparkSession as this cell's output — a quick check that the
# builder cell above succeeded.
spark

In [4]:
# Load variables from the shared .env file into os.environ; the cells that follow
# read KAFKA_HOST and KAFKA_TOPIC_NAME from it.
# NOTE(review): absolute path — presumably specific to the course environment.
# load_dotenv's return value is left as the cell output; per the python-dotenv docs
# it is False when no variables were loaded (e.g. the file is missing).
dotenv_path = Path("/resources") / ".env"
load_dotenv(dotenv_path=dotenv_path)

True

In [5]:
def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``, failing fast if it is unset or empty.

    ``os.getenv`` returns ``None`` for a missing variable. Left unchecked, that would
    surface much later and more confusingly, e.g. as a Kafka bootstrap server of
    ``"None:9092"`` or a null topic subscription.

    Raises:
        RuntimeError: if the variable is missing or empty.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; "
            "check that the .env file was loaded and defines it."
        )
    return value


# Kafka connection settings, expected to come from the .env file.
kafka_host = _require_env('KAFKA_HOST')
kafka_topic = _require_env('KAFKA_TOPIC_NAME')

In [6]:
# Raw streaming source: one row per Kafka record, with the message payload in the
# binary `value` column (decoded further down) plus Kafka metadata columns.
# NOTE(review): the broker port 9092 is hardcoded; only the host comes from .env.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": f"{kafka_host}:9092",
        "subscribe": kafka_topic,
        # Only honoured when the query starts fresh; a restart resumes from its checkpoint.
        "startingOffsets": "earliest",
    })
    .load()
)

In [7]:
# Expected shape of the JSON payload in each Kafka message. Every field is declared
# nullable; `ts` arrives as a string and is converted to a timestamp after parsing.
schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("order_id", StringType()),
        ("customer_id", IntegerType()),
        ("furniture", StringType()),
        ("color", StringType()),
        ("price", LongType()),
        ("ts", StringType()),
    )
])

In [22]:
# Decode each Kafka record: cast the binary `value` to a string, parse it as JSON
# with `schema`, flatten the struct into top-level columns, then turn `ts` into a
# timestamp for event-time windowing.
# NOTE(review): the pattern "HH:mm:ss" reads only the time of day, so the date part
# is not taken from the data (Spark fills in a default date). Events from different
# days would then fall into the same hourly windows — confirm the producer really
# sends time-of-day strings only.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("ts", to_timestamp(col("ts"), "HH:mm:ss"))
)

In [28]:
# Hourly revenue: sum `price` over 1-hour tumbling windows of event time `ts`,
# label each window by its end time, and sort chronologically.
# NOTE: a streaming orderBy is only allowed after an aggregation in "complete"
# output mode. In that mode Spark keeps all aggregate state, so this watermark does
# not drop old windows.
# NOTE(review): the choice of 210 minutes is not explained — document it or lift it
# into a named constant.
agg_df = (
    parsed_df
    .withWatermark("ts", "210 minutes")
    .groupBy(window(col("ts"), "1 hour"))
    .agg(_sum("price").alias("hourly_total"))
    .select(
        col("window.end").alias("timestamp"),
        col("hourly_total"),
    )
    .orderBy(col("timestamp"))
)

In [None]:
# Start the streaming query. Every 5 minutes it prints the full, untruncated hourly
# totals table to the console and records progress under the relative path
# "checkpoint_dir".
# "complete" mode is required for the orderBy in agg_df. In this mode Spark keeps all
# aggregation state, so the watermark on agg_df does not evict old windows.
query = (
    agg_df
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", False)
    .option("checkpointLocation", "checkpoint_dir")
    .trigger(processingTime="5 minutes")
    .start()
)

# Block this cell until the query ends. Interrupting the kernel interrupts this wait
# but does not necessarily stop the query, which could otherwise keep running and
# printing batches in the background. Stop it explicitly.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()