#### Print Environment Variables

In [1]:
import os
print(os.environ.get("SPARK_HOME"))
print(os.environ.get("KAFKA_HOME"))
print(os.environ.get("HADOOP_HOME"))
print(os.environ.get("JAVA_HOME"))

C:\spark\spark-3.4.4-bin-hadoop3-scala2.13
C:\kafka\kafka_2.13-3.9.0
C:\hadoop-3.3.6
C:\Program Files\Java\jdk-1.8


#### Spark Session with Kafka Support

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("KafkaRetailConsumer")
    .master("local[*]")
    .getOrCreate()
)


print("Spark Version:", spark.version)
spark.stop()


Spark Version: 3.4.4


In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("KafkaRetailConsumer")
    .master("local[*]")
    # .config("spark.sql.shuffle.partitions", "2")
    # .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.4,"
            "org.apache.spark:spark-token-provider-kafka-0-10_2.13:3.4.4,"
            "org.apache.kafka:kafka-clients:3.5.1,"
            "org.apache.commons:commons-pool2:2.11.1,"
            "io.delta:delta-spark_2.13:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


#### Read from Kafka Topics

In [4]:
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "retail_orders_us,retail_orders_in,retail_orders_others")
    .option("startingOffsets", "earliest")  # "latest" if you only want new ones
    .load()
)

#### Parse The JSON Messages

In [5]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType

schema = StructType() \
    .add("order_id", StringType()) \
    .add("product_id", StringType()) \
    .add("quantity", StringType()) \
    .add("price", StringType()) \
    .add("country", StringType()) \
    .add("channel", StringType()) \
    .add("timestamp", StringType())

df_parsed = (
    df_raw
    .selectExpr("CAST(value AS STRING)", "topic", "partition", "offset")
    .withColumn("jsonData", from_json(col("value"), schema))
    .select(
        "topic", "partition", "offset",
        col("jsonData.*"))
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("price", col("price").cast("float"))

)


#### Write Stream to Delta - Partitioned by Country

In [None]:
from pyspark.sql.functions import col

def debug_batch(df, epoch_id):
    if df.count() == 0:
        print(f"---- Batch {epoch_id} is empty ----")
        return

    print(f"---- Processing Batch {epoch_id} ----")
    df.show(truncate=False)

    (
        df.write
        .format("delta")
        .mode("append")
        .partitionBy("country")
        .save("delta/orders_by_country")
    )

    print(f"---- Batch {epoch_id} written to Delta ----")

# Start the streaming query
query = (
    df_parsed.writeStream
    .outputMode("append")
    .foreachBatch(debug_batch)
    .option("checkpointLocation", "delta/checkpoints/orders_by_country_batch")
    .start()
)

query.awaitTermination(120)


---- Processing Batch 0 ----
+----------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|topic           |partition|offset|order_id|product_id|quantity|price |country|channel|timestamp           |
+----------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|retail_orders_us|0        |0     |ORD001  |P106      |2       |179.01|US     |mobile |2025-04-07T10:00:00Z|
|retail_orders_us|0        |1     |ORD004  |P108      |3       |129.7 |US     |online |2025-04-07T10:00:45Z|
|retail_orders_us|0        |2     |ORD007  |P125      |1       |130.6 |US     |mobile |2025-04-07T10:01:30Z|
|retail_orders_us|0        |3     |ORD009  |P141      |3       |54.91 |US     |store  |2025-04-07T10:02:00Z|
|retail_orders_us|0        |4     |ORD013  |P111      |5       |144.08|US     |online |2025-04-07T10:03:00Z|
|retail_orders_us|0        |5     |ORD014  |P149      |4       |131.83|US     |mobile |2025-04-07T1

In [None]:
# Code for testing the parsed dataframe
df_parsed.writeStream.format("console").start().awaitTermination(10)
query.stop()
df_parsed.printSchema()

False

#### Query the Delta Tables

