In [0]:
from pyspark.sql import SparkSession

# Spark session shared by every query issued in this notebook.
spark = (
    SparkSession.builder
    .appName("CustomerHarmonization")
    .getOrCreate()
)

# Job configuration: the raw customer dimension is read from source_table and
# the SCD Type 2 history is maintained in target_table.
source_table = "processing_catalog.schema_raw_dimension.t_customers"
target_table = "processing_catalog.schema_harmonized_dimension.t_customers"
job_id = "batch_job2"  # Replace with actual job ID

# Step 1: Check if the harmonized table exists
def check_table_exists():
    query = f"""
        SELECT count(1)
        FROM processing_catalog.information_schema.tables
        WHERE table_catalog = 'processing_catalog'
        AND table_schema = 'schema_harmonized_dimension'
        AND table_name = 't_customers'
    """
    result = spark.sql(query).collect()[0][0]
    return result > 0

# Steps 2-4 only run when the harmonized table already exists; otherwise the
# job is a no-op and says why.
if check_table_exists():
    # Step 2: Select raw rows inserted after the newest batch already harmonized.
    # MAX over an empty target is NULL, so the typed TIMESTAMP literal provides
    # the lower bound (instead of coalescing a timestamp with a string).
    latest_batch_query = f"""
        SELECT *
        FROM {source_table}
        WHERE r_insert_timestamp > (
            SELECT COALESCE(MAX(batch_r_timestamp), TIMESTAMP '1900-01-01 00:00:00')
            FROM {target_table}
        )
    """
    latest_batch = spark.sql(latest_batch_query)
    latest_batch.createOrReplaceTempView("latest_batch")

    # Step 3: Identify new and changed customers.
    # - Only the most recent raw row per customer is kept. Several rows for one
    #   customer would otherwise make MERGE fail (multiple source rows matching
    #   one target row); intermediate versions within one batch are not kept.
    # - The active_flag filter is part of the JOIN, so customers whose versions
    #   are all closed are treated as new rather than being filtered out.
    # - `<=>` is null-safe equality: `!=` yields NULL when either side is NULL,
    #   which silently hid changes to or from NULL.
    # - has_active_version marks customers whose current row must be closed.
    # NOTE(review): job_id is interpolated as a SQL string literal and must not
    # contain a single quote.
    changed_records_query = f"""
        SELECT
            raw.customer_id,
            raw.first_name,
            raw.last_name,
            raw.email,
            raw.phone_number,
            CAST(raw.loyalty_points AS INT) AS loyalty_points,
            raw.membership_status,
            raw.address,
            raw.r_insert_timestamp AS batch_r_timestamp,
            '{job_id}' AS job_id,
            harmonized.customer_id IS NOT NULL AS has_active_version
        FROM (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY customer_id
                    ORDER BY r_insert_timestamp DESC
                ) AS batch_rank
            FROM latest_batch
        ) raw
        LEFT JOIN {target_table} harmonized
            ON raw.customer_id = harmonized.customer_id
            AND harmonized.active_flag = true -- Only compare against current records
        WHERE raw.batch_rank = 1
          AND (
              harmonized.customer_id IS NULL -- New (or fully closed) customer
              OR NOT (raw.first_name <=> harmonized.first_name)
              OR NOT (raw.last_name <=> harmonized.last_name)
              OR NOT (raw.email <=> harmonized.email)
              OR NOT (raw.phone_number <=> harmonized.phone_number)
              OR NOT (CAST(raw.loyalty_points AS INT) <=> harmonized.loyalty_points)
              OR NOT (raw.membership_status <=> harmonized.membership_status)
              OR NOT (raw.address <=> harmonized.address)
          )
    """
    changed_records = spark.sql(changed_records_query)
    changed_records.createOrReplaceTempView("changed_records")

    # Step 4: Close current records and insert new versions in ONE MERGE.
    # changed_records is an unmaterialized view over the target. A separate
    # INSERT after the close could therefore re-read the target once changed
    # customers no longer have an active row, and drop their new versions.
    # Two statements can also leave the table half-updated if the second fails.
    # Each changed customer is therefore staged twice:
    #   - merge_key = customer_id matches the active row, which is closed
    #     (new customers match nothing and get their first version inserted);
    #   - merge_key = NULL never matches, so it inserts the new active version.
    scd2_merge_query = f"""
        MERGE INTO {target_table} AS target
        USING (
            SELECT cr.customer_id AS merge_key, cr.*
            FROM changed_records cr
            UNION ALL
            SELECT NULL AS merge_key, cr.*
            FROM changed_records cr
            WHERE cr.has_active_version
        ) AS source
        ON target.customer_id = source.merge_key
           AND target.active_flag = true
        WHEN MATCHED THEN
        UPDATE SET
            target.active_flag = false,
            target.end_effective_date = current_date(),
            target.h_update_timestamp = current_timestamp()
        WHEN NOT MATCHED THEN
        INSERT (
            customer_id,
            first_name,
            last_name,
            email,
            phone_number,
            loyalty_points,
            membership_status,
            address,
            active_flag,
            begin_effective_date,
            end_effective_date,
            batch_r_timestamp,
            h_insert_timestamp,
            h_update_timestamp,
            job_id
        )
        VALUES (
            source.customer_id,
            source.first_name,
            source.last_name,
            source.email,
            source.phone_number,
            source.loyalty_points,
            source.membership_status,
            source.address,
            true,
            current_date(),
            DATE('9999-12-31'),
            source.batch_r_timestamp,
            current_timestamp(),
            current_timestamp(),
            source.job_id
        )
    """
    spark.sql(scd2_merge_query)

    print("SCD Type 2 logic successfully executed.")
else:
    print(f"Harmonized table '{target_table}' does not exist. Skipping processing.")




SCD Type 2 logic successfully executed.


True