In [1]:
import os
from pathlib import Path

import pyspark
from dotenv import load_dotenv
from pyspark.sql.functions import avg, col, from_json, window
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import (
    BooleanType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [2]:
# Load Kafka connection settings from the project's .env file into the
# process environment, then read them back.
dotenv_path = Path("/opt/app/.env")
load_dotenv(dotenv_path=dotenv_path)
kafka_host = os.getenv("KAFKA_HOST")
kafka_topic = os.getenv("KAFKA_TOPIC_NAME")

# Fail fast: os.getenv returns None for unset variables. Without this check,
# the bootstrap-server string would silently become "None:9092" and the
# stream would only fail later, with a much less helpful error.
_missing_kafka_vars = [
    var_name
    for var_name, var_value in (("KAFKA_HOST", kafka_host), ("KAFKA_TOPIC_NAME", kafka_topic))
    if not var_value
]
if _missing_kafka_vars:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_kafka_vars)} "
        f"(looked in the process environment and {dotenv_path})"
    )

In [None]:
# Build a local SparkSession that pulls in the Kafka connector for
# Structured Streaming.
# NOTE(review): the connector coordinates (Scala 2.12 / Spark 3.3.0) should
# match the Spark runtime in this environment — confirm when upgrading.
_spark_builder = pyspark.sql.SparkSession.builder.appName("DibimbingStreaming")
_spark_builder = _spark_builder.master("local")
_spark_builder = _spark_builder.config(
    "spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
)
# Use only a few shuffle partitions for the aggregation in this local run.
_spark_builder = _spark_builder.config("spark.sql.shuffle.partitions", 4)
spark = _spark_builder.getOrCreate()

# Keep notebook output readable by hiding INFO-level Spark logs.
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Schema of the retail order records carried as JSON in each Kafka message.
# Fields are listed as (name, Spark type) pairs; every field is nullable.
_retail_fields = [
    ("order_date", StringType()),
    ("order_id", StringType()),
    ("customer_name", StringType()),
    ("product", StringType()),
    ("category", StringType()),
    ("price_per_product", IntegerType()),
    ("total_payment", IntegerType()),
    ("payment_type", StringType()),
    # NOTE(review): numeric event time; its unit (seconds vs ms) is not visible here.
    ("timestamp", LongType()),
    ("store_branch", StringType()),
    ("is_new", BooleanType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in _retail_fields])

# Streaming source: subscribe to the configured Kafka topic. With
# startingOffsets="latest", only records arriving after the query starts are read.
# The message payload is exposed in the `value` column.
_kafka_reader_options = {
    "kafka.bootstrap.servers": f"{kafka_host}:9092",
    "subscribe": kafka_topic,
    "startingOffsets": "latest",
}
stream_df = spark.readStream.format("kafka").options(**_kafka_reader_options).load()


In [None]:
# Parse the Kafka payload into typed columns.
# - `value` holds the raw message bytes; cast to string, then decode the JSON
#   with `from_json`. (`.rdd.map(json.loads)` is not supported on streaming
#   DataFrames, so it cannot be used here.)
# - Records whose JSON does not match `schema` yield null fields rather than
#   failing the query.
retail_data = (
    stream_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    # window() needs a TimestampType column, but `timestamp` is a LongType.
    # Casting a long to timestamp interprets it as seconds since the epoch.
    # NOTE(review): if producers send milliseconds, divide by 1000 before casting.
    .withColumn("event_time", col("timestamp").cast("timestamp"))
)

# Daily revenue: sum of payments per 1-day tumbling window.
# NOTE(review): the original referenced a nonexistent `price` column;
# `total_payment` is assumed to be the intended amount — confirm
# (the alternative is `price_per_product`).
aggregated_df = (
    retail_data
    .groupBy(window(col("event_time"), "1 day").alias("day"))
    .agg(spark_sum("total_payment").alias("running_total"))
    .select(col("day.start").alias("timestamp"), "running_total")
)

# "complete" mode re-emits the full aggregate table on every trigger.
# Each micro-batch is printed to the notebook output.
query = (
    aggregated_df.writeStream
    .outputMode("complete")
    .foreachBatch(lambda batch_df, batch_id: batch_df.show(truncate=False))
    .start()
)

# Blocks this cell until the streaming query stops or fails.
query.awaitTermination()
