In [1]:
from delta import configure_spark_with_delta_pip, DeltaTable
from pyspark.sql import SparkSession

# Describe the session first: local master using all available cores, a
# Hive-backed catalog, and the Delta Lake SQL extension + catalog registered.
session_builder = (
    SparkSession.builder.master("local[*]")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip wires the pip-installed Delta Lake package
# into the builder before the session is created (or an existing one reused).
spark_session = configure_spark_with_delta_pip(session_builder).getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/23 06:18:15 WARN Utils: Your hostname, bartosz, resolves to a loopback address: 127.0.1.1; using 192.168.1.55 instead (on interface wlp0s20f3)
25/08/23 06:18:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/bartosz/.venvs/delta_spark_4/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/bartosz/.ivy2.5.2/cache
The jars for the packages stored in: /home/bartosz/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c76d94bb-d1d5-402c-bf37-a62a848bc804;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in local-m2-cache
:: resolution report :: resolve 228ms :: artifacts dl 6ms


**Create input tables first**

In [2]:
# Remove the local warehouse, metastore and streaming checkpoint left by earlier runs.
!rm -rf ./spark-warehouse && rm -rf ./metastore_db/ && rm -rf ./checkpoint

In [3]:
table = 'table_1'
print(f'Creating {table}')

# Drop any previous copy first so that re-running this cell does not fail.
drop_table_statement = f'DROP TABLE IF EXISTS `default`.`{table}`'
spark_session.sql(drop_table_statement)

# Two non-nullable columns; the Change Data Feed is switched on at creation
# time through the table properties.
create_table_statement = f'''
  CREATE TABLE `default`.`{table}` (
     number INT NOT NULL,
     letter STRING NOT NULL
  ) 
  USING DELTA 
  TBLPROPERTIES (delta.enableChangeDataFeed = true) 
'''
spark_session.sql(create_table_statement)

Creating table_1


25/08/23 06:18:25 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/08/23 06:18:25 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore bartosz@127.0.1.1
25/08/23 06:18:25 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
25/08/23 06:18:28 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`default`.`table_1` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
25/08/23 06:18:28 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/08/23 06:18:28 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist


DataFrame[]

# Change Data Feed

Change Data Feed (CDF) has limited use in the context of the dual writes problem. It only applies when you need to write the same dataset to both a Delta Lake table and a streaming broker. Assuming that _streaming broker_ is an abstract concept that can be substituted by anything _streamable_, CDF eliminates the need for dual writes, as the streaming consumers can process the Delta Lake table directly.

Let's see this in action by inserting some rows into our table with CDF enabled:

In [4]:
from pyspark import Row

# First batch of rows for the table.
initial_rows = [
    Row(number=1, letter='a'),
    Row(number=2, letter='b'),
    Row(number=3, letter='c'),
]
data_to_write = spark_session.createDataFrame(initial_rows)

# Write in overwrite mode into the already-created table.
(
    data_to_write.write
    .format('delta')
    .mode('overwrite')
    .insertInto(table)
)

spark_session.read.table(table).show()

25/08/23 06:19:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------+------+
|number|letter|
+------+------+
|     2|     b|
|     1|     a|
|     3|     c|
+------+------+



Let's stream the changes now:

In [5]:
# Streaming source: the table's change feed, starting from table version 0.
change_feed = (
    spark_session.readStream
    .format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', 0)
    .table(table)
)

# Not-yet-started console sink. It is kept in a variable so the same
# definition (and the same checkpoint directory) can be started again later.
# availableNow=True makes each run stop once the pending data is processed.
changed_rows_stream = (
    change_feed.writeStream
    .option('checkpointLocation', './checkpoint')
    .format("console")
    .trigger(availableNow=True)
)

changed_rows_stream.start().awaitTermination()

25/08/23 06:19:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+------------+---------------+--------------------+
|number|letter|_change_type|_commit_version|   _commit_timestamp|
+------+------+------------+---------------+--------------------+
|     1|     a|      insert|              1|2025-08-23 06:19:...|
|     2|     b|      insert|              1|2025-08-23 06:19:...|
|     3|     c|      insert|              1|2025-08-23 06:19:...|
+------+------+------------+---------------+--------------------+



Let's make some more changes to the table:

In [6]:
# Rows 1 and 2 already exist in the table (they will be updated); row 4 is new.
data_to_write = spark_session.createDataFrame([
    Row(number=1, letter='A'),
    Row(number=2, letter='B'),
    Row(number=4, letter='D'),
])

# Upsert keyed on `number`: update every column on a match, insert otherwise.
target_table = DeltaTable.forName(spark_session, table).alias('base_table')
upsert = (
    target_table
    .merge(data_to_write.alias('new_table'), 'base_table.number = new_table.number')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
upsert.execute()
spark_session.read.table(table).show()

25/08/23 06:19:30 WARN MapPartitionsRDD: RDD 71 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


+------+------+
|number|letter|
+------+------+
|     1|     A|
|     2|     B|
|     4|     D|
|     3|     c|
+------+------+



If we run the stream again, we should be able to get the updated and inserted records:

In [7]:
# Start the previously defined console sink once more and block until this
# run has finished.
rerun_query = changed_rows_stream.start()
rerun_query.awaitTermination()

25/08/23 06:19:59 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 1
-------------------------------------------
+------+------+----------------+---------------+--------------------+
|number|letter|    _change_type|_commit_version|   _commit_timestamp|
+------+------+----------------+---------------+--------------------+
|     1|     a| update_preimage|              2|2025-08-23 06:19:...|
|     1|     A|update_postimage|              2|2025-08-23 06:19:...|
|     2|     b| update_preimage|              2|2025-08-23 06:19:...|
|     2|     B|update_postimage|              2|2025-08-23 06:19:...|
|     4|     D|          insert|              2|2025-08-23 06:19:...|
+------+------+----------------+---------------+--------------------+

