## Delta Streaming End-to-End Project for Data Engineers

In [0]:
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import*
# Create (or reuse) the Spark session that the remaining cells refer to as `spark`.
spark = (
    SparkSession.builder
    .appName('vijaquick')
    .getOrCreate()
)

In [0]:
%sql
-- Source table for the demo: a Delta table stored at a fixed DBFS location.
-- IF NOT EXISTS keeps this cell re-runnable after the table has been created once;
-- a plain CREATE TABLE would fail on the second run.
CREATE TABLE IF NOT EXISTS product (
  product_id   INT,
  product_name VARCHAR(100),
  price        INT
) USING DELTA
LOCATION "dbfs:/FileStore/product_list"

In [0]:
%sql
-- Seed the product table with two sample rows (product_id, product_name, price).
INSERT INTO product
VALUES
  (103, "labtop", 80000),
  (102, "mobile", 50000)

num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
-- Rename the product whose id is 105.
-- NOTE(review): the seed INSERT shown in this notebook only adds ids 102 and 103, so on a
-- fresh run this statement presumably matches no rows -- confirm where id 105 comes from.
UPDATE product
SET product_name = "cookies"
WHERE product_id = 105

num_affected_rows
1


In [0]:
%sql
-- Enable Change Data Feed on the source table (addressed by its storage path),
-- which the streaming cells rely on via the readChangeFeed option.
ALTER TABLE delta.`dbfs:/FileStore/product_list`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [0]:
# Streaming reader over the change feed of the source Delta table.
df_read_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("dbfs:/FileStore/product_list")
)

In [0]:
from delta.tables import *
def merge_with_data(microBatchDF, batchId):
    """Upsert one micro-batch of change-feed rows into the output Delta table.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        microBatchDF: DataFrame for the current micro-batch; it must expose the
            change-feed columns ``_change_type``, ``_commit_version`` and
            ``_commit_timestamp`` together with ``product_id``.
        batchId: Micro-batch identifier supplied by ``foreachBatch``; not used here.

    Returns:
        None. The merge is executed for its side effect on
        ``dbfs:/FileStore/product_list_output2``.
    """
    # Imported locally so this cell does not depend on a later cell's imports.
    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window

    # Keep only the rows that describe the new state of a product, and drop rows
    # without a key: a NULL product_id can never satisfy the merge condition.
    df_filter_stream = (
        microBatchDF
        .filter(col("_change_type").isin("insert", "update_postimage"))
        .filter(col("product_id").isNotNull())
    )

    # A product that is inserted and then updated inside one micro-batch yields several
    # source rows for the same key, and MERGE fails when more than one source row matches
    # a target row. Keep only the most recent change per product_id.
    latest_first = Window.partitionBy("product_id").orderBy(
        col("_commit_version").desc(),
        col("_commit_timestamp").desc(),
    )
    df_dedup = (
        df_filter_stream
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    delta_table = DeltaTable.forPath(spark, "dbfs:/FileStore/product_list_output2")
    delta_table.alias("tar").merge(df_dedup.alias("src"), "tar.product_id == src.product_id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()

In [0]:
# Append the raw change-feed rows to the output Delta table.
# The foreachBatch merge queries in this notebook checkpoint to
# dbfs:/FileStore/checkpoint1; a checkpoint directory must belong to exactly one
# streaming query, so this append query uses its own directory. The sink format is
# spelled out instead of relying on the session's default data source.
(
    df_read_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/FileStore/checkpoint1_append")
    .option("path", "dbfs:/FileStore/product_list_output2")
    .trigger(processingTime="2 seconds")
    .start()
)

Out[279]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f5874438a60>

In [0]:

# Start the upsert stream: every micro-batch (triggered every 2 seconds) is handed
# to merge_with_data.
(
    df_read_stream.writeStream
    .outputMode("update")
    .foreachBatch(merge_with_data)
    .option("checkpointLocation", "dbfs:/FileStore/checkpoint1")
    .trigger(processingTime="2 seconds")
    .start()
)

   

Out[283]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f5876d22e20>

In [0]:
# Batch-read the output Delta table and render its current contents.
(
    spark.read
    .format("delta")
    .load('dbfs:/FileStore/product_list_output2')
    .display()
)

product_id,product_name,price,_change_type,_commit_version,_commit_timestamp
103,labtop,80000,insert,2,2025-08-03T09:18:10.000+0000
102,mobile,50000,insert,2,2025-08-03T09:18:10.000+0000
105,cookies,7000,update_postimage,4,2025-08-03T09:19:46.000+0000


In [0]:
%sql
-- Show the commit history (version, timestamp, operation, ...) of the source table.
DESCRIBE HISTORY product;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2025-08-03T05:51:57.000+0000,6284438897623789,vigneshsiva3699@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1271205243871124),0803-042851-7q1ebrfr,3.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1119)",,Databricks-Runtime/12.2.x-scala2.12
3,2025-08-03T05:45:58.000+0000,6284438897623789,vigneshsiva3699@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1271205243871124),0803-042851-7q1ebrfr,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1119)",,Databricks-Runtime/12.2.x-scala2.12
2,2025-08-03T05:45:20.000+0000,6284438897623789,vigneshsiva3699@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1271205243871124),0803-042851-7q1ebrfr,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1137)",,Databricks-Runtime/12.2.x-scala2.12
1,2025-08-03T05:31:56.000+0000,6284438897623789,vigneshsiva3699@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1271205243871124),0803-042851-7q1ebrfr,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1143)",,Databricks-Runtime/12.2.x-scala2.12
0,2025-08-03T05:30:18.000+0000,6284438897623789,vigneshsiva3699@gmail.com,CREATE TABLE,"Map(isManaged -> false, description -> null, partitionBy -> [], properties -> {})",,List(1271205243871124),0803-042851-7q1ebrfr,,WriteSerializable,True,Map(),,Databricks-Runtime/12.2.x-scala2.12


