In [0]:
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Build (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("delta-test")
    # Packages: the Avro data source plus Delta Lake core.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-avro_2.12:3.1.2,io.delta:delta-core_2.12:1.0.1",
    )
    # Enable the Delta SQL extension and route the default catalog through Delta.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

In [0]:
# Input Avro files, both under gs://data-lake-cdc/data/demo_hudi_delta_test/.
# full_load: the file tagged "mysql-backfill-fulldump" in its name
# (presumably the initial snapshot of the table -- confirm).
full_load = "gs://data-lake-cdc/data/demo_hudi_delta_test/2023/01/12/04/04/8c0972982087b235becf90b60a52554e7f3774ff_mysql-backfill-fulldump_-1046201224_6_0.avro"
# cdc_load: the file tagged "mysql-cdc-binlog" in its name
# (presumably the change events captured after the snapshot -- confirm).
cdc_load = "gs://data-lake-cdc/data/demo_hudi_delta_test/2023/01/12/04/10/8c0972982087b235becf90b60a52554e7f3774ff_mysql-cdc-binlog_-1046201224_3_0.avro"

In [0]:
# Load the full-dump Avro file and flatten it: the fields nested under
# `payload` come first, followed by every original top-level column.
fullDF = (
    spark.read
    .format("avro")
    .load(full_load)
    .select("payload.*", "*")
)

# One field per line, values untruncated.
fullDF.show(vertical=True, truncate=False)

In [0]:
# Load the CDC Avro file with the same flattening as the full dump:
# `payload` fields first, then every original top-level column.
cdc_DF = (
    spark.read
    .format("avro")
    .load(cdc_load)
    .select("payload.*", "*")
)

# One field per line, values untruncated.
cdc_DF.show(vertical=True, truncate=False)

In [0]:
# Using Delta Lake: persist the full dump as a Delta table at a fixed location.
deltaTablePath = "gs://data-lake-cdc/data/delta_table"

# "overwrite" replaces whatever is already stored at the path.
(
    fullDF.write
    .format("delta")
    .mode("overwrite")
    .save(deltaTablePath)
)

In [0]:
# Read the Delta table back from storage to verify what was written.
checkdf = (
    spark.read
    .format("delta")
    .load(deltaTablePath)
)
checkdf.show(truncate=False, vertical=True)

In [0]:
%sql
-- Drop any existing catalog entry named delta_table so the cell can be re-run.
DROP TABLE IF EXISTS delta_table;

-- Register the Delta files at the storage location under the name delta_table.
CREATE TABLE delta_table
USING DELTA
LOCATION "gs://data-lake-cdc/data/delta_table";

In [0]:
%sql
-- List the tables in the current database; delta_table should appear once it has been created.
SHOW TABLES;

In [0]:
%sql
-- Print the DDL the catalog holds for delta_table (schema, provider, location).
SHOW CREATE TABLE delta_table

In [0]:
%sql
-- Inspect the table contents before the CDC changes are merged in.
SELECT *
FROM delta_table

In [0]:
# Expose the CDC DataFrame to the %sql cells as a temporary view named "temp"
# (replaces any existing temp view with that name).
cdc_DF.createOrReplaceTempView("temp")

In [0]:
%sql
-- Preview the CDC rows exposed through the temp view.
SELECT *
FROM temp

In [0]:
%sql
-- Upsert the CDC batch (temp view `temp`) into delta_table, keyed on pk_id.
--
-- MERGE needs at most one source row per target row, so the batch is first
-- reduced to the single most recent change per pk_id. ROW_NUMBER() is used
-- instead of a self-join on MAX(updated_at) because that join:
--   * returns several rows when two changes for the same pk_id share an
--     updated_at, which makes the MERGE fail with a "multiple source rows
--     matched" error;
--   * silently drops a pk_id whose updated_at values are all NULL.
-- source_timestamp and read_timestamp only break ties on updated_at.
-- Rows with a NULL pk_id are skipped, as they were by the previous join.
-- NOTE(review): every surviving row is upserted regardless of change type; if
-- the feed encodes deletes (e.g. inside source_metadata) they are not applied
-- here -- confirm whether a WHEN MATCHED ... THEN DELETE clause is needed.
MERGE INTO delta_table AS target
USING (
  SELECT pk_id, name, value, updated_at, created_at, uuid, read_timestamp, source_timestamp, object, read_method, stream_name, schema_key, sort_keys, source_metadata, payload
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY pk_id
        ORDER BY updated_at DESC, source_timestamp DESC, read_timestamp DESC
      ) AS change_rank
    FROM temp
    WHERE pk_id IS NOT NULL
  ) ranked_changes
  -- Keep only the newest change per key; the helper column is not projected,
  -- so UPDATE SET * / INSERT * still see exactly the table's columns.
  WHERE change_rank = 1
) AS source
ON source.pk_id = target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
-- Inspect the table contents again, now that the MERGE cell above has run.
SELECT *
FROM delta_table