In [1]:
from pyspark.sql import SparkSession
# Build (or attach to) the Spark session used by every cell below.
# getOrCreate() returns the already-running session when one exists, in which
# case only runtime SQL configurations take effect.
# NOTE(review): the master host looks like a container id -- confirm it is
# still valid for the environment this notebook runs in.
spark = (
    SparkSession.builder
    .master("spark://46e3418f91ed:7077")
    .appName("Iceberg-Spark-Session")
    .getOrCreate()
)

# Echo the session object as the cell output.
spark

24/11/04 17:01:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Read the customers CSV (first row is the header, column types are inferred)
# and keep at most 100 rows for the demo tables written below.
df = (
    spark.read
    .csv("/home/iceberg/warehouse/Customers.csv", header=True, inferSchema=True)
    .limit(100)
)

In [4]:
# Reset step: remove the target table left over from a previous run.
# IF EXISTS keeps the cell from raising when the table has not been created
# yet (e.g. on a fresh warehouse or after a full Restart & Run All).
spark.sql("DROP TABLE IF EXISTS iceberg.target_customers")

DataFrame[]

In [5]:
# Reset step: remove the `iceberg` database itself.
# IF EXISTS keeps the cell from raising when the database is already gone.
# NOTE(review): without CASCADE the drop is still rejected while the database
# contains tables (e.g. source_customers) -- confirm that is the intent.
spark.sql("DROP DATABASE IF EXISTS iceberg")

DataFrame[]

In [6]:
# List the databases visible to this session.
# The original cell discarded the resulting DataFrame (and the trailing `;`
# suppressed its repr), so nothing was displayed; .show() prints the rows.
spark.sql("show databases").show()

In [3]:
# Persist the sampled rows as the source table, replacing any existing copy.
df.write.saveAsTable(
    name="iceberg.source_customers",
    mode="overwrite",
)

                                                                                

In [4]:
# Persist the same rows as the destination (target) table.
# mode="overwrite" matches the source-table write and makes the cell
# re-runnable: with the default mode the write raises as soon as the table
# already exists, which forced a manual DROP TABLE before every re-run.
df.write.saveAsTable("iceberg.target_customers", mode="overwrite")

In [7]:
# Reference the whole source table as a DataFrame; the cell output is the
# DataFrame repr (its schema), not the rows.
spark.table("iceberg.source_customers")

DataFrame[CustomerID: int, Gender: string, Age: int, Annual Income ($): int, Spending Score (1-100): int, Profession: string, Work Experience: int, Family Size: int]

In [None]:
%%sql
-- Show the current contents of the target table.
SELECT *
FROM iceberg.target_customers

In [9]:
%%sql
-- Insert: add a brand-new customer (CustomerID 0) to the source table.
INSERT INTO iceberg.source_customers
VALUES (0, "Male", 23, 13000, 84, "Lawyer", 4, 7);

In [10]:
%%sql
-- Update: change the profession of customer 1 in the source table.
UPDATE iceberg.source_customers
SET Profession = "None"
WHERE CustomerID = 1;

In [11]:
%%sql
-- Delete: remove customer 2 from the source table.
DELETE FROM iceberg.source_customers
WHERE CustomerID = 2;

In [12]:
%%sql
-- Change-data-capture (CDC) preview: full-outer-join source and target on
-- CustomerID and keep only the rows that differ, tagging each with CDC_flag:
--   'D' = row exists only in the target (deleted at the source)
--   'I' = row exists only in the source (inserted at the source)
--   'U' = row exists on both sides but at least one attribute differs
-- Attribute columns are always taken from the source, so they are NULL for 'D' rows.
-- The attribute comparison uses <=> (null-safe equality). With plain '=' a
-- comparison against NULL yields NULL, NOT(NULL) is NULL, and the WHERE clause
-- silently drops the row -- so a change to or from NULL would go undetected.
WITH cdc AS (
    SELECT
        COALESCE(s.CustomerID, t.CustomerID) AS CustomerID,
        s.Gender                   AS Gender,
        s.Age                      AS Age,
        s.`Annual Income ($)`      AS `Annual Income ($)`,
        s.`Spending Score (1-100)` AS `Spending Score (1-100)`,
        s.Profession               AS Profession,
        s.`Work Experience`        AS `Work Experience`,
        s.`Family Size`            AS `Family Size`,
        CASE
            WHEN s.CustomerID IS NULL THEN 'D'
            WHEN t.CustomerID IS NULL THEN 'I'
            ELSE 'U'
        END AS CDC_flag
    FROM iceberg.source_customers AS s
    FULL OUTER JOIN iceberg.target_customers AS t
        ON s.CustomerID = t.CustomerID
    WHERE s.CustomerID IS NULL
       OR t.CustomerID IS NULL
       OR NOT (
              s.Gender                   <=> t.Gender
          AND s.Age                      <=> t.Age
          AND s.`Annual Income ($)`      <=> t.`Annual Income ($)`
          AND s.`Spending Score (1-100)` <=> t.`Spending Score (1-100)`
          AND s.Profession               <=> t.Profession
          AND s.`Work Experience`        <=> t.`Work Experience`
          AND s.`Family Size`            <=> t.`Family Size`
       )
)
SELECT * FROM cdc

