# In [0]:
# source_table = 'cdc_feed_project_catalog.default.raw_upi_txns'

# cdc_stream = spark.readStream.format('delta').option('readChangeFeed', 'true').table(source_table)

# query = cdc_stream.select(
#     "transaction_id",
#     "upi_id",
#     "merchant_id",
#     "transaction_amount",
#     "transaction_timestamp",
#     "transaction_status",
#     "_change_type",  # CDC change type
#     "_commit_version",
#     "_commit_timestamp"
# ).writeStream.format('console').outputMode('append').start()

# query.awaitTermination()


# In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Fully qualified names of the Delta tables this job writes to and reads from.
aggregated_table = 'cdc_feed_project_catalog.default.merchant_table'
source_table = 'cdc_feed_project_catalog.default.raw_upi_txns_v1'

# Create the per-merchant aggregate table on first run. The table name is
# interpolated from `aggregated_table` so this DDL and the merge that targets
# `aggregated_table` can never point at different tables.
# NOTE(review): decimal(10,2) caps each running total below 100,000,000.00 --
# confirm that is wide enough for lifetime per-merchant sums.
spark.sql(f"""
    create table if not exists {aggregated_table} (
        merchant_id string,
        total_sales decimal(10,2),
        total_refunds decimal(10,2),
        net_sales decimal(10,2)
    )
    using delta
""")

def batch_processing(batch_df, batch_id):
    """Fold one micro-batch of change-feed rows into the merchant aggregates.

    Only rows whose ``_change_type`` is ``insert`` or ``update_postimage`` are
    considered. Per ``merchant_id`` the batch is reduced to:

    * ``total_sales``   -- sum of ``transaction_amount`` for ``completed`` rows
    * ``total_refunds`` -- sum of the *negated* ``transaction_amount`` for
      ``refunded`` rows (so the value is zero or negative for positive amounts)
    * ``net_sales``     -- ``total_sales + total_refunds``

    The result is merged into ``aggregated_table``: existing merchants have the
    batch values added to their stored totals, new merchants are inserted.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch supplied by ``foreachBatch``;
            not used by this function.
    """
    # Use the session attached to the micro-batch rather than the notebook
    # global: inside foreachBatch the global session may not be usable (e.g.
    # on Spark Connect / shared-access compute).
    session = batch_df.sparkSession

    # NOTE(review): pre-images and deletes are ignored, so an update that
    # changes the amount of an already-counted transaction is added on top of
    # the earlier value -- confirm this matches the intended business rule.
    per_merchant = (
        batch_df
        .filter(col('_change_type').isin('insert', 'update_postimage'))
        .groupBy('merchant_id')
        .agg(
            sum(
                when(col('transaction_status') == 'completed', col('transaction_amount'))
                .otherwise(0)
            ).alias('total_sales'),
            sum(
                when(col('transaction_status') == 'refunded', -col('transaction_amount'))
                .otherwise(0)
            ).alias('total_refunds'),
        )
        .withColumn('net_sales', col('total_sales') + col('total_refunds'))
    )

    target_table = DeltaTable.forName(session, aggregated_table)

    # The update is additive (stored total + batch total), so replaying the
    # same batch would count it twice.
    (
        target_table.alias('target')
        .merge(
            per_merchant.alias('source'),
            'target.merchant_id = source.merchant_id',
        )
        .whenMatchedUpdate(
            set={
                'total_sales': "source.total_sales + target.total_sales",
                'total_refunds': "source.total_refunds + target.total_refunds",
                'net_sales': "source.net_sales + target.net_sales",
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

# Stream the Delta change data feed of the source table.
cdc_stream = spark.readStream.format('delta').option('readChangeFeed', 'true').table(source_table)
print('Read Stream Started.................')

# Start the query, announce it, and only then block. Chaining
# awaitTermination() directly onto start() delayed the message below until
# the stream had already terminated.
# NOTE(review): no checkpointLocation option is set on this writer; confirm
# one is configured elsewhere, because batch_processing adds each batch onto
# the stored totals and reprocessed batches would be counted again.
streaming_query = (
    cdc_stream.writeStream
    .foreachBatch(batch_processing)
    .outputMode('update')
    .start()
)
print('Write Stream Started................')

streaming_query.awaitTermination()
