# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, expr, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Obtain the Spark session for this job (reusing one if it already exists).
spark = SparkSession.builder.appName("KafkaStreamProcessing").getOrCreate()

# Kafka connection settings.
KAFKA_BROKER = "kafka-broker:9092"  # placeholder: set to the real bootstrap server(s)
KAFKA_TOPIC = "input_topic"

# Expected layout of each JSON message carried in the Kafka value.
# Every field is declared nullable, so a key missing from a message
# yields null rather than rejecting the record.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("id", IntegerType()),
            ("name", StringType()),
            ("age", IntegerType()),
            ("city", StringType()),
        )
    ]
)

# Source: an unbounded stream of records from KAFKA_TOPIC.
# "earliest" only decides where a brand-new query begins; once the sink's
# checkpoint exists, a restarted query resumes from its recorded offsets.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka payload to text, parse it against `schema`, and then
# flatten the resulting struct into top-level columns (id, name, age, city).
parsed_df = (
    kafka_df
    .select(col("value").cast("string").alias("value"))
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Transformation: keep only records whose age is strictly greater than 18.
# Rows with a null age (e.g. a missing key or a malformed JSON payload) are
# dropped too, because a comparison against null never passes a filter.
# NOTE(review): "adult" usually means age >= 18, and `> 18` excludes
# 18-year-olds. The behaviour is kept as written because the original comment
# spells out `age > 18`; confirm the intended threshold with the data owner.
filtered_df = parsed_df.filter(col("age") > 18)

# Transformation: stamp each surviving row with its processing time.
# The built-in current_timestamp() column function replaces the SQL string
# previously parsed by expr(), so linters and IDEs can see the call.
processed_df = filtered_df.withColumn("processed_at", current_timestamp())

# Sink: append every processed row to the Delta table at the given path.
# The checkpoint directory records the query's progress, which lets a
# restarted job pick up where the previous run stopped.
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/kafka_stream")
    .option("path", "dbfs:/mnt/data/delta_table")
    .start()
)

# Keep the driver alive until the streaming query stops or fails.
query.awaitTermination()
