# Delta Lake Change Data Feed

In [0]:
%sql

-- Reset the demo objects so the notebook can be re-run from the top.
-- IF EXISTS keeps this cell from failing on a first run, when none of
-- these objects have been created yet.
DROP TABLE IF EXISTS gold_product_dt;
DROP TABLE IF EXISTS silver_product_dt;
DROP VIEW IF EXISTS silver_product_dt_last_version;

## Silver table product

In [0]:
# Seed rows for the silver product table.
# Tuple layout follows schema_product: (id, device, price, discount, size).
# discount is treated as a percentage of price when the gold table is derived.
data_product = [
    (1, "mobile", 1000, 10, 7),
    (2, "smartphone", 1400, 5, 8),
    (3, "cellphone", 1200, 7, 10),
    (4, "laptop", 2250, 2, 27),
    (5, "notebook", 3150, 14, 29),
    (6, "pc", 3000, 8, 25),
    (7, "netbook", 4500, 2, 27),
]

# DDL-style schema string shared by every DataFrame written to the silver table.
schema_product = ", ".join(
    [
        "id INTEGER",
        "device STRING",
        "price INTEGER",
        "discount INTEGER",
        "size INTEGER",
    ]
)

In [0]:
# Build the initial product DataFrame from the seed rows and the shared schema.
df_product = spark.createDataFrame(data_product, schema=schema_product)

# Persist it as the Delta table "silver_product_dt", replacing any existing contents.
(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_product_dt")
)

In [0]:
%sql

-- Inspect the initial contents of the silver table.
SELECT
  id,
  device,
  price,
  discount,
  size
FROM silver_product_dt;

id,device,price,discount,size
2,smartphone,1400,5,8
3,cellphone,1200,7,10
5,notebook,3150,14,29
7,netbook,4500,2,27
1,mobile,1000,10,7
4,laptop,2250,2,27
6,pc,3000,8,25


## Gold table product

In [0]:
from pyspark.sql.functions import col, when, round

In [0]:
# Derive the gold table from the silver DataFrame:
#   final_price   = price minus the discount percentage, rounded to 2 decimals
#   free_shipping = False when size >= 10, True otherwise
# price and discount are dropped once final_price has been computed.
(
    df_product
    .withColumn(
        "final_price",
        round(col("price") - (col("price") * col("discount") / 100), 2),
    )
    .withColumn(
        "free_shipping",
        when(col("size") >= 10, False).otherwise(True),
    )
    .drop("price", "discount")
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_product_dt")
)

In [0]:
%sql

-- Inspect the initial contents of the gold table.
SELECT
  id,
  device,
  size,
  final_price,
  free_shipping
FROM gold_product_dt;

id,device,size,final_price,free_shipping
2,smartphone,8,1330.0,True
3,cellphone,10,1116.0,False
5,notebook,29,2709.0,False
7,netbook,27,4410.0,False
1,mobile,7,900.0,True
4,laptop,27,2205.0,False
6,pc,25,2760.0,False


## Enable change data feed on the silver table

In [0]:
%sql

-- Turn on the Delta change data feed for the silver table.
-- The later table_changes queries read from version 1, which is the
-- version this statement produces in the table history.
ALTER TABLE silver_product_dt SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

## Insert, update, and delete rows in the silver table

In [0]:
# Two new products to append to the silver table.
# Same tuple layout as the seed rows: (id, device, price, discount, size).
data_product_v2 = [
    (8, "e-bike", 7000, 2, 50),
    (9, "mp5", 300, 10, 3),
]

In [0]:
# Append the new products to the silver Delta table (recorded as inserts in the change feed).
(
    spark.createDataFrame(data_product_v2, schema=schema_product)
    .write
    .format("delta")
    .mode("append")
    .saveAsTable("silver_product_dt")
)

In [0]:
%sql

-- Change the size of product 7.
UPDATE silver_product_dt SET size = 9 WHERE id = 7;

-- Change the price of product 1.
-- NOTE(review): the single result shown for this cell appears to belong to the last statement only.
UPDATE silver_product_dt SET price = 1700 WHERE id = 1;

