In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *
from datetime import datetime

# Configure Delta table settings
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

def get_opportunity_schema():
    """
    Define schema for Opportunity data
    """
    return StructType([
        StructField("OpportunityId", StringType(), False),
        StructField("Name", StringType(), True),
        StructField("Amount", DoubleType(), True),
        StructField("StageName", StringType(), True),
        StructField("CloseDate", StringType(), True),
        StructField("LastModifiedDate", StringType(), True),
        StructField("AccountId", StringType(), True),
        StructField("Probability", DoubleType(), True)
    ])

def cleanup_tables(source_path, target_path, changelog_path):
    """
    Clean up existing tables and directories
    """
    print("\nCleaning up existing tables and directories...")
    try:
        # Remove directories
        dbutils.fs.rm(source_path, recurse=True)
        dbutils.fs.rm(target_path, recurse=True)
        dbutils.fs.rm(changelog_path, recurse=True)
        
        # Create source directory
        dbutils.fs.mkdirs(source_path)
        print("Cleanup completed successfully")
    except Exception as e:
        print(f"Cleanup warning (safe to ignore if first run): {str(e)}")

def generate_sample_data(batch_id=1):
    """
    Generate mock Salesforce Opportunity data for streaming
    """
    print(f"\nGenerating data for batch {batch_id}")
    
    if batch_id == 1:
        # Initial data
        data = [
            ("OPP001", "Deal A", 10000.0, "Prospecting", "2023-01-01", "2023-01-01 10:00:00", "ACC001", 20.0),
            ("OPP002", "Deal B", 25000.0, "Negotiation", "2023-02-01", "2023-01-02 11:00:00", "ACC002", 60.0),
            ("OPP003", "Deal C", 15000.0, "Proposal", "2023-03-01", "2023-01-03 12:00:00", "ACC003", 40.0)
        ]
    elif batch_id == 2:
        # Updates and new record
        data = [
            ("OPP001", "Deal A", 12000.0, "Qualification", "2023-01-01", "2023-01-04 10:00:00", "ACC001", 30.0),
            ("OPP003", "Deal C", 15000.0, "Closed Won", "2023-03-01", "2023-01-05 12:00:00", "ACC003", 100.0),
            ("OPP004", "Deal D", 30000.0, "Prospecting", "2023-04-01", "2023-01-05 14:00:00", "ACC004", 20.0)
        ]
    else:
        # Final updates
        data = [
            ("OPP001", "Deal A", 15000.0, "Closed Won", "2023-01-01", "2023-01-06 10:00:00", "ACC001", 100.0),
            ("OPP004", "Deal D", 35000.0, "Negotiation", "2023-04-01", "2023-01-06 14:00:00", "ACC004", 60.0)
        ]
    
    return spark.createDataFrame(data, schema=get_opportunity_schema())

def create_initial_delta_table(table_path):
    """
    Create initial empty Delta table with schema
    """
    print("\nCreating initial Delta table structure...")
    empty_df = spark.createDataFrame([], schema=get_opportunity_schema()) \
        .withColumn("inserted_timestamp", current_timestamp()) \
        .withColumn("updated_timestamp", current_timestamp()) \
        .withColumn("is_current", lit(True))
    
    empty_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(table_path)
    print("Initial table structure created")

def write_batch_to_source(df, source_path, batch_id):
    """
    Write a batch of data to source directory
    """
    batch_path = f"{source_path}/batch_{batch_id}"
    df.write.format("json").mode("overwrite").save(batch_path)
    print(f"Written batch {batch_id} to {batch_path}")

