# Setup

In [0]:
# Start from a clean slate: remove data files left over from an earlier run
dbutils.fs.rm('/tmp/ch-6/', recurse=True)

# Rebuild the chapter database from scratch and make it the current database
for statement in (
    "DROP DATABASE IF EXISTS ch_6 CASCADE",
    "CREATE DATABASE ch_6 ",
    "USE ch_6",
):
    spark.sql(statement)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Column layout shared by the sample sales DataFrames: the sale date split
# into year/month/day, the customer name, the sale id and the sale amount.
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("day", IntegerType(), True),
    StructField("customer", StringType(), True),
    StructField("sale_id", StringType(), True),
    StructField("total_cost", FloatType(), True),
])

delta_path = '/tmp/ch-6/delta_data'

# First batch (2021 sales), written in overwrite mode
sales_2021 = [
    (2021, 3, 2, "Jim Smith", "M12SD", 30.37),
    (2021, 6, 2, "Jill King", "K127D", 50.5),
    (2021, 1, 2, "Jack Morris", "12PSD", 100.0),
]
sales_2021_df = spark.createDataFrame(data=sales_2021, schema=schema)
sales_2021_df.write.format("delta").mode('overwrite').save(delta_path)

# Second batch (2022 sales for the same customers), appended to the same path
sales_2022 = [
    (2022, 1, 1, "Jim Smith", "M12SD", 20.0),
    (2022, 2, 2, "Jill King", "K127D", 30.5),
    (2022, 3, 2, "Jack Morris", "12PSD", 40.0),
]
sales_2022_df = spark.createDataFrame(data=sales_2022, schema=schema)
sales_2022_df.write.format("delta").mode('append').save(delta_path)

# Register the files at delta_path as the table `Customer`
spark.sql(f"CREATE TABLE Customer USING DELTA LOCATION '{delta_path}'")

In [0]:
%sql
-- Reprice every sale recorded under sale id 12PSD
UPDATE Customer
SET total_cost = 500.10
WHERE sale_id = '12PSD';

-- Remove all of Jill King's sales
DELETE FROM Customer
WHERE customer = 'Jill King';

num_affected_rows
2


# Time Travel
* VERSION AS OF
* TIMESTAMP AS OF
* Both the data files and the log files are needed for time travel
* Older data files are deleted by 'vacuum'
  * default retention is 7 days
  * Config param: delta.deletedFileRetentionDuration
* Log files are deleted after checkpoints
  * delta.logRetentionDuration controls how long history is kept (default is 30 days)

### Lineage