num_affected_rows
1


In [0]:
%sql

-- Remove product 5 from the silver table.
DELETE FROM silver_product_dt WHERE id = 5;

num_affected_rows
1


In [0]:
%sql

-- Inspect the silver table after the append, the two updates and the delete.
SELECT
  id,
  device,
  price,
  discount,
  size
FROM silver_product_dt

id,device,price,discount,size
7,netbook,4500,2,9
1,mobile,1700,10,7
2,smartphone,1400,5,8
3,cellphone,1200,7,10
4,laptop,2250,2,27
8,e-bike,7000,2,50
9,mp5,300,10,3
6,pc,3000,8,25


## Explore the change data of the silver table

In [0]:
%sql

-- List the commits of the silver table (one row per version) to see which
-- operation produced each version.
DESCRIBE HISTORY silver_product_dt;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2024-09-01T17:32:20.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,DELETE,"Map(predicate -> [""(id#28774 = 5)""])",,List(1305275586066613),0901-170612-2jv4zi0w,4.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1529, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 1465, numDeletedRows -> 1, scanTimeMs -> 824, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 641)",,Databricks-Runtime/12.2.x-scala2.12
4,2024-09-01T17:32:16.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,UPDATE,"Map(predicate -> [""(id#28094 = 1)""])",,List(1305275586066613),0901-170612-2jv4zi0w,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1516, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 2501, scanTimeMs -> 1327, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1733, rewriteTimeMs -> 1174)",,Databricks-Runtime/12.2.x-scala2.12
3,2024-09-01T17:32:11.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,UPDATE,"Map(predicate -> [""(id#27417 = 7)""])",,List(1305275586066613),0901-170612-2jv4zi0w,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1523, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 2384, scanTimeMs -> 1291, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1740, rewriteTimeMs -> 1093)",,Databricks-Runtime/12.2.x-scala2.12
2,2024-09-01T17:32:06.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1305275586066613),0901-170612-2jv4zi0w,1.0,WriteSerializable,True,"Map(numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3010)",,Databricks-Runtime/12.2.x-scala2.12
1,2024-09-01T17:32:01.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(1305275586066613),0901-170612-2jv4zi0w,0.0,WriteSerializable,True,Map(),,Databricks-Runtime/12.2.x-scala2.12
0,2024-09-01T17:31:48.000+0000,8904480319608266,andresmunozpampillonaws@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1305275586066613),0901-170612-2jv4zi0w,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 7, numOutputBytes -> 10652)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql

-- Read the row-level changes of the silver table starting at version 1,
-- ordered by commit time. Each row carries _change_type, _commit_version
-- and _commit_timestamp in addition to the table columns.
SELECT * FROM table_changes('silver_product_dt', 1) ORDER BY _commit_timestamp

id,device,price,discount,size,_change_type,_commit_version,_commit_timestamp
8,e-bike,7000,2,50,insert,2,2024-09-01T17:32:06.000+0000
9,mp5,300,10,3,insert,2,2024-09-01T17:32:06.000+0000
7,netbook,4500,2,27,update_preimage,3,2024-09-01T17:32:11.000+0000
7,netbook,4500,2,9,update_postimage,3,2024-09-01T17:32:11.000+0000
1,mobile,1000,10,7,update_preimage,4,2024-09-01T17:32:16.000+0000
1,mobile,1700,10,7,update_postimage,4,2024-09-01T17:32:16.000+0000
5,notebook,3150,14,29,delete,5,2024-09-01T17:32:20.000+0000


In [0]:
# Read the change feed of the silver table through the DataFrame API,
# starting at table version 2 (the first version that contains row changes).
# "readChangeFeed" is the documented reader option for the Delta change data feed.
(
    spark.read
    .format("delta")
    .option("readChangeFeed", True)
    .option("startingVersion", 2)
    .table("silver_product_dt")
    .show(truncate=False)
)

