# Kafka Datagen + Spark Demo

This notebook shows how to:
1. Consume messages from the Kafka topic `orders` using `confluent-kafka`.
2. Read the same topic as a structured stream in PySpark.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Maven coordinate of the Structured Streaming Kafka connector. It should match the
# installed Spark (3.5.1) and Scala (2.12) versions — NOTE(review): confirm against
# the runtime this notebook is executed on.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"

# Build the Spark session, or reuse the one already running in this kernel.
# getOrCreate() hands back an existing session as-is, so launch-time settings such
# as spark.jars.packages only take effect on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

# Keep notebook output readable: only WARN-and-above messages from Spark.
spark.sparkContext.setLogLevel("WARN")

# Kafka source settings: broker address, topic to subscribe to, and where to begin.
# "latest" means a query starting without prior checkpoint state only sees records
# produced after it starts; anything already in the topic is skipped.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka:29092",
    "subscribe": "orders",
    "startingOffsets": "latest",
}

# Unbounded streaming DataFrame over the `orders` topic.
df = spark.readStream.format("kafka").options(**kafka_source_options).load()

# The Kafka source delivers `key` and `value` as binary; turn both into strings so
# the JSON payload carried in `value` can be parsed further down.
df_str = df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
)

# Schema of one JSON order payload. Every field is declared nullable, so a record
# that lacks a key still parses; that field simply comes back as null.

# Nested `address` object inside each order.
address_type = (
    StructType()
    .add("city", StringType(), True)
    .add("state", StringType(), True)
    .add("zipcode", LongType(), True)
)

# Top-level order record; `ordertime` holds epoch milliseconds.
order_schema = (
    StructType()
    .add("ordertime", LongType(), True)
    .add("orderid", LongType(), True)
    .add("itemid", StringType(), True)
    .add("orderunits", DoubleType(), True)
    .add("address", address_type, True)
)

# Parse the JSON string in `value` against order_schema, then flatten the resulting
# struct into top-level columns (ordertime, orderid, itemid, orderunits, address).
# NOTE(review): from_json does not fail the query on malformed or non-matching
# payloads; such records come through with null fields, so filter or count them
# if data quality matters.
df_parsed = (
    df_str
    .withColumn("jsonData", from_json(col("value"), order_schema))
    .select("jsonData.*")
)

# Convert ordertime (epoch milliseconds) to a timestamp column.
# timestamp_millis (Spark SQL >= 3.1) converts the integer milliseconds exactly.
# The previous `(ordertime / 1000).cast("timestamp")` went through a double, and
# Spark's double->timestamp cast truncates `seconds * 1e6` to a long. That can
# silently lose a microsecond (e.g. .747 -> .746999). Wrapping the result in
# to_timestamp() was also a no-op, because the value was already a timestamp.
# A null ordertime still yields a null order_timestamp.
df_with_ts = df_parsed.selectExpr("*", "timestamp_millis(ordertime) AS order_timestamp")

# Write each micro-batch to the console sink for inspection; the batches appear
# as this cell's output. "append" emits only the rows new in each batch, and
# truncate=False keeps long values such as the nested `address` struct visible.

# How long to let the demo stream run before stopping it.
# None (default) blocks until the query terminates, matching the original behavior.
# Set a number of seconds (e.g. 60) so that Restart & Run All can complete.
STREAM_TIMEOUT_SECONDS = None

query = (df_with_ts.writeStream
         .format("console")
         .outputMode("append")
         .option("truncate", False)
         .start())

try:
    # Blocks this cell. With a timeout, returns after that many seconds even if
    # the query is still running.
    query.awaitTermination(STREAM_TIMEOUT_SECONDS)
finally:
    # Always stop the query when the wait ends: on timeout, on query failure, or
    # when the kernel is interrupted. Without this, an interrupted cell leaves the
    # query running in the background, printing batches and consuming from Kafka.
    query.stop()

+-------------+-------+--------+------------------+-------------------------+-----------------------+
|ordertime    |orderid|itemid  |orderunits        |address                  |order_timestamp        |
+-------------+-------+--------+------------------+-------------------------+-----------------------+
|1490212132747|412792 |Item_581|1.1873091771586861|{City_37, State_1, 86136}|2017-03-22 19:48:52.747|
|1501532059654|412793 |Item_785|3.834913005148235 |{City_18, State_, 55683} |2017-07-31 20:14:19.654|
+-------------+-------+--------+------------------+-------------------------+-----------------------+

-------------------------------------------
Batch: 198
-------------------------------------------
+-------------+-------+------+-----------------+-----------------------+-----------------------+
|ordertime    |orderid|itemid|orderunits       |address                |order_timestamp        |
+-------------+-------+------+-----------------+-----------------------+----------------------