In [0]:
%sql
-- List the table's commits (version, timestamp, operation, metrics, ...)
DESCRIBE HISTORY Customer

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2022-07-04T01:55:12.000+0000,6490153397734611,anindita.mahapatra@databricks.com,DELETE,"Map(predicate -> [""(spark_catalog.ch_6.Customer.customer = 'Jill King')""])",,List(2740698557206543),0521-192320-w5waoh4f,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 2, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1006, numDeletedRows -> 2, scanTimeMs -> 543, numAddedFiles -> 0, rewriteTimeMs -> 463)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-07-04T01:55:10.000+0000,6490153397734611,anindita.mahapatra@databricks.com,UPDATE,Map(predicate -> (sale_id#90343 = 12PSD)),,List(2740698557206543),0521-192320-w5waoh4f,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 2, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 970, scanTimeMs -> 115, numAddedFiles -> 2, numUpdatedRows -> 2, rewriteTimeMs -> 855)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-07-04T01:55:07.000+0000,6490153397734611,anindita.mahapatra@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2740698557206543),0521-192320-w5waoh4f,0.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 5411)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-07-04T01:55:05.000+0000,6490153397734611,anindita.mahapatra@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2740698557206543),0521-192320-w5waoh4f,,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 5408)",,Databricks-Runtime/10.4.x-scala2.12


### Version & Timestamp usage

In [0]:
%sql
-- Read the table as it stood at version 0 (the initial overwrite write)
SELECT *
FROM Customer VERSION AS OF 0

year,month,day,customer,sale_id,total_cost
2021,1,2,Jack Morris,12PSD,100.0
2021,3,2,Jim Smith,M12SD,30.37
2021,6,2,Jill King,K127D,50.5


In [0]:
%sql
-- Read the table as it stood at version 1 (after the append)
SELECT *
FROM Customer VERSION AS OF 1

year,month,day,customer,sale_id,total_cost
2021,3,2,Jim Smith,M12SD,30.37
2021,6,2,Jill King,K127D,50.5
2022,2,2,Jill King,K127D,30.5
2021,1,2,Jack Morris,12PSD,100.0
2022,1,1,Jim Smith,M12SD,20.0
2022,3,2,Jack Morris,12PSD,40.0


In [0]:
# Time travel through the DataFrame reader: `versionAsOf` pins the snapshot.
# Reuse `delta_path` (set when the table files were written) instead of
# repeating the path literal, so the location is defined in one place only.
spark.read.format('delta').option('versionAsOf', '2').load(delta_path).show()

In [0]:
# Same snapshot selected with the `@v<version>` suffix appended to the path.
# Built from `delta_path` rather than a second copy of the path literal.
spark.read.format('delta').load(delta_path + '@v2').show()

In [0]:
# Latest commit timestamp in the table history.
# NOTE(review): `ts` is not referenced by any later cell visible here —
# confirm whether a TIMESTAMP AS OF example was meant to follow.
latest_commit_row = spark.sql(
    "SELECT max(timestamp) FROM (DESCRIBE HISTORY Customer)"
).first()
ts = latest_commit_row[0]

### Checking between versions

In [0]:
%sql
-- Distinct customers in the current table minus distinct customers in the
-- version 2 snapshot
SELECT
  COUNT(DISTINCT customer)
  - (SELECT COUNT(DISTINCT customer) FROM Customer VERSION AS OF 2)
FROM Customer

(count(DISTINCT customer) - scalarsubquery())
-1


### Merge

In [0]:
delta_new_path = '/tmp/ch-6/delta_new_data'

# Incoming rows used by the merge examples: a changed amount for a
# customer/date already loaded into Customer, plus one new customer.
new_customer_rows = [
    (2022, 1, 1, "Jim Smith", "M12SD", 25.0),
    (2022, 2, 2, "Jane Goodwill", "P127D", 37.5),
]
new_customer_df = spark.createDataFrame(data=new_customer_rows, schema=schema)
new_customer_df.write.format("delta").mode('overwrite').save(delta_new_path)

# Register the files at delta_new_path as the table `New_Customer`
spark.sql(f"CREATE TABLE New_Customer USING DELTA LOCATION '{delta_new_path}'")

In [0]:
%sql
-- Upsert New_Customer (pinned to its version 0) into Customer.
-- Rows pair up on customer name and year; paired rows are overwritten,
-- unpaired source rows are inserted.
MERGE INTO Customer target
USING New_Customer VERSION AS OF 0 source
  ON source.customer = target.customer
 AND source.year = target.year
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


# Delta Clone
* Deep Clone to Secondary From Primary

## Deep Clone from Primary

In [0]:
%sql
-- Create (or replace) Secondary_Customer as a deep clone of Customer
CREATE OR REPLACE TABLE Secondary_Customer
DEEP CLONE Customer;

source_table_size,source_num_of_files,num_removed_files,num_copied_files,removed_files_size,copied_files_size
9049,5,0,5,0,9049


## Print History of Both Tables
* Secondary table will not have all the history of the original table

In [0]:
# Show the commit history of the source table, then of its clone
for table_name in ("Customer", "Secondary_Customer"):
    display(spark.sql(f"DESCRIBE HISTORY {table_name}"))

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2022-07-04T01:55:25.000+0000,6490153397734611,anindita.mahapatra@databricks.com,MERGE,"Map(predicate -> ((source.customer = target.customer) AND (source.year = target.year)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2740698557206543),0521-192320-w5waoh4f,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 1840, numTargetRowsInserted -> 1, scanTimeMs -> 1039, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 699)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-07-04T01:55:12.000+0000,6490153397734611,anindita.mahapatra@databricks.com,DELETE,"Map(predicate -> [""(spark_catalog.ch_6.Customer.customer = 'Jill King')""])",,List(2740698557206543),0521-192320-w5waoh4f,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 2, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1006, numDeletedRows -> 2, scanTimeMs -> 543, numAddedFiles -> 0, rewriteTimeMs -> 463)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-07-04T01:55:10.000+0000,6490153397734611,anindita.mahapatra@databricks.com,UPDATE,Map(predicate -> (sale_id#90343 = 12PSD)),,List(2740698557206543),0521-192320-w5waoh4f,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 2, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 970, scanTimeMs -> 115, numAddedFiles -> 2, numUpdatedRows -> 2, rewriteTimeMs -> 855)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-07-04T01:55:07.000+0000,6490153397734611,anindita.mahapatra@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2740698557206543),0521-192320-w5waoh4f,0.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 5411)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-07-04T01:55:05.000+0000,6490153397734611,anindita.mahapatra@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2740698557206543),0521-192320-w5waoh4f,,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 5408)",,Databricks-Runtime/10.4.x-scala2.12


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2022-07-04T01:55:28.000+0000,6490153397734611,anindita.mahapatra@databricks.com,CLONE,"Map(source -> ch_6.customer, sourceVersion -> 4, isShallow -> false)",,List(2740698557206543),0521-192320-w5waoh4f,-1,Serializable,False,"Map(removedFilesSize -> 0, numRemovedFiles -> 0, sourceTableSize -> 9049, numCopiedFiles -> 5, copiedFilesSize -> 9049, sourceNumOfFiles -> 5)",,Databricks-Runtime/10.4.x-scala2.12


# CDC

## CDF
* Change data feed can be set via TBLPROPERTIES when creating a table, or later via ALTER TABLE
* It can also be set globally for all new tables to inherit
  * set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

In [0]:
%sql
-- Turn on the change data feed for the existing table
ALTER TABLE Customer SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Keep the inserts as two separate statements: each one is its own commit,
-- and the table_changes query in this notebook reads those two commits.
-- String values use single quotes, matching the rest of the notebook and
-- staying valid when double quotes are configured to delimit identifiers.
INSERT INTO Customer VALUES (2022, 12, 2, 'Customer 1', 'sale_id_1', 200.10);
INSERT INTO Customer VALUES (2022, 12, 5, 'Customer 2', 'sale_id_N', 400.80);

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- Row-level changes recorded between commit versions 6 and 7 of Customer.
-- The version numbers are hard-coded: they assume the cells above ran exactly
-- once, in order, on a freshly created table.
SELECT *
FROM table_changes('Customer', 6, 7)

year,month,day,customer,sale_id,total_cost,_change_type,_commit_version,_commit_timestamp
2022,12,5,Customer 2,sale_id_N,400.8,insert,7,2022-07-04T01:55:35.000+0000
2022,12,2,Customer 1,sale_id_1,200.1,insert,6,2022-07-04T01:55:34.000+0000


# SCD

## SCD Type-1
* The dimension table is updated, no history of the change is maintained

In [0]:
%sql
-- SCD Type-1: overwrite in place, keeping no history.
-- Rows pair up on customer plus the full sale date (year, month, day).
MERGE INTO Customer AS tgt
USING New_Customer AS src
  ON tgt.customer = src.customer
 AND tgt.year = src.year
 AND tgt.month = src.month
 AND tgt.day = src.day
-- overwrite a paired row only when its sale id differs
WHEN MATCHED AND tgt.sale_id <> src.sale_id THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


## SCD Type-2
* History is maintained, i.e. both the old and new values
* 3 additional fields are included: the start and end dates of the data change, and a flag column indicating whether the row is the current (latest) value

In [0]:
%sql
-- Create an empty Type-2 table: Customer's columns plus the three tracking
-- columns (validity start date, validity end date, current-row flag).
-- The always-false predicate copies the column layout without any rows, so
-- the former trailing LIMIT 1 was redundant and has been dropped.
CREATE OR REPLACE TABLE Customer_type2
USING DELTA
AS
SELECT
  *,
  CAST(NULL AS DATE) AS record_start_dt,
  CAST(NULL AS DATE) AS record_end_dt,
  false AS current_indicator
FROM Customer
WHERE 1=0

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Seed the Type-2 table with every row currently in Customer.
-- Values are matched to columns by position, so the three trailing
-- expressions must follow the order of the tracking columns in Customer_type2.
INSERT INTO Customer_type2
SELECT
  *,
  current_date(),  -- record_start_dt
  null,            -- record_end_dt
  true             -- current_indicator
FROM Customer

num_affected_rows,num_inserted_rows
7,7


In [0]:
%sql
-- Staging view for the SCD Type-2 merge. Each incoming row appears:
--   * once with merge_key = customer, so it can pair with that customer's
--     current rows in the target (closing them out when the sale id changed,
--     or being inserted when the customer is new);
--   * once more with merge_key = NULL when the customer already has a current
--     row with a different sale id. A NULL key never matches, so this copy is
--     inserted as the new current version.
CREATE OR REPLACE TEMPORARY VIEW src_changes
AS
WITH src_bronze_table AS (
  SELECT *
  FROM New_Customer
), inserts_for_matched_changes AS (
  -- A semi join (instead of an inner join) keeps each incoming row at most
  -- once even when the customer has several current rows in the target;
  -- an inner join would emit one copy per matching target row and insert
  -- duplicate "new version" rows.
  SELECT src.*
  FROM src_bronze_table src
  LEFT SEMI JOIN Customer_type2 tgt
  -- only when the sale_id changes
  ON tgt.current_indicator = true AND src.customer = tgt.customer AND tgt.sale_id <> src.sale_id
)
SELECT *, customer AS merge_key
FROM src_bronze_table
UNION ALL
SELECT *, null AS merge_key FROM inserts_for_matched_changes;

MERGE INTO Customer_type2 tgt
USING src_changes AS src
ON tgt.customer = src.merge_key AND tgt.current_indicator = true
-- close out the current row only if the sale_id changed
WHEN MATCHED AND tgt.sale_id <> src.sale_id
  THEN UPDATE SET record_end_dt = current_date(), current_indicator = false
-- new customers and NULL-keyed copies become the open-ended current rows
WHEN NOT MATCHED
  THEN INSERT (year, month, day, customer, sale_id, total_cost, record_start_dt, record_end_dt, current_indicator) VALUES
   (year, month, day, customer, sale_id, total_cost, current_date(), null, true)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
-- Inspect the Type-2 table after the merge
SELECT *
FROM Customer_type2

year,month,day,customer,sale_id,total_cost,record_start_dt,record_end_dt,current_indicator
2021,1,2,Jack Morris,12PSD,500.1,2022-07-04,,True
2022,12,2,Customer 1,sale_id_1,200.1,2022-07-04,,True
2022,12,5,Customer 2,sale_id_N,400.8,2022-07-04,,True
2022,3,2,Jack Morris,12PSD,500.1,2022-07-04,,True
2022,2,2,Jane Goodwill,P127D,37.5,2022-07-04,,True
2022,1,1,Jim Smith,M12SD,25.0,2022-07-04,,True
2021,3,2,Jim Smith,M12SD,30.37,2022-07-04,,True