+---+--------+-----+--------+----+----------------+---------------+-------------------+
|id |device  |price|discount|size|_change_type    |_commit_version|_commit_timestamp  |
+---+--------+-----+--------+----+----------------+---------------+-------------------+
|7  |netbook |4500 |2       |27  |update_preimage |3              |2024-09-01 17:32:11|
|7  |netbook |4500 |2       |9   |update_postimage|3              |2024-09-01 17:32:11|
|1  |mobile  |1000 |10      |7   |update_preimage |4              |2024-09-01 17:32:16|
|1  |mobile  |1700 |10      |7   |update_postimage|4              |2024-09-01 17:32:16|
|5  |notebook|3150 |14      |29  |delete          |5              |2024-09-01 17:32:20|
|8  |e-bike  |7000 |2       |50  |insert          |2              |2024-09-01 17:32:06|
|9  |mp5     |300  |10      |3   |insert          |2              |2024-09-01 17:32:06|
+---+--------+-----+--------+----+----------------+---------------+-------------------+



## Propagate changes from silver to gold table

In [0]:
%sql

-- Latest change per product (id, device) read from the silver change feed, starting at version 1.
-- update_preimage rows are excluded because they hold the values from before an update.
-- The change-type literal is single-quoted so it is always parsed as a string,
-- including when double-quoted identifiers are enabled.
-- NOTE(review): RANK() returns more than one row for a key if it has several
-- changes in the same commit version; confirm that cannot happen for this table.
CREATE OR REPLACE TEMPORARY VIEW silver_product_dt_last_version AS
  SELECT *
  FROM (
      SELECT * , RANK() OVER (PARTITION BY id, device ORDER BY _commit_version DESC) AS rank
      FROM table_changes('silver_product_dt', 1)
      WHERE _change_type != 'update_preimage'
    )
    WHERE rank = 1

In [0]:
%sql

-- Show the rows that will be merged into the gold table: one latest change per product.
SELECT * FROM silver_product_dt_last_version;

id,device,price,discount,size,_change_type,_commit_version,_commit_timestamp,rank
1,mobile,1700,10,7,update_postimage,4,2024-09-01T17:32:16.000+0000,1
5,notebook,3150,14,29,delete,5,2024-09-01T17:32:20.000+0000,1
7,netbook,4500,2,9,update_postimage,3,2024-09-01T17:32:11.000+0000,1
8,e-bike,7000,2,50,insert,2,2024-09-01T17:32:06.000+0000,1
9,mp5,300,10,3,insert,2,2024-09-01T17:32:06.000+0000,1


In [0]:
%sql

-- Apply the latest silver change of each product to the gold table.
--   * latest change is a delete, row exists in gold    -> remove the gold row
--   * latest change is an insert or an update_postimage,
--     row exists in gold                               -> recompute the gold columns
--   * row missing from gold                            -> insert it, unless its latest
--                                                         change is a delete
-- final_price applies the discount percentage to price (rounded to 2 decimals);
-- free_shipping is False when size >= 10 and True otherwise.
MERGE INTO gold_product_dt AS g USING silver_product_dt_last_version AS s
  ON g.id = s.id AND g.device = s.device
  WHEN MATCHED AND s._change_type = 'delete' THEN
    DELETE
  WHEN MATCHED AND s._change_type IN ('insert', 'update_postimage') THEN
    UPDATE SET
      size = s.size,
      final_price = ROUND(s.price - (s.price * s.discount / 100), 2),
      free_shipping = CASE WHEN s.size >= 10 THEN False ELSE True END
  WHEN NOT MATCHED AND s._change_type != 'delete' THEN
    INSERT (id, device, size, final_price, free_shipping) VALUES (s.id, s.device, s.size, ROUND(s.price - (s.price * s.discount / 100), 2),CASE WHEN s.size >= 10 THEN False ELSE True END)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
5,2,1,2


In [0]:
%sql

SELECT * FROM gold_product_dt;

id,device,size,final_price,free_shipping
1,mobile,7,1530.0,True
7,netbook,9,4410.0,True
8,e-bike,50,6860.0,False
9,mp5,3,270.0,True
2,smartphone,8,1330.0,True
3,cellphone,10,1116.0,False
4,laptop,27,2205.0,False
6,pc,25,2760.0,False