In [0]:
# Reset helper: recursively delete the source table directory and the log-details directory.
# NOTE(review): this removes dbfs:/FileStore/product_list, which the streaming cell further
# down loads again -- presumably meant to be run by hand between experiments; confirm.
dbutils.fs.rm("dbfs:/FileStore/product_list", recurse=True)
# dbutils.fs.rm("dbfs:/FileStore/product_list_output2", recurse = True)
dbutils.fs.rm("dbfs:/FileStore/product_logdetails", recurse=True)

Out[273]: False

In [0]:
from pyspark.sql.functions import col
from delta.tables import DeltaTable
from pyspark.sql.window import*

# Streaming source: the change data feed of the product Delta table.
df_read_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("dbfs:/FileStore/product_list")
)

# Define merge logic inside foreachBatch
def merge_with_data(microBatchDF, batchId):
    """Upsert the latest change per product from one micro-batch into the output table.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        microBatchDF: DataFrame for the current micro-batch; it must expose the
            change-feed columns ``_change_type``, ``_commit_version`` and
            ``_commit_timestamp`` together with ``product_id``.
        batchId: Micro-batch identifier supplied by ``foreachBatch``; not used here.

    Returns:
        None. The merge is executed for its side effect on
        ``dbfs:/FileStore/product_list_output2``.
    """
    # Only keep rows that carry the new state of a product (insert or update_postimage)
    # and that have a key to merge on.
    df_filter_stream = (
        microBatchDF
        .filter(col("_change_type").isin("insert", "update_postimage"))
        .filter(col("product_id").isNotNull())
    )

    # Deduplicate by product_id, keeping the most recent change. The commit version is
    # the primary sort key because it increases with every commit, whereas two commits
    # can carry the same timestamp; the timestamp is kept as a secondary key.
    window_spec = Window.partitionBy("product_id").orderBy(
        col("_commit_version").desc(),
        col("_commit_timestamp").desc(),
    )

    df_dedup = (
        df_filter_stream
        .withColumn("row_num", row_number().over(window_spec))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    delta_table = DeltaTable.forPath(spark, "dbfs:/FileStore/product_list_output2")

    # Update products that already exist in the target, insert the ones that do not.
    (
        delta_table.alias("tar")
        .merge(df_dedup.alias("src"), "tar.product_id = src.product_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
# Start the streaming upsert: foreachBatch passes each micro-batch to merge_with_data,
# which writes it into the output Delta table.
(
    df_read_stream.writeStream
    .outputMode("update")
    .foreachBatch(merge_with_data)
    .option("checkpointLocation", "dbfs:/FileStore/checkpoint1")
    .trigger(processingTime="2 seconds")
    .start()
)


Out[233]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f5876d1ec40>