## Import the libraries


In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip,DeltaTable

## Create a SparkSession object

In [2]:
# Assemble the session builder one setting at a time: application name,
# cluster master URL, executor memory, then the two Delta Lake settings
# (SQL extension and catalog implementation).
builder = SparkSession.builder.appName("change-data-capture-delta-table")
builder = builder.master("spark://spark-master:7077")
builder = builder.config("spark.executor.memory", "512m")
builder = builder.config(
    "spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension",
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Let the delta helper finish configuring the builder, then start (or reuse)
# the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Keep notebook output readable: only log messages at ERROR level.
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ca79b277-33ff-4391-be8c-09d8b01c3095;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 73ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   

## Create a Delta table with CDF enabled

In [3]:
# DDL for the bronze table: twelve STRING columns stored as a Delta table at
# a fixed location, with the Change Data Feed property enabled and a
# 'medallionLevel' tag of 'bronze'.
create_bronze_table_sql = """
CREATE OR REPLACE TABLE default.movie_and_show_titles_cdf(

show_id STRING,
type STRING,
title STRING,
director STRING,
cast STRING,
country STRING,
date_added STRING,
release_year STRING,
rating STRING,
duration STRING,
listed_in STRING,
description STRING
) USING delta LOCATION '/opt/workspace/data/delta_lake/movie_and_show_titles_cdf'
TBLPROPERTIES (delta.enableChangeDataFeed = true, medallionLevel= 'bronze');

"""

# Run the statement; the returned DataFrame is kept but not displayed.
create_bronze_table_result = spark.sql(create_bronze_table_sql)

                                                                                

## Alternate way: alter an existing table to enable CDF from that point forward

`ALTER TABLE default.movie_and_show_titles_cdf SET TBLPROPERTIES (delta.enableChangeDataFeed = true)`

## Write data into bronze table

### Read the CSV file into a DataFrame and append it to the Delta table

In [4]:
# Read the CSV, treating its first line as the column names.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("../../data/netflix_titles.csv")

# Append the loaded rows to the bronze Delta table.
bronze_writer = df.write.format("delta").mode("append")
bronze_writer.saveAsTable("default.movie_and_show_titles_cdf")

                                                                                

## Create a Silver Delta Table

In [5]:
# DDL for the silver table. Besides the same twelve STRING columns as the
# bronze table, it carries bookkeeping properties: the source table name
# ('updateFromTable') and the last source version already applied
# ('updatedFromTableVersion', initialised to '-1').
create_silver_table_sql = """

CREATE OR REPLACE TABLE default.movie_and_show_titles_cleansed (

    show_id STRING,
    type STRING,
    title STRING,
    director STRING,
    cast STRING,
    country STRING,
    date_added STRING,
    release_year STRING,
    rating STRING,
    duration STRING,
    listed_in STRING,
    description STRING

    ) USING DELTA LOCATION '/opt/workspace/data/delta_lake/movie_and_show_titles_cleansed'
    TBLPROPERTIES (delta.enableChangeDataFeed = true,
                   medallionLevel = 'silver',
                   updateFromTable = 'default.movie_and_show_titles_cdf',
                   updatedFromTableVersion= '-1');

"""

# Run the statement; the returned DataFrame is kept but not displayed.
create_silver_table_result = spark.sql(create_silver_table_sql)

                    
    

## Get the last updated version from the silver table

In [6]:
# The silver table records the last bronze version it has applied in its
# 'updatedFromTableVersion' property; the next version to read is that
# value plus one.
version_property_row = spark.sql(
    "SHOW TBLPROPERTIES default.movie_and_show_titles_cleansed('updatedFromTableVersion')"
).first()
lastUpdateVersion = int(version_property_row["value"]) + 1

lastUpdateVersion

0

## Get the latest version from the bronze table

In [7]:
# Take the version number from the first row of the bronze table's history
# (DESCRIBE HISTORY is expected to list the most recent commit first).
bronze_history = spark.sql("DESCRIBE HISTORY default.movie_and_show_titles_cdf")
newest_commit = bronze_history.first()
latestVersion = newest_commit["version"]

latestVersion

2

## Create a temporary view 

In [8]:
# Recompute both version bounds here so this cell does not depend on the
# values left behind by earlier cells.
# First bronze version not yet applied to silver (stored property + 1).
lastUpdateVersion = int(spark.sql("SHOW TBLPROPERTIES default.movie_and_show_titles_cleansed('updatedFromTableVersion')").first()["value"]) +1

# Most recent bronze version (first row of the table history).
latestVersion = spark.sql("DESCRIBE HISTORY default.movie_and_show_titles_cdf").first()["version"]

# Build the change-set view with an f-string so the version range is inlined.
#
# The inner query reads the bronze change feed for the version range, drops
# rows with a NULL merge-key column and drops 'update_preimage' rows (the
# values *before* an update). It then numbers the remaining rows per merge
# key, newest commit first, and only row 1 per key is kept.
#
# ROW_NUMBER() is used instead of RANK(): RANK() gives every row tied on
# _commit_version the same rank, so a key changed more than once in the same
# commit would keep several rows, and the MERGE that consumes this view needs
# at most one source row per key. Ties within a commit are broken arbitrarily.
#
# NOTE(review): table_changes presumably errors when the start version is
# greater than the end version (i.e. nothing new to process) -- confirm
# before re-running this cell without new bronze commits.
sql_query = f"""

CREATE OR REPLACE TEMPORARY VIEW bronzeTable_latest_version as 

    SELECT * FROM (
        SELECT *, 
        ROW_NUMBER() OVER(PARTITION BY(lower(type), lower(title), lower(director), date_added) ORDER BY _commit_version DESC) as rank

        FROM table_changes('default.movie_and_show_titles_cdf',{lastUpdateVersion}, {latestVersion})
        
        WHERE type IS NOT NULL AND title IS NOT NULL AND director IS NOT NULL AND _change_type != 'update_preimage'
        )

        WHERE rank = 1;

"""

result = spark.sql(sql_query)

result.show()
    

++
||
++
++



## Merge Change Data into the Silver table

In [9]:
# Apply the change set to the silver table.
#
# Rows are matched on type/title/director (case-insensitive) plus date_added.
#   * matched + 'update_postimage' -> overwrite the silver row with the new values
#   * matched + 'delete'           -> remove the silver row
#   * not matched + 'insert'       -> add the row
#
# Only 'update_postimage' triggers an update. An 'update_preimage' row holds
# the values from *before* the change, so applying it would write stale data
# into silver. The source view already filters those rows out, so with that
# view the result is unchanged; the condition is now also free of the
# ambiguous unparenthesized AND/OR.
sql_query = """

MERGE INTO default.movie_and_show_titles_cleansed t
USING bronzeTable_latest_version s
ON lower(t.type) = lower(s.type)
AND lower(t.title) = lower(s.title)
AND lower(t.director) = lower(s.director)
AND t.date_added = s.date_added

WHEN MATCHED AND s._change_type = 'update_postimage'
THEN UPDATE SET *
WHEN MATCHED AND s._change_type='delete' THEN DELETE
WHEN NOT MATCHED AND s._change_type = 'insert' THEN INSERT *
"""

# The MERGE returns a one-row DataFrame of affected/updated/deleted/inserted counts.
result = spark.sql(sql_query)
result.show()

                                                                                

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                0|               0|               0|                0|
+-----------------+----------------+----------------+-----------------+



## Record the processed bronze version in the silver table's properties

In [10]:
# Store the bronze version that was just read in the silver table's
# 'updatedFromTableVersion' property, so the next run starts after it.
set_version_sql = """
ALTER TABLE default.movie_and_show_titles_cleansed
SET TBLPROPERTIES(updatedFromTableVersion = {version});
""".format(version=latestVersion)

set_version_result = spark.sql(set_version_sql)
set_version_result.show()

++
||
++
++



## Drop the temporary view

In [12]:
# Drop the temporary change-set view.
drop_view_sql = """ 
DROP VIEW bronzeTable_latest_version
"""

# Left as the cell's last expression so the returned DataFrame is displayed.
spark.sql(drop_view_sql)

DataFrame[]

## Simulate Data changes

In [15]:
# Two changes against the bronze table: delete rows that have no country,
# and replace missing directors with an empty string.
delete_query = "DELETE FROM default.movie_and_show_titles_cdf WHERE country IS NULL"
update_query = "UPDATE default.movie_and_show_titles_cdf SET director = '' WHERE director IS NULL"

# Run the DELETE first and print the DataFrame Spark returns for it.
delete_result = spark.sql(delete_query)
delete_result.show()

# Then run the UPDATE and print its returned DataFrame.
update_result = spark.sql(update_query)
update_result.show()

+-----------------+
|num_affected_rows|
+-----------------+
|             1660|
+-----------------+

+-----------------+
|num_affected_rows|
+-----------------+
|             4452|
+-----------------+



In [16]:
# Re-read the silver table's 'updatedFromTableVersion' property; the next
# bronze version to process is that value plus one.
version_property_row = spark.sql(
    "SHOW TBLPROPERTIES default.movie_and_show_titles_cleansed('updatedFromTableVersion')"
).first()
lastUpdateVersion = int(version_property_row["value"]) + 1

lastUpdateVersion

3

In [17]:
# Re-read the bronze table's history and take the version from its first row
# (DESCRIBE HISTORY is expected to list the most recent commit first).
bronze_history = spark.sql("DESCRIBE HISTORY default.movie_and_show_titles_cdf")
newest_commit = bronze_history.first()
latestVersion = newest_commit["version"]

latestVersion

4

In [20]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()