def process_streaming_batch(df, epoch_id, target_table_path, changelog_path):
    """
    Process each streaming batch with improved changelog
    """
    batch_id = epoch_id + 1  # Adjust batch_id to start from 1
    print(f"\nProcessing batch {batch_id}")
    
    if df.isEmpty():
        print("Empty batch, skipping...")
        return
    
    # Read target Delta table
    target_delta_table = DeltaTable.forPath(spark, target_table_path)
    
    # Get existing IDs
    existing_ids = [row.OpportunityId for row in target_delta_table.toDF().select("OpportunityId").collect()]
    print(f"Existing IDs in target: {existing_ids}")
    
    # Create changelog entries
    changelog_df = df.withColumn(
        "operation_type",
        when(col("OpportunityId").isin(existing_ids), "UPDATE")
        .otherwise("INSERT")
    ).withColumn("batch_id", lit(batch_id)) \
      .withColumn("cdc_timestamp", current_timestamp()) \
      .withColumn("processed_timestamp", current_timestamp())
    
    # Perform merge operation
    merge_result = target_delta_table.alias("target").merge(
        source=df.alias("source"),
        condition="target.OpportunityId = source.OpportunityId"
    ).whenMatchedUpdate(set={
        "Amount": "source.Amount",
        "StageName": "source.StageName",
        "Probability": "source.Probability",
        "LastModifiedDate": "source.LastModifiedDate",
        "updated_timestamp": "current_timestamp()"
    }).whenNotMatchedInsert(values={
        "OpportunityId": "source.OpportunityId",
        "Name": "source.Name",
        "Amount": "source.Amount",
        "StageName": "source.StageName",
        "CloseDate": "source.CloseDate",
        "LastModifiedDate": "source.LastModifiedDate",
        "AccountId": "source.AccountId",
        "Probability": "source.Probability",
        "inserted_timestamp": "current_timestamp()",
        "updated_timestamp": "current_timestamp()",
        "is_current": "true"
    }).execute()

    # Write to changelog after successful merge
    changelog_df.select(
        "OpportunityId",
        "operation_type",
        "Amount",
        "StageName",
        "Probability",
        "LastModifiedDate",
        "batch_id",
        "processed_timestamp"
    ).write \
        .format("delta") \
        .mode("append") \
        .save(changelog_path)
    
    print(f"Completed processing batch {batch_id}")

def setup_streaming_query(source_path, target_table_path, changelog_path):
    """
    Setup streaming query with explicit schema
    """
    return spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .option("cloudFiles.schemaLocation", f"{source_path}/_schema") \
        .schema(get_opportunity_schema()) \
        .load(source_path) \
        .writeStream \
        .foreachBatch(lambda df, epoch_id: process_streaming_batch(df, epoch_id, target_table_path, changelog_path)) \
        .option("checkpointLocation", f"{source_path}/_checkpoint") \
        .trigger(once=True)

def display_batch_results(target_table_path, changelog_path, batch_id):
    """
    Display results after each batch with improved formatting
    """
    print(f"\n{'='*50}")
    print(f"Results After Batch {batch_id}")
    print(f"{'='*50}")
    
    # Main table status
    print("\nMain Table Contents:")
    main_df = spark.read.format("delta").load(target_table_path)
    main_df.orderBy("OpportunityId").select(
        "OpportunityId", 
        "Name", 
        "Amount", 
        "StageName", 
        "Probability",
        "LastModifiedDate"
    ).show()
    
    # Changelog for this batch
    print(f"\nChanges in Batch {batch_id}:")
    changelog_df = spark.read.format("delta").load(changelog_path)
    batch_changes = changelog_df.filter(col("batch_id") == batch_id) \
        .orderBy("OpportunityId") \
        .select(
            "OpportunityId",
            "operation_type",
            "Amount",
            "StageName",
            "Probability",
            "LastModifiedDate"
        )
    batch_changes.show()

    # Show operation counts for this batch
    print(f"\nOperation Summary for Batch {batch_id}:")
    batch_changes.groupBy("operation_type") \
        .count() \
        .orderBy("operation_type") \
        .show()

