In [3]:
# Session-level default: Delta tables created after this point get
# Change Data Feed switched on without per-table properties.
spark.conf.set(
    "spark.microsoft.delta.properties.defaults.enableChangeDataFeed",
    "true",
)

StatementMeta(, , , Waiting, )

In [4]:
#import data types
from pyspark.sql.types import *
from datetime import datetime

# Schema shared by every write to the rawproductsales table in this notebook.
table_schema = StructType([
                    StructField('OrderID', IntegerType(), True),
                    StructField('ProductName', StringType(), True),
                    StructField('ItemPrice', IntegerType(), True),
                    StructField('OrderTotal', IntegerType(), True),
                    StructField('OrderDate', DateType(), True)])

# Seed rows, in the same column order as table_schema.
staged_rows = [(1,'Soft Toy',10, 35,datetime(2023, 11, 20)),
            (2,'Mobile Phone',450, 10,datetime(2023, 11, 20)),
            (3,"Notepad",5,125,datetime(2023, 11, 20))]

# Build the DataFrame on its own statement so that `staged_df` really is a
# DataFrame. Chaining `.write...save()` onto the assignment would bind the
# name to the writer's return value (None) instead.
staged_df = spark.createDataFrame(staged_rows, table_schema)

# Overwrite the Delta table at the lakehouse path with the seed rows.
staged_df.write.mode("overwrite").format("delta").save("Tables/rawproductsales")


StatementMeta(, , , Waiting, )

In [None]:
# Load the rawproductsales Delta table by name and render it.
df = (
    spark.read
    .format("delta")
    .table("rawproductsales")
)

display(df)

In [6]:
# Append a single extra order (OrderID 4) to the table, reusing table_schema.
new_order = [(4, 'TV', 2, 750, datetime(2023, 11, 21))]
(
    spark.createDataFrame(new_order, table_schema)
    .write
    .mode("append")
    .format("delta")
    .saveAsTable("rawproductsales")
)

StatementMeta(, , , Waiting, )

In [9]:
# Change the price and total of the order whose OrderID is 1.
from pyspark.sql.functions import *
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'Tables/rawproductsales')

# The row filter and the new values are both given as SQL expression strings.
deltaTable.update(
    "OrderID = 1",
    {"ItemPrice": "9", "OrderTotal": "38"},
)

StatementMeta(, , , Waiting, )

In [11]:
# Remove the order whose OrderID is 2.
from pyspark.sql.functions import *
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'Tables/rawproductsales')

# The row filter is a SQL predicate string.
deltaTable.delete(condition="OrderID = 2")

StatementMeta(, , , Waiting, )

In [None]:
# Re-read the table to show its current contents.
df = (
    spark.read
    .format("delta")
    .table("rawproductsales")
)

display(df)

In [None]:
# Read the table's change data feed from version 0 onward.
# NOTE(review): the Delta Lake documentation spells this reader option
# `readChangeFeed`; confirm `readChangeData` is accepted by this runtime.
changedatefeed_df = (
    spark.read
    .format("delta")
    .option("readChangeData", True)
    .option("startingVersion", 0)
    .table('rawproductsales')
)

# Show the changes ordered by the commit that produced them.
display(changedatefeed_df.sort("_commit_version"))

In [None]:
%%sql
-- Changes from table version 1 onward, keeping only rows tagged 'update_postimage'
SELECT *
  FROM table_changes('rawproductsales', 1)
 WHERE _change_type = 'update_postimage'

In [None]:
%%sql
-- Changes from table version 1 onward, keeping only rows tagged 'delete'
SELECT *
  FROM table_changes('rawproductsales', 1)
 WHERE _change_type = 'delete'


In [None]:
%%sql
-- Changes from table version 1 onward, keeping only rows tagged 'insert'
SELECT *
  FROM table_changes('rawproductsales', 1)
 WHERE _change_type = 'insert'

In [None]:
# Order 5 carries a column the table does not have yet (DiscountPercent).
new_order = [(5, 'Laptop', 699, 2, datetime(2023, 11, 22))]

# Start from the shared schema, then attach a constant DiscountPercent of 10.
df = spark.createDataFrame(new_order, table_schema).withColumn("DiscountPercent", lit(10))

# Append by path; mergeSchema lets the write add the new column to the table.
(
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("Tables/rawproductsales")
)


In [None]:
# Re-read the change data feed from version 0 so the latest append shows up.
# NOTE(review): the Delta Lake documentation spells this reader option
# `readChangeFeed`; confirm `readChangeData` is accepted by this runtime.
changedatefeed_df = (
    spark.read
    .format("delta")
    .option("readChangeData", True)
    .option("startingVersion", 0)
    .table('rawproductsales')
)
display(changedatefeed_df.sort("_commit_version"))

In [37]:
# Switch off Delta's retention-duration safety check for this session so
# that the vacuum(0) call later in the notebook is accepted.
spark.conf.set(
    "spark.databricks.delta.retentionDurationCheck.enabled",
    "false",
)


StatementMeta(, , , Waiting, )

In [46]:
# Vacuum the table with a retention of 0 hours.
# NOTE(review): the original comment described this as removing "all commits
# except latest"; vacuum targets data files that the current table version
# no longer references — confirm the intended wording.
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'Tables/rawproductsales')

deltaTable.vacuum(retentionHours=0)

StatementMeta(, , , Waiting, )

DataFrame[]

In [None]:
# Read the change feed starting at table version 4, selected via startingVersion.
# NOTE(review): 4 is hard-coded and assumes it is the latest version whose
# change data is still readable after the vacuum — confirm against the
# table history.
changes_df = (
    spark.read
    .format("delta")
    .option("readChangeData", True)
    .option("startingVersion", 4)
    .table('rawproductsales')
)

display(changes_df.sort("_commit_version"))

In [None]:
# Same read as by version, but the starting point is selected with
# startingTimestamp instead of startingVersion.
# NOTE(review): the timestamp is hard-coded from an earlier run — replace it
# with a commit timestamp taken from this table's own history.
changes_df = (
    spark.read
    .format("delta")
    .option("readChangeData", True)
    .option("startingTimestamp", '2023-11-27 19:00:37.195')
    .table('rawproductsales')
)

display(changes_df.sort("_commit_version"))