CustomerID,Gender,Age,Annual Income ($),Spending Score (1-100),Profession,Work Experience,Family Size,CDC_flag
0,Male,23.0,13000.0,84.0,Lawyer,4.0,7.0,I
1,Male,25.0,15000.0,39.0,,,,U
2,,,,,,,,D


In [13]:
%%sql
-- Apply the source-side changes to the target table.
-- The cdc CTE full-outer-joins source and target on CustomerID and keeps only
-- the rows that differ, tagging each with CDC_flag:
--   'D' = row exists only in the target (deleted at the source)
--   'I' = row exists only in the source (inserted at the source)
--   'U' = row exists on both sides but at least one attribute differs
-- The attribute comparison uses <=> (null-safe equality). With plain '=' a
-- comparison against NULL yields NULL, NOT(NULL) is NULL, and the WHERE clause
-- silently drops the row -- so a change to or from NULL would never be merged.
-- The MERGE then deletes 'D' rows, updates 'U' rows, and inserts the rows that
-- have no match in the target (the 'I' rows).
WITH cdc AS (
    SELECT
        COALESCE(s.CustomerID, t.CustomerID) AS CustomerID,
        s.Gender                   AS Gender,
        s.Age                      AS Age,
        s.`Annual Income ($)`      AS `Annual Income ($)`,
        s.`Spending Score (1-100)` AS `Spending Score (1-100)`,
        s.Profession               AS Profession,
        s.`Work Experience`        AS `Work Experience`,
        s.`Family Size`            AS `Family Size`,
        CASE
            WHEN s.CustomerID IS NULL THEN 'D'
            WHEN t.CustomerID IS NULL THEN 'I'
            ELSE 'U'
        END AS CDC_flag
    FROM iceberg.source_customers AS s
    FULL OUTER JOIN iceberg.target_customers AS t
        ON s.CustomerID = t.CustomerID
    WHERE s.CustomerID IS NULL
       OR t.CustomerID IS NULL
       OR NOT (
              s.Gender                   <=> t.Gender
          AND s.Age                      <=> t.Age
          AND s.`Annual Income ($)`      <=> t.`Annual Income ($)`
          AND s.`Spending Score (1-100)` <=> t.`Spending Score (1-100)`
          AND s.Profession               <=> t.Profession
          AND s.`Work Experience`        <=> t.`Work Experience`
          AND s.`Family Size`            <=> t.`Family Size`
       )
)

MERGE INTO iceberg.target_customers
USING cdc
ON iceberg.target_customers.CustomerID = cdc.CustomerID
WHEN MATCHED AND cdc.CDC_flag = 'D' THEN DELETE
WHEN MATCHED AND cdc.CDC_flag = 'U' THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [14]:
%%sql
-- Row count of the source table.
SELECT COUNT(*)
FROM iceberg.source_customers

count(1)
100


In [15]:
%%sql
-- The changes made at the source have now been merged into the target table;
-- count its rows to compare against the source.
SELECT COUNT(*)
FROM iceberg.target_customers

count(1)
100


In [16]:
%%sql
-- The target table's history now contains a new version (snapshot).
SELECT *
FROM iceberg.target_customers.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2024-07-02 18:41:16.500000,1629686733036594320,,True
2024-07-02 18:42:48.565000,4068554996449296170,1.6296867330365944e+18,True