def display_final_summary(changelog_path):
    """
    Display final summary of all changes
    """
    print("\n" + "="*50)
    print("Final Change Summary")
    print("="*50)
    
    changelog_df = spark.read.format("delta").load(changelog_path)
    
    print("\nAll Changes by Batch:")
    changelog_df.groupBy("batch_id", "operation_type") \
        .count() \
        .orderBy("batch_id", "operation_type") \
        .show()
    
    print("\nTotal Changes by Operation Type:")
    changelog_df.groupBy("operation_type") \
        .count() \
        .orderBy("operation_type") \
        .show()

def main():
    """
    Main execution flow
    """
    source_path = "/tmp/opportunity_source"
    target_table_path = "/tmp/opportunity_delta"
    changelog_path = "/tmp/opportunity_changelog"
    
    try:
        # Clean up and initialize
        cleanup_tables(source_path, target_table_path, changelog_path)
        create_initial_delta_table(target_table_path)
        
        # Process each batch
        for batch_id in range(1, 4):
            print(f"\nProcessing Batch {batch_id}")
            
            # Generate and write batch data
            batch_df = generate_sample_data(batch_id)
            write_batch_to_source(batch_df, source_path, batch_id)
            
            # Process the batch
            query = setup_streaming_query(source_path, target_table_path, changelog_path).start()
            query.awaitTermination()
            
            # Display results after each batch
            display_batch_results(target_table_path, changelog_path, batch_id)
        
        # Show final summary
        display_final_summary(changelog_path)
        print("\nStreaming pipeline completed successfully")
        
    except Exception as e:
        print(f"\nError in streaming pipeline: {str(e)}")
        raise e

if __name__ == "__main__":
    main()


Cleaning up existing tables and directories...
Cleanup completed successfully

Creating initial Delta table structure...
Initial table structure created

Processing Batch 1

Generating data for batch 1
Written batch 1 to /tmp/opportunity_source/batch_1

Processing batch 1
Existing IDs in target: []
Completed processing batch 1

Results After Batch 1

Main Table Contents:
+-------------+------+-------+-----------+-----------+-------------------+
|OpportunityId|  Name| Amount|  StageName|Probability|   LastModifiedDate|
+-------------+------+-------+-----------+-----------+-------------------+
|       OPP001|Deal A|10000.0|Prospecting|       20.0|2023-01-01 10:00:00|
|       OPP002|Deal B|25000.0|Negotiation|       60.0|2023-01-02 11:00:00|
|       OPP003|Deal C|15000.0|   Proposal|       40.0|2023-01-03 12:00:00|
+-------------+------+-------+-----------+-----------+-------------------+


Changes in Batch 1:
+-------------+--------------+-------+-----------+-----------+----------------

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *
from datetime import datetime

# Configure Delta table settings
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

def cleanup_tables(table_path, changelog_path):
    """
    Clean up existing tables before running the pipeline
    """
    print("\nCleaning up existing tables...")
    try:
        spark.sql(f"DROP TABLE IF EXISTS delta.`{table_path}`")
        spark.sql(f"DROP TABLE IF EXISTS delta.`{changelog_path}`")
        dbutils.fs.rm(table_path, recurse=True)
        dbutils.fs.rm(changelog_path, recurse=True)
    except Exception as e:
        print(f"Cleanup warning (safe to ignore if first run): {str(e)}")

def generate_sample_data(is_update=False):
    """
    Generate mock Salesforce Opportunity data
    """
    if not is_update:
        # Initial data
        data = [
            ("OPP001", "Deal A", 10000.0, "Prospecting", "2023-01-01", "2023-01-01 10:00:00", "ACC001", 20.0),
            ("OPP002", "Deal B", 25000.0, "Negotiation", "2023-02-01", "2023-01-02 11:00:00", "ACC002", 60.0),
            ("OPP003", "Deal C", 15000.0, "Proposal", "2023-03-01", "2023-01-03 12:00:00", "ACC003", 40.0)
        ]
    else:
        # Modified data for testing updates
        data = [
            ("OPP001", "Deal A", 12000.0, "Qualification", "2023-01-01", "2023-01-04 10:00:00", "ACC001", 30.0),
            ("OPP002", "Deal B", 25000.0, "Negotiation", "2023-02-01", "2023-01-02 11:00:00", "ACC002", 60.0),
            ("OPP003", "Deal C", 15000.0, "Closed Won", "2023-03-01", "2023-01-05 12:00:00", "ACC003", 100.0),
            ("OPP004", "Deal D", 30000.0, "Prospecting", "2023-04-01", "2023-01-05 14:00:00", "ACC004", 20.0)
        ]

    schema = StructType([
        StructField("OpportunityId", StringType(), False),
        StructField("Name", StringType(), True),
        StructField("Amount", DoubleType(), True),
        StructField("StageName", StringType(), True),
        StructField("CloseDate", StringType(), True),
        StructField("LastModifiedDate", StringType(), True),
        StructField("AccountId", StringType(), True),
        StructField("Probability", DoubleType(), True)
    ])
    
    return spark.createDataFrame(data, schema=schema)

def create_initial_delta_table(df, table_path):
    """
    Create initial Delta table with the source data
    """
    df_with_metadata = df.withColumn("inserted_timestamp", current_timestamp()) \
                        .withColumn("updated_timestamp", current_timestamp()) \
                        .withColumn("is_current", lit(True))
    
    df_with_metadata.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(table_path)

def process_cdc_updates(source_df, target_table_path, changelog_path):
    """
    Process CDC updates and maintain changelog
    """
    # Read existing Delta table
    target_delta_table = DeltaTable.forPath(spark, target_table_path)
    
    # Get existing IDs before merge
    existing_ids = [row.OpportunityId for row in target_delta_table.toDF().select("OpportunityId").collect()]
    
    # Prepare source data with metadata
    source_with_metadata = source_df.withColumn("cdc_timestamp", current_timestamp())
    
    # Create simple changelog entries
    changelog_df = source_with_metadata.withColumn(
        "operation_type",
        when(col("OpportunityId").isin(existing_ids), "UPDATE")
        .otherwise("INSERT")
    ).withColumn("processed_timestamp", current_timestamp())
    
    # Write to changelog
    changelog_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(changelog_path)
    
    # Perform merge operation
    merge_result = target_delta_table.alias("target").merge(
        source=source_with_metadata.alias("source"),
        condition="target.OpportunityId = source.OpportunityId"
    ).whenMatchedUpdate(set={
        "Amount": "source.Amount",
        "StageName": "source.StageName",
        "Probability": "source.Probability",
        "LastModifiedDate": "source.LastModifiedDate",
        "updated_timestamp": "current_timestamp()"
    }).whenNotMatchedInsert(values={
        "OpportunityId": "source.OpportunityId",
        "Name": "source.Name",
        "Amount": "source.Amount",
        "StageName": "source.StageName",
        "CloseDate": "source.CloseDate",
        "LastModifiedDate": "source.LastModifiedDate",
        "AccountId": "source.AccountId",
        "Probability": "source.Probability",
        "inserted_timestamp": "current_timestamp()",
        "updated_timestamp": "current_timestamp()",
        "is_current": "true"
    }).execute()

