# Change Data Feed

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

conf = SparkConf()

conf.setAppName("Sample Change Data Feed")
conf.set("spark.hadoop.fs.s3a.endpoint", "http://172.21.121.140:9000")
conf.set("spark.hadoop.fs.s3a.access.key", "chapolin")
conf.set("spark.hadoop.fs.s3a.secret.key", "mudar@123")
conf.set("spark.hadoop.fs.s3a.path.style.access", True)
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
conf.set("hive.metastore.uris", "thrift://metastore:9083")

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

## Create Dataframe

In [2]:
data2 = [("James", "Smith", "M", 3000),
         ("Michael", "Rose", "M", 6000),
         ("Robert", "Willians", "M", 5500),
         ("Maria", "Anne", "F", 7000)
        ]

schema = StructType([
    StructField("firsname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", StringType(), True)
])

df = spark.createDataFrame(data=data2, schema=schema)

cdf_table_path = 's3a://bronze/tb_cdf'

## Send Delta table to Minio

In [3]:
df.write.format("delta").mode("overwrite").option("delta.enableChangeDataFeed", "true").save(cdf_table_path)

## Verify CDF

In [4]:
delta_table = DeltaTable.forPath(spark, cdf_table_path)
details_df = delta_table.detail()

In [6]:
properties_df = details_df.select(explode(col("properties")).alias("key", "value"))
cdf_enabled = properties_df.filter(col("key") == "delta.enableChangeDataFeed").select("value").collect()
print("CDF ok:", cdf_enabled)

CDF ok: [Row(value='true')]


In [3]:
cdf_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", 0).load(cdf_table_path)

cdf_df.show()

+--------+--------+------+------+------------+---------------+--------------------+
|firsname|lastname|gender|salary|_change_type|_commit_version|   _commit_timestamp|
+--------+--------+------+------+------------+---------------+--------------------+
|  Robert|Willians|     M|  5500|      insert|              0|2024-08-16 09:37:...|
|  Robert|Willians|     M|  5500|      insert|              1|2024-08-16 09:40:...|
| Michael|    Rose|     M|  6000|      insert|              0|2024-08-16 09:37:...|
| Michael|    Rose|     M|  6000|      insert|              1|2024-08-16 09:40:...|
|   James|   Smith|     M|  3000|      insert|              0|2024-08-16 09:37:...|
|   James|   Smith|     M|  3000|      insert|              1|2024-08-16 09:40:...|
|   Maria|    Anne|     F|  7000|      insert|              0|2024-08-16 09:37:...|
|   Maria|    Anne|     F|  7000|      insert|              1|2024-08-16 09:40:...|
|  Robert|Willians|     M|  5500|      delete|              1|2024-08-16 09:

In [4]:
update_df = spark.sql("select * from table", df=cdf_df)

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient