In [0]:
# df = spark.readStream.format("delta").option("readChangeFeed", "true").table("incremental_load.default.raw_upi_transactions")

# display(df)

# query = df.select(
#     "transaction_id",
#     "upi_id",
#     "merchant_id",
#     "txn_amount",
#     "transaction_time",
#     "txn_status",
#     "_change_type",  # CDC change type
#     "_commit_version",
#     "_commit_timestamp"
# ).writeStream.format("console") \
#     .outputMode("append") \
#     .start()

# query.awaitTermination()

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, sum, when

# Fully qualified names of the CDC source table and of the per-merchant
# aggregate table that the streaming job maintains.
raw_table_name = "incremental_load.default.raw_upi_transactions_v1"
aggregated_table_name = "incremental_load.default.aggregated_upi_transactions"

# DDL for the aggregate table: one row per merchant_id carrying three DOUBLE
# totals. IF NOT EXISTS makes re-running this cell a no-op once the table exists.
create_aggregated_table_sql = f"""
          CREATE TABLE IF NOT EXISTS {aggregated_table_name} (
              merchant_id STRING,
              total_sales DOUBLE,
              total_refunds DOUBLE,
              net_sales DOUBLE
          )
          USING DELTA
          """
spark.sql(create_aggregated_table_sql)

def process_aggregation(batch_df, batch_id):
    """Fold one micro-batch of change-feed rows into the per-merchant totals table.

    Intended as a ``foreachBatch`` callback. Only rows whose ``_change_type`` is
    ``insert`` or ``update_postimage`` are considered; ``update_preimage`` and
    ``delete`` rows are ignored. Per ``merchant_id`` the batch is reduced to:

    * ``total_sales``   -- sum of ``txn_amount`` where ``txn_status == "completed"``
    * ``total_refunds`` -- sum of ``-txn_amount`` where ``txn_status == "refunded"``
      (so this column accumulates as a non-positive number for positive amounts)
    * ``net_sales``     -- ``total_sales + total_refunds``

    The result is merged into ``aggregated_table_name``: existing merchants have
    the batch values *added* to their stored totals, new merchants are inserted.

    Args:
        batch_df: Micro-batch DataFrame. Must contain the columns
            ``_change_type``, ``merchant_id``, ``txn_status`` and ``txn_amount``.
        batch_id: Micro-batch identifier supplied by the caller; only used in
            the progress message.

    Returns:
        None. The effect is the MERGE committed to the aggregate Delta table.

    NOTE(review): the merge is additive, so if the same batch is delivered
    twice (e.g. a retry after a failure) its amounts are counted twice. Confirm
    whether idempotent-write protection is needed here.
    """
    print(f"Processing batch {batch_id}")

    is_current_row = col("_change_type").isin("update_postimage", "insert")
    sale_amount = when(col("txn_status") == "completed", col("txn_amount")).otherwise(0)
    refund_amount = when(col("txn_status") == "refunded", -col("txn_amount")).otherwise(0)

    aggregated_df = (
        batch_df.filter(is_current_row)
        .groupBy("merchant_id")
        .agg(
            sum(sale_amount).alias("total_sales"),
            sum(refund_amount).alias("total_refunds"),
        )
        # sum() yields NULL when every contributing amount in a group is NULL.
        # Left as-is, that NULL would turn the merchant's running totals NULL
        # in the additive merge below, so treat "no usable amount" as 0.
        .fillna(0, subset=["total_sales", "total_refunds"])
        .withColumn("net_sales", col("total_sales") + col("total_refunds"))
    )

    target_table = DeltaTable.forName(spark, aggregated_table_name)
    (
        target_table.alias("target")
        .merge(
            aggregated_df.alias("source"),
            # Null-safe equality: with plain "=", a NULL merchant_id never
            # matches, so every batch would insert another NULL-merchant row.
            "target.merchant_id <=> source.merchant_id",
        )
        .whenMatchedUpdate(set={
            # coalesce() lets a stored NULL total recover instead of
            # swallowing every later increment.
            "total_sales": "coalesce(target.total_sales, 0) + source.total_sales",
            "total_refunds": "coalesce(target.total_refunds, 0) + source.total_refunds",
            "net_sales": "coalesce(target.net_sales, 0) + source.net_sales",
        })
        .whenNotMatchedInsertAll()
        .execute()
    )

# Stream the raw table's Change Data Feed; each row carries a _change_type
# column that process_aggregation filters on.
cdc_stream = spark.readStream.format("delta").option("readChangeFeed", "true").table(raw_table_name)
print("Read Stream Started.........")

# NOTE(review): no checkpointLocation is configured on this writer. Because
# process_aggregation adds batch values onto stored totals, re-reading the
# feed after a restart would count earlier changes again -- confirm whether a
# durable checkpoint path should be set here.
aggregation_query = (
    cdc_stream.writeStream
    .foreachBatch(process_aggregation)
    .outputMode("update")
    .start()
)

# Report the start *before* blocking: awaitTermination() only returns once the
# query stops, so a message placed after it would appear at shutdown instead.
print("Write Stream Started.........")
aggregation_query.awaitTermination()