In [None]:
import os

from pyspark.sql import SparkSession
from delta import *  # provides DeltaTable

In [None]:
# Create (or reuse) the notebook's SparkSession with:
#   - the SQL Server JDBC driver and Delta Lake pulled in through spark.jars.packages,
#   - Delta's SQL extension and catalog registered so Delta tables work from Spark SQL,
#   - Hive metastore support enabled.
spark = (
    SparkSession.builder
    .appName("sql-server-cdc-with-pyspark")
    .enableHiveSupport()
    .config("spark.jars.packages", "com.microsoft.sqlserver:mssql-jdbc:9.4.1.jre8,io.delta:delta-core_2.12:1.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [None]:
# Connection settings for the source SQL Server database.
# Each value is taken from an environment variable of the same name when it is set, so real
# credentials never have to be written into the notebook; the "XXXXXX" placeholders are only
# fallbacks for readability (in production these would come from a secret store such as Key Vault).
SRC_USER = os.environ.get("SRC_USER", "XXXXXX")
SRC_PWD  = os.environ.get("SRC_PWD", "XXXXXX")
SRC_HOST = os.environ.get("SRC_HOST", "XXXXXX")
# Port used in the JDBC URL of the CDC cell; 1433 is SQL Server's default TCP port.
SRC_PORT = int(os.environ.get("SRC_PORT", "1433"))
SRC_DB   = os.environ.get("SRC_DB", "XXXXXX")

# Source table whose CDC change table (cdc.dbo_<table>_CT) is replayed, and its key column.
src_table     = "customers"
src_table_key = "customer_id"

# Location of the target Delta table that the changes are merged into.
delta_table_path = f"/tmp/{src_table}"

In [None]:
# Replay the SQL Server CDC change table for `src_table` into the Delta table at
# `delta_table_path`: load the change rows over JDBC, keep only the latest change per key,
# then MERGE them into the target (delete, update, or insert).

# Target Delta table. Its columns decide which source columns are carried over, so the
# column list is read from the same path the MERGE writes to rather than from a
# separately named metastore table.
deltaTable = DeltaTable.forPath(spark, delta_table_path)
cdc_fields = deltaTable.toDF().columns

# Register the CDC change table (cdc.dbo_<table>_CT) as a temp view for the SQL below.
# `fetchsize` is a Spark JDBC option, so it is passed as an option instead of being
# embedded in the SQL Server connection URL.
(
    spark.read
    .format("jdbc")
    .option("url", f"jdbc:sqlserver://{SRC_HOST}:{SRC_PORT};databaseName={SRC_DB}")
    .option("dbtable", f"cdc.dbo_{src_table}_CT")
    .option("user", SRC_USER)
    .option("password", SRC_PWD)
    .option("encrypt", "true")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("hostNameInCertificate", "*.database.windows.net")
    .option("fetchsize", "20000")
    .load()
    .createOrReplaceTempView(f"cdc_{src_table}")
)

# Collapse the change stream to the most recent change per key.
#   __$operation: 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image)
# Before-images (3) are discarded. Rows are ordered newest-first by `__$start_lsn` and then
# by `__$seqval`, which orders changes *within* one transaction. Breaking ties on
# `__$operation` alone would, e.g., prefer an insert (2) over a later delete (1) of the same
# key made in the same transaction; it is kept only as a final tie-breaker.
df = spark.sql(f"""
SELECT *
FROM (
  SELECT
    *,
    `__$operation` = 1 AS deleted,
    ROW_NUMBER() OVER (
      PARTITION BY {src_table_key}
      ORDER BY `__$start_lsn` DESC, `__$seqval` DESC, `__$operation` DESC
    ) AS change_rank
  FROM cdc_{src_table}
  WHERE `__$operation` != 3
) ranked
WHERE change_rank = 1
""").select(cdc_fields + ["deleted"])

# Explicit target-column -> source-expression mapping. Unlike updateAll()/insertAll(), this
# never writes the helper `deleted` column into the target, even when Delta schema
# auto-merge is enabled.
merge_columns = {name: f"updates.{name}" for name in cdc_fields}

# Delete keys whose latest change is a delete, overwrite the remaining matched keys, and
# insert unmatched keys unless their latest change is a delete.
(
    deltaTable.alias("existing")
    .merge(df.alias("updates"), f"existing.{src_table_key} = updates.{src_table_key}")
    .whenMatchedDelete(condition="updates.deleted = true")
    .whenMatchedUpdate(set=merge_columns)
    .whenNotMatchedInsert(condition="updates.deleted = false", values=merge_columns)
    .execute()
)

In [None]:
# Inspect the merged result directly from the Delta path the MERGE wrote to, instead of a
# hardcoded table name that may not follow `src_table` / `delta_table_path`.
DeltaTable.forPath(spark, delta_table_path).toDF().show()