def display_pipeline_results(table_path, changelog_path):
    """
    Display CDC pipeline results in a clear, structured format
    """
    print("\n" + "="*80)
    print("CDC PIPELINE EXECUTION RESULTS")
    print("="*80)

    # 1. Initial Data
    print("\n1. INITIAL DATA IN TABLE")
    print("-"*80)
    initial_df = spark.read.format("delta").load(table_path).orderBy("OpportunityId")
    initial_df.select(
        "OpportunityId", "Name", "Amount", "StageName", "Probability", 
        "inserted_timestamp"
    ).show()

    # 2. Changes Processed
    print("\n2. CHANGES PROCESSED")
    print("-"*80)
    changelog_df = spark.read.format("delta").load(changelog_path)
    changelog_df.select(
        "OpportunityId",
        "operation_type",
        "Amount",
        "StageName",
        "Probability",
        "processed_timestamp"
    ).orderBy("OpportunityId").show()

    # 3. Final Data
    print("\n3. FINAL DATA AFTER CHANGES")
    print("-"*80)
    final_df = spark.read.format("delta").load(table_path).orderBy("OpportunityId")
    final_df.select(
        "OpportunityId", "Name", "Amount", "StageName", "Probability", 
        "updated_timestamp"
    ).show()

    # 4. Change Summary
    print("\n4. CHANGE SUMMARY")
    print("-"*80)
    print("\nOperations Performed:")
    changelog_df.groupBy("operation_type").count().show()

def main():
    """
    Main execution flow
    """
    table_path = "/tmp/opportunity_delta"
    changelog_path = "/tmp/opportunity_changelog"
    
    try:
        # Clean up existing tables
        cleanup_tables(table_path, changelog_path)
        
        # Step 1: Initial Load
        print("\nSTEP 1: Performing initial load...")
        initial_df = generate_sample_data()
        create_initial_delta_table(initial_df, table_path)
        
        # Step 2: Process Updates
        print("\nSTEP 2: Processing CDC updates...")
        updated_df = generate_sample_data(is_update=True)
        process_cdc_updates(updated_df, table_path, changelog_path)
        
        # Step 3: Display Results
        display_pipeline_results(table_path, changelog_path)
        
    except Exception as e:
        print(f"\nError in pipeline execution: {str(e)}")
        raise e

if __name__ == "__main__":
    main()


Cleaning up existing tables...

STEP 1: Performing initial load...

STEP 2: Processing CDC updates...

CDC PIPELINE EXECUTION RESULTS

1. INITIAL DATA IN TABLE
--------------------------------------------------------------------------------
+-------------+------+-------+-------------+-----------+--------------------+
|OpportunityId|  Name| Amount|    StageName|Probability|  inserted_timestamp|
+-------------+------+-------+-------------+-----------+--------------------+
|       OPP001|Deal A|12000.0|Qualification|       30.0|2025-05-05 03:37:...|
|       OPP002|Deal B|25000.0|  Negotiation|       60.0|2025-05-05 03:37:...|
|       OPP003|Deal C|15000.0|   Closed Won|      100.0|2025-05-05 03:37:...|
|       OPP004|Deal D|30000.0|  Prospecting|       20.0|2025-05-05 03:38:...|
+-------------+------+-------+-------------+-----------+--------------------+


2. CHANGES PROCESSED
--------------------------------------------------------------------------------
+-------------+--------------+

If the unity catalog configured with the hive metastore , we can directly use the schemas , such it gets pointed the storaage account 

-- Create and configure the complete hierarchy with necessary permissions
CREATE CATALOG IF NOT EXISTS salesforce_cdc;
USE CATALOG salesforce_cdc;

-- Create schemas
CREATE SCHEMA IF NOT EXISTS main;
CREATE SCHEMA IF NOT EXISTS audit;

-- Register main opportunity table with lineage and governance
CREATE TABLE main.opportunity
LOCATION '/tmp/opportunity_delta'
COMMENT 'Salesforce Opportunity CDC data'
TBLPROPERTIES (
    'delta.enableChangeDataFeed' = 'true',
    'delta.columnMapping.mode' = 'name',
    'pipeline.owner' = 'data_engineering',
    'data.source' = 'salesforce',
    'data.sensitivity' = 'confidential',
    'lineage.upstream' = 'salesforce.opportunity',
    'audit.enabled' = 'true'
);

-- Register changelog table
CREATE TABLE audit.opportunity_changelog
LOCATION '/tmp/opportunity_changelog'
COMMENT 'Change tracking log'
TBLPROPERTIES (
    'delta.enableChangeDataFeed' = 'true',
    'audit.retention.days' = '90'
);