In [0]:
Overview:
    
This mini project demonstrates Delta Lake's Change Data Feed (CDF) feature in a medallion architecture: each layer reads only the rows that changed in the layer below it.
Medallion architecture:

Bronze: holds raw data ingested directly from the source.
Silver: is fed from the bronze table; all cleaning and transformation happens in this layer.
Gold: the final table, holding master data aggregated and tailored to business needs (here, the consensus EPS per date and stock). Like the other two layers, it is a Delta table with Change Data Feed enabled.

In [0]:
%sql
-- Bronze layer: raw rows exactly as they arrive from the source.
-- Change Data Feed is enabled on every layer so the next layer can read only
-- what changed via table_changes().
CREATE TABLE bronze_m (
  date          STRING,
  stock         STRING,
  analyst       INT,
  estimated_eps DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Silver layer. NOTE: the table is spelled `silve_m` throughout this notebook.
CREATE TABLE silve_m (
  date          STRING,
  stock         STRING,
  analyst       INT,
  estimated_eps DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Gold layer: populated with the average estimated_eps per (date, stock).
-- NOTE: the column is spelled `concensus_eps`; later cells rely on that name.
CREATE TABLE gold_m (
  date          STRING,
  stock         STRING,
  concensus_eps DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
# First bronze batch: six daily EPS estimates for stock 'a', all at 2.2.
bronze_m_rows = [
    ('3/1/2022', 'a', 1, 2.2),
    ('3/2/2022', 'a', 1, 2.2),
    ('3/3/2022', 'a', 2, 2.2),
    ('3/4/2022', 'a', 3, 2.2),
    ('3/5/2022', 'a', 1, 2.2),
    ('3/6/2022', 'a', 1, 2.2),
]
bronze_m_columns = ('date', 'stock', 'analyst', 'estimated_eps')

df = spark.createDataFrame(bronze_m_rows, bronze_m_columns)

# Expose the batch to the %sql cells under this view name.
df.createOrReplaceTempView("Bronze_m_dataset")

In [0]:
%sql
-- Append the first batch (temp view) to the bronze table.
INSERT INTO bronze_m
SELECT * FROM Bronze_m_dataset

num_affected_rows,num_inserted_rows
6,6


In [0]:
%sql
-- Inspect bronze's change feed from version 1 (the first batch insert) onward.
SELECT *
FROM table_changes('bronze_m', 1)

date,stock,analyst,estimated_eps,_change_type,_commit_version,_commit_timestamp
3/3/2022,a,2,2.2,insert,1,2024-02-18T02:07:59.000+0000
3/1/2022,a,1,2.2,insert,1,2024-02-18T02:07:59.000+0000
3/4/2022,a,3,2.2,insert,1,2024-02-18T02:07:59.000+0000
3/6/2022,a,1,2.2,insert,1,2024-02-18T02:07:59.000+0000
3/5/2022,a,1,2.2,insert,1,2024-02-18T02:07:59.000+0000
3/2/2022,a,1,2.2,insert,1,2024-02-18T02:07:59.000+0000


In [0]:
%sql
-- Initial silver load: copy every bronze row as-is.
INSERT INTO silve_m
SELECT b.date, b.stock, b.analyst, b.estimated_eps
FROM bronze_m AS b

num_affected_rows,num_inserted_rows
6,6


In [0]:
%sql
-- Initial gold load: average estimated_eps per (date, stock).
-- The alias and the explicit target column list use the table's real column
-- name (`concensus_eps`, as declared in gold_m) so the mapping is explicit
-- and consistent with the gold MERGE.
INSERT INTO gold_m (date, stock, concensus_eps)
SELECT s.date, s.stock, AVG(s.estimated_eps) AS concensus_eps
FROM silve_m AS s
GROUP BY s.date, s.stock

num_affected_rows,num_inserted_rows
6,6


In [0]:
%sql
-- `date` is stored as an M/d/yyyy STRING, so sort by the parsed date:
-- a plain string sort would put 3/10/2022 before 3/2/2022.
SELECT *
FROM gold_m
ORDER BY to_date(date, 'M/d/yyyy'), stock

date,stock,concensus_eps
3/1/2022,a,2.2
3/2/2022,a,2.2
3/3/2022,a,2.2
3/4/2022,a,2.2
3/5/2022,a,2.2
3/6/2022,a,2.2


In [0]:
# Second bronze batch: the same six (date, stock) keys with revised EPS estimates.
bronze_n_rows = [
    ('3/1/2022', 'a', 1, 2.3),
    ('3/2/2022', 'a', 1, 2.5),
    ('3/3/2022', 'a', 2, 2.7),
    ('3/4/2022', 'a', 3, 2.9),
    ('3/5/2022', 'a', 1, 3.1),
    ('3/6/2022', 'a', 1, 3.3),
]
bronze_n_columns = ('date', 'stock', 'analyst', 'estimated_eps')

df = spark.createDataFrame(bronze_n_rows, bronze_n_columns)

# Expose the batch to the %sql cells under this view name.
df.createOrReplaceTempView("Bronze_n_dataset")

In [0]:
%sql
-- Append the second batch (temp view) to the bronze table.
INSERT INTO bronze_m
SELECT * FROM Bronze_n_dataset

num_affected_rows,num_inserted_rows
6,6


In [0]:
%sql
-- NOTE(review): silve_m only reaches version 2 when the `MERGE INTO silve_m`
-- cell further down runs, so expect this query to fail on a fresh top-to-bottom
-- run until then; the output shown was captured after that MERGE.
-- Sort by the parsed M/d/yyyy date (not the raw string). `_change_type DESC`
-- lists update_preimage before update_postimage for each key.
SELECT *
FROM table_changes('silve_m', 2)
ORDER BY to_date(date, 'M/d/yyyy'), stock, _commit_version, _change_type DESC

date,stock,analyst,estimated_eps,_change_type,_commit_version,_commit_timestamp
3/1/2022,a,1,2.2,update_preimage,2,2024-02-18T02:27:47.000+0000
3/1/2022,a,1,2.3,update_postimage,2,2024-02-18T02:27:47.000+0000
3/2/2022,a,1,2.2,update_preimage,2,2024-02-18T02:27:47.000+0000
3/2/2022,a,1,2.5,update_postimage,2,2024-02-18T02:27:47.000+0000
3/3/2022,a,2,2.2,update_preimage,2,2024-02-18T02:27:47.000+0000
3/3/2022,a,2,2.7,update_postimage,2,2024-02-18T02:27:47.000+0000
3/4/2022,a,3,2.2,update_preimage,2,2024-02-18T02:27:47.000+0000
3/4/2022,a,3,2.9,update_postimage,2,2024-02-18T02:27:47.000+0000
3/5/2022,a,1,2.2,update_preimage,2,2024-02-18T02:27:47.000+0000
3/5/2022,a,1,3.1,update_postimage,2,2024-02-18T02:27:47.000+0000


In [0]:
%sql
-- `date` is stored as an M/d/yyyy STRING, so sort by the parsed date:
-- a plain string sort would put 3/10/2022 before 3/2/2022.
SELECT *
FROM gold_m
ORDER BY to_date(date, 'M/d/yyyy'), stock

date,stock,concensus_eps
3/1/2022,a,2.2
3/2/2022,a,2.2
3/3/2022,a,2.2
3/4/2022,a,2.2
3/5/2022,a,2.2
3/6/2022,a,2.2


In [0]:
%sql
-- Incrementally apply new bronze rows to silver using bronze's change feed.
-- Bronze history on a fresh run: v0 = CREATE TABLE, v1 = first batch,
-- v2 = second batch, so the new rows start at version 2. A start version
-- beyond the latest table version makes table_changes fail.
-- Only row images that describe current state ('insert', 'update_postimage')
-- are used. If a key changed more than once in the range, only its latest
-- change is kept, because MERGE fails when several source rows match one
-- target row.
MERGE INTO silve_m
USING (
  SELECT date, stock, analyst, estimated_eps
  FROM (
    SELECT
      date, stock, analyst, estimated_eps,
      ROW_NUMBER() OVER (
        PARTITION BY date, stock
        ORDER BY _commit_version DESC
      ) AS rn
    FROM table_changes('bronze_m', 2)
    WHERE _change_type IN ('insert', 'update_postimage')
  ) AS changes
  WHERE rn = 1
) AS silver_cdf
-- A silver row is identified by (date, stock), not by date alone.
ON silver_cdf.date = silve_m.date
AND silver_cdf.stock = silve_m.stock
WHEN MATCHED THEN
  -- Refresh every non-key column, matching what the INSERT branch copies.
  UPDATE SET
    silve_m.analyst = silver_cdf.analyst,
    silve_m.estimated_eps = silver_cdf.estimated_eps
WHEN NOT MATCHED THEN
  INSERT (date, stock, analyst, estimated_eps)
  VALUES (silver_cdf.date, silver_cdf.stock, silver_cdf.analyst, silver_cdf.estimated_eps)


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
6,6,0,0


In [0]:
%sql
-- Recompute gold only for the (date, stock) keys that changed in silver.
-- silve_m history on a fresh run: v0 = CREATE TABLE, v1 = initial load,
-- v2 = the silver MERGE, so version 2 onward holds the incremental changes.
-- For each affected key, the consensus is re-aggregated from the full silver
-- table and then merged into gold on the composite key (date, stock).
MERGE INTO gold_m
USING (
  SELECT s.date, s.stock, AVG(s.estimated_eps) AS concensus_eps
  FROM silve_m AS s
  INNER JOIN (
    SELECT DISTINCT date, stock
    FROM table_changes('silve_m', 2)
  ) AS changed_keys
    ON changed_keys.date = s.date
   AND changed_keys.stock = s.stock
  GROUP BY s.date, s.stock
) AS silver_cdf2
ON silver_cdf2.date = gold_m.date
AND silver_cdf2.stock = gold_m.stock
WHEN MATCHED THEN
  UPDATE SET gold_m.concensus_eps = silver_cdf2.concensus_eps
WHEN NOT MATCHED THEN
  INSERT (date, stock, concensus_eps)
  VALUES (silver_cdf2.date, silver_cdf2.stock, silver_cdf2.concensus_eps)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
6,6,0,0


In [0]:
%sql
-- `date` is stored as an M/d/yyyy STRING, so sort by the parsed date:
-- a plain string sort would put 3/10/2022 before 3/2/2022.
SELECT *
FROM gold_m
ORDER BY to_date(date, 'M/d/yyyy'), stock

date,stock,concensus_eps
3/1/2022,a,2.3
3/2/2022,a,2.5
3/3/2022,a,2.7
3/4/2022,a,2.9
3/5/2022,a,3.1
3/6/2022,a,3.3
