In [0]:
## The following cells illustrate the medallion architecture, which is well suited to a retail grocery store.
# Task 1: supply the configuration through a config file in Azure Data Lake Storage Gen2 so that the same pipeline can be repeated for shipment details as well.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Kafka connection settings for the HDInsight cluster.
KAFKA_BROKERS = "kafka-broker1:9092,kafka-broker2:9092"
KAFKA_TOPIC = "inventory_events"
KAFKA_SECURITY_PROTOCOL = "SASL_SSL"

# Streaming DataFrame over KAFKA_TOPIC, starting from the earliest available
# offset. Each row carries the raw Kafka record (binary `value` plus metadata).
# NOTE(review): the security protocol is SASL_SSL, but no "kafka.sasl.mechanism"
# or "kafka.sasl.jaas.config" option is set here — confirm those are supplied
# elsewhere (e.g. cluster-level config), otherwise authentication will fail.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BROKERS,
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "earliest",
        "kafka.security.protocol": KAFKA_SECURITY_PROTOCOL,
    })
    .load()
)

# Schema of the JSON payload carried in each Kafka message value.
# Every field is nullable; "timestamp" arrives as a string and is converted to a
# proper timestamp column later in the pipeline.
# NOTE(review): the source topic is "inventory_events", but this schema and the
# Bronze target are named shipment_summary — confirm which dataset it describes.
shipment_summary_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("product_id", StringType()),
            ("store_id", StringType()),
            ("timestamp", StringType()),
            ("purchase_order", IntegerType()),
            ("temperature", DoubleType()),
            ("supplier_id", StringType()),
        )
    ]
)

# Turn raw Kafka records into typed columns:
#   1. cast the binary `value` to a string,
#   2. parse it as JSON against shipment_summary_schema into a `data` struct,
#   3. flatten the struct into top-level columns,
#   4. derive `event_time` by parsing the string `timestamp` field.
# NOTE(review): malformed payloads are not rejected here — from_json yields null
# fields for them; confirm whether Bronze should also retain the raw value.
bronze_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), shipment_summary_schema).alias("data"))
    .select(col("data.*"))
    .withColumn("event_time", to_timestamp(col("timestamp")))
)

# Write to the Bronze Delta table in ADLS Gen2: append-only, partitioned by
# store_id, with streaming progress tracked in the checkpoint directory.
# The StreamingQuery handle returned by start() is kept in `bronze_query` so
# later cells can inspect it (status / lastProgress), block on it
# (awaitTermination) or stop it, instead of losing the only reference.
# NOTE(review): the checkpoint is the shared "checkpoints/" root of the bronze
# container. Each streaming query must have its own checkpoint location, so when
# this cell is copied for another table (e.g. shipment details) give every copy a
# distinct subdirectory. Moving the checkpoint of an already-deployed query makes
# it restart from startingOffsets ("earliest") and re-append data, so migrate it
# deliberately rather than just editing the path.
bronze_query = (
    bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://bronze@storage.dfs.core.windows.net/checkpoints/")
    .option("path", "abfss://bronze@storage.dfs.core.windows.net/shipment_summary/")
    .partitionBy("store_id")
    .outputMode("append")
    .start()
)