In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# Initialize the Spark session with the Kafka connector.
# The connector artifact has to match the Spark version actually running;
# a mismatched build fails at runtime with classpath/linkage errors. So the
# version is derived from the installed PySpark instead of being hard-coded.
# The "_2.12" suffix assumes a Scala 2.12 Spark build (the PySpark 3.x
# default) — TODO confirm this if Spark is upgraded.
# NOTE(review): `spark.jars.packages` only takes effect when the JVM is first
# launched. If a session already exists in this kernel, getOrCreate() returns
# it and the connector may be missing; restart the kernel in that case.
KAFKA_CONNECTOR_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"
)

spark = (
    SparkSession.builder
    .appName("KafkaSalesStreaming")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

# Kafka connection settings. The broker address is the hostname used for
# Docker-to-Docker networking (Spark and the broker on the same Docker network).
KAFKA_BOOTSTRAP_SERVERS = "kafka-broker:9092"
KAFKA_TOPIC = "sales_transactions"

# Layout of each JSON message value, in field order. Every field uses the
# StructField default (nullable). "timestamp" is read as a plain string and
# converted to a real timestamp after parsing.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("transaction_id", IntegerType()),
        ("timestamp", StringType()),
        ("customer_id", IntegerType()),
        ("product_category", StringType()),
        ("amount", DoubleType()),
        ("payment_method", StringType()),
    )
])

print("✅ Starting to read stream from Kafka...")

# Streaming DataFrame over the sales topic. "latest" makes a new query start
# at the end of the topic instead of replaying existing messages.
# The dict is unpacked because "kafka.bootstrap.servers" contains dots and
# cannot be written as a keyword argument.
stream_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "latest",
    })
    .load()
)

print("✅ Kafka stream connected. Parsing incoming data...")

# Turn raw Kafka records into typed sales rows:
#   1. cast the message `value` to a string and parse it as JSON with `schema`;
#   2. flatten the parsed struct into top-level columns;
#   3. add `event_time`, the string "timestamp" field cast to a timestamp
#      (used below for event-time windowing).
parsed_df = (
    stream_df
    .select(from_json(col("value").cast(StringType()), schema).alias("data"))
    .select("data.*")
    .withColumn("event_time", col("timestamp").cast(TimestampType()))
)

print("✅ Streaming data parsed. Aggregating sales by category...")

# Sum `amount` per product category over tumbling 1-minute event-time windows.
# The 1-minute watermark bounds how late an event may arrive and still count.
# It also lets Spark finalize old windows, which a streaming aggregation needs
# in append output mode.
aggregated_df = (
    parsed_df
    .withWatermark("event_time", "1 minute")
    .groupBy(window("event_time", "1 minute"), "product_category")
    .agg(spark_sum(col("amount")).alias("total_sales"))
)

print("✅ Setting up streaming query to write directly to parquet...")

# Continuously append finalized window totals to Parquet files. In append mode
# each window's row is emitted once, after the watermark has passed its end.
# NOTE(review): the checkpoint directory is nested inside the Parquet output
# directory. Spark readers presumably ignore it via the sink's metadata log,
# but other Parquet readers may trip over the checkpoint files. Relocating it
# would reset the query's progress, so it is left unchanged here. Consider a
# sibling directory for fresh deployments.
query = (
    aggregated_df.writeStream
    .outputMode("append")
    .format("parquet")
    .options(
        path="/home/jovyan/work/streaming_sales_output/",
        checkpointLocation="/home/jovyan/work/streaming_sales_output/checkpoints/",
    )
    .start()
)

print("✅ Query started. Awaiting streaming results...")

# Blocks until the streaming query stops; in a notebook this keeps the cell
# running for as long as the stream runs.
query.awaitTermination()