In [6]:
df_all = spark.read.format("delta").load("delta/orders_by_country")
df_all.createOrReplaceTempView("orders")

spark.sql("SELECT country, COUNT(*) FROM orders GROUP BY country").show()


+-------+--------+
|country|count(1)|
+-------+--------+
|     GE|      22|
|     UK|      20|
|     US|      18|
|     IN|      16|
|     FR|      13|
|     CA|      11|
+-------+--------+



In [7]:
spark.sql("SELECT * FROM orders").show()

+--------------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|               topic|partition|offset|order_id|product_id|quantity| price|country|channel|           timestamp|
+--------------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|retail_orders_others|        1|     1|  ORD006|      P130|       3| 62.13|     GE| online|2025-04-07T10:01:15Z|
|retail_orders_others|        1|     3|  ORD010|      P109|       2| 61.81|     GE| online|2025-04-07T10:02:15Z|
|retail_orders_others|        1|     6|  ORD017|      P120|       5|198.89|     GE| online|2025-04-07T10:04:00Z|
|retail_orders_others|        1|     7|  ORD018|      P137|       2|141.92|     GE| mobile|2025-04-07T10:04:15Z|
|retail_orders_others|        1|     8|  ORD019|      P120|       1| 68.73|     GE|  store|2025-04-07T10:04:30Z|
|retail_orders_others|        1|     9|  ORD023|      P108|       2|135.57|     GE| online|2025-

#### Dynamic Table Creation for "Channel" context

In [9]:
from pyspark.sql.functions import col

def write_per_channel(batch_df, batch_id):
    print(f"\n Processing Batch: {batch_id}")
    
    channels = [row['channel'] for row in batch_df.select("channel").distinct().collect()]
    
    for channel in channels:
        channel_df = batch_df.filter(col("channel") == channel)
        output_path = f"delta/dynamic_orders/{channel}"
        
        print(f"Writing data for channel: {channel} -> {output_path}")
        
        (
            channel_df.write
            .format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .save(output_path)
        )

query = (
    df_parsed.writeStream
    .outputMode("append")
    .foreachBatch(write_per_channel)
    .option("checkpointLocation", "delta/checkpoints/dynamic_orders_by_channel")
    .start()
)

query.awaitTermination(120)




 Processing Batch: 0
Writing data for channel: online -> delta/dynamic_orders/online
Writing data for channel: mobile -> delta/dynamic_orders/mobile
Writing data for channel: store -> delta/dynamic_orders/store


False

In [14]:
from pyspark.sql import DataFrame

# Load all individual channel tables
df_online = spark.read.format("delta").load("delta/dynamic_orders/online")
df_store = spark.read.format("delta").load("delta/dynamic_orders/store")
df_mobile = spark.read.format("delta").load("delta/dynamic_orders/mobile")

# Union all into a single DataFrame
df_all = df_online.unionByName(df_store).unionByName(df_mobile)
df_all.createOrReplaceTempView("all_orders")

spark.sql("SELECT * FROM orders where price > 100").show()

+--------------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|               topic|partition|offset|order_id|product_id|quantity| price|country|channel|           timestamp|
+--------------------+---------+------+--------+----------+--------+------+-------+-------+--------------------+
|retail_orders_others|        1|     6|  ORD017|      P120|       5|198.89|     GE| online|2025-04-07T10:04:00Z|
|retail_orders_others|        1|     7|  ORD018|      P137|       2|141.92|     GE| mobile|2025-04-07T10:04:15Z|
|retail_orders_others|        1|     9|  ORD023|      P108|       2|135.57|     GE| online|2025-04-07T10:05:30Z|
|retail_orders_others|        1|    10|  ORD030|      P109|       5|192.76|     GE| online|2025-04-07T10:07:15Z|
|retail_orders_others|        1|    19|  ORD050|      P134|       5|152.43|     GE|  store|2025-04-07T10:12:15Z|
|retail_orders_others|        1|    22|  ORD057|      P105|       1|185.78|     GE| mobile|2025-