In [1]:
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
builder = (SparkSession.builder
           .appName("merge-cdc-streaming")
           .master("spark://spark-master:7077")
           .config("spark.executor.memory", "512m")
           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
           .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

spark = configure_spark_with_delta_pip(builder,['org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1']).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2bb06318-20cc-4719-973d-73a145ece7b5;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in centra

In [3]:
get_ipython().run_line_magic('load_ext', 'sparksql_magic')
get_ipython().run_line_magic('config', 'SparkSql.limit=20')

In [4]:
%%sparksql
CREATE OR REPLACE TABLE default.users (
    id INT,
    name STRING,
    age INT,
    gender STRING,
    country STRING 
) USING DELTA LOCATION '/opt/workspace/data/delta_lake/merge-cdc-streaming/users';

                                                                                

In [5]:
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "users")
      .option("startingOffsets", "earliest")
      .load())

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('country', StringType(), True)])

df = df.withColumn('value', from_json(col('value').cast("STRING"), schema))

In [7]:
from pyspark.sql.functions import col

df = df.select(
    col('value.id').alias('id'),
    col('value.name').alias('name'),
    col('value.age').alias('age'),
    col('value.gender').alias('gender'),
    col('value.country').alias('country'))

In [8]:
def upsertToDelta(microBatchDf, batchId):
    deltaTable = DeltaTable.forPath(spark, "/opt/workspace/data/delta_lake/merge-cdc-streaming/users" )
    (deltaTable.alias("dt")
     .merge(source=microBatchDf.alias("sdf"),
          condition="sdf.id = dt.id")
     .whenMatchedUpdate(set={
         "id": "sdf.id",
         "name": "sdf.name",
         "age": "sdf.gender",
         "country": "sdf.country"
     })
     .whenNotMatchedInsert(values={
         "id": "sdf.id",
         "name": "sdf.name",
         "age": "sdf.gender",
         "country": "sdf.country"
     })
    .execute())

In [9]:
query = (df.writeStream
         .format("delta")
         .foreachBatch(upsertToDelta)
         .outputMode("update")
         .option("checkpointLocation", "/opt/workspace/data/delta_lake/merge-cdc-streaming/users/_checkpoints/")
         .start("/opt/workspace/data/delta_lake/merge-cdc-streaming/users"))

                                                                                

In [10]:
%%sparksql
DESCRIBE HISTORY delta.`/opt/workspace/data/delta_lake/merge-cdc-streaming/users`;



0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2023-08-26 11:51:11.377000,,,MERGE,"{'matchedPredicates': '[{""actionType"":""update""}]', 'predicate': '[""(id#357 = id#1172)""]', 'notMatchedBySourcePredicates': '[]', 'notMatchedPredicates': '[{""actionType"":""insert""}]'}",,,,1,Serializable,False,"{'numOutputRows': '263', 'numTargetBytesAdded': '2822', 'numTargetRowsInserted': '0', 'numTargetFilesAdded': '1', 'numTargetRowsMatchedDeleted': '0', 'numTargetFilesRemoved': '1', 'numTargetRowsMatchedUpdated': '1', 'executionTimeMs': '8758', 'numTargetRowsCopied': '262', 'rewriteTimeMs': '4588', 'numTargetRowsUpdated': '1', 'numTargetRowsDeleted': '0', 'scanTimeMs': '4126', 'numSourceRows': '1', 'numTargetChangeFilesAdded': '0', 'numTargetRowsNotMatchedBySourceUpdated': '0', 'numTargetRowsNotMatchedBySourceDeleted': '0', 'numTargetBytesRemoved': '2817'}",,Apache-Spark/3.4.1 Delta-Lake/2.4.0
1,2023-08-26 11:50:56.577000,,,MERGE,"{'matchedPredicates': '[{""actionType"":""update""}]', 'predicate': '[""(id#357 = id#387)""]', 'notMatchedBySourcePredicates': '[]', 'notMatchedPredicates': '[{""actionType"":""insert""}]'}",,,,0,Serializable,False,"{'numOutputRows': '263', 'numTargetBytesAdded': '2817', 'numTargetRowsInserted': '263', 'numTargetFilesAdded': '1', 'numTargetRowsMatchedDeleted': '0', 'numTargetFilesRemoved': '0', 'numTargetRowsMatchedUpdated': '0', 'executionTimeMs': '7877', 'numTargetRowsCopied': '0', 'rewriteTimeMs': '2561', 'numTargetRowsUpdated': '0', 'numTargetRowsDeleted': '0', 'scanTimeMs': '5247', 'numSourceRows': '263', 'numTargetChangeFilesAdded': '0', 'numTargetRowsNotMatchedBySourceUpdated': '0', 'numTargetRowsNotMatchedBySourceDeleted': '0', 'numTargetBytesRemoved': '0'}",,Apache-Spark/3.4.1 Delta-Lake/2.4.0
0,2023-08-26 11:49:41.305000,,,CREATE OR REPLACE TABLE,"{'description': None, 'partitionBy': '[]', 'properties': '{}', 'isManaged': 'false'}",,,,,Serializable,True,{},,Apache-Spark/3.4.1 Delta-Lake/2.4.0


                                                                                

In [11]:
query.stop()

In [12]:
spark.stop() 