# Final Merge Operation and Data Consolidation

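**This notebook performs the final merge operation, consolidating the enriched orders, customer analytics, and product analytics data into their target tables using SCD Type 2 (Slowly Changing Dimension) logic.** Each target row carries `effective_date`, `expiry_date`, and `is_current` columns to track its version history.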

In [0]:
from delta.tables import *

# All source and target tables live in the same catalog/schema, so their fully
# qualified names are built from one shared prefix (a rename touches one line).
CATALOG_SCHEMA = "`event-driven-catalog`.default"

# Source tables read by this notebook
enriched_orders_table = f"{CATALOG_SCHEMA}.enriched_orders"
customer_analytics_table = f"{CATALOG_SCHEMA}.customer_analytics"
products_analytics_table = f"{CATALOG_SCHEMA}.products_analytics"

# Target tables written by the SCD2 merge cells below
orders_target = f"{CATALOG_SCHEMA}.orders_target"
customers_target = f"{CATALOG_SCHEMA}.customers_target"
products_target = f"{CATALOG_SCHEMA}.products_target"

In [0]:
# Import required libraries
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime
import json

# Read the three enriched source tables and report their row counts.
# Any failure is printed and re-raised so the notebook run stops here.
try:
    df_enriched_orders = spark.read.table(enriched_orders_table)
    df_customer_analytics = spark.read.table(customer_analytics_table)
    df_product_analytics = spark.read.table(products_analytics_table)

    print("Successfully loaded enriched datasets")
    # Each count() triggers a Spark job over the corresponding table.
    for dataset_label, dataset_df in [
        ("Enriched orders", df_enriched_orders),
        ("Customer analytics", df_customer_analytics),
        ("Product analytics", df_product_analytics),
    ]:
        print(f"{dataset_label}: {dataset_df.count()} records")

except Exception as load_error:
    print(f"Error loading enriched datasets: {str(load_error)}")
    raise


Successfully loaded enriched datasets
Enriched orders: 20 records
Customer analytics: 20 records
Product analytics: 20 records


In [0]:
# Merge Orders Data with SCD2 Logic
#
# Every incoming order row becomes the new current version of its order_id:
#   1. the existing *current* version of each incoming order_id is expired
#      (expiry_date = today, is_current = false);
#   2. the incoming rows are appended with effective_date = today,
#      expiry_date = NULL, is_current = true.
# On the first run the target table is simply created from the incoming rows.
try:
    # Prepare orders data for merge
    df_orders_merge = df_enriched_orders.select(
        "order_id", "customer_id", "product_id", "order_date", "order_amount",
        "currency", "payment_method", "shipping_address", "order_status",
        "created_timestamp", "processed_timestamp", "batch_id", "source_system",
        "order_profit_margin", "estimated_clv", "season"
    ).withColumn("effective_date", F.current_date()) \
     .withColumn("expiry_date", F.lit(None).cast(DateType())) \
     .withColumn("is_current", F.lit(True))

    if spark.catalog.tableExists(orders_target):
        target_orders = DeltaTable.forName(spark, orders_target)

        # Step 1: expire only the *current* version of each incoming order_id.
        # The is_current filter leaves already-expired history rows (and
        # their original expiry_date) untouched. Merging against the distinct
        # keys avoids collecting every id to the driver. distinct() also
        # guarantees no target row is matched by more than one source row.
        incoming_order_keys = df_orders_merge.select("order_id").distinct()
        (
            target_orders.alias("t")
            .merge(
                incoming_order_keys.alias("s"),
                "t.order_id = s.order_id AND t.is_current = true",
            )
            .whenMatchedUpdate(set={
                "expiry_date": F.current_date(),
                "is_current": F.lit(False),
            })
            .execute()
        )

        # Step 2: append the incoming rows as the new current versions.
        # NOTE(review): steps 1 and 2 are separate Delta commits. A failure
        # between them leaves the affected order_ids with no current row.
        # Also, every incoming key is re-versioned even if unchanged — confirm
        # that is intended before re-running on an unchanged source.
        df_orders_merge.write.format("delta").mode("append").saveAsTable(orders_target)

    else:
        # Create new table
        df_orders_merge.write.format("delta").saveAsTable(orders_target)

    print("Orders merge completed successfully")

except Exception as e:
    print(f"Error merging orders data: {str(e)}")
    raise


Orders merge completed successfully


In [0]:
# Merge Customers Data with SCD2 Logic
#
# Every incoming customer row becomes the new current version of its
# customer_id:
#   1. the existing *current* version of each incoming customer_id is expired
#      (expiry_date = today, is_current = false);
#   2. the incoming rows are appended with effective_date = today,
#      expiry_date = NULL, is_current = true.
# On the first run the target table is simply created from the incoming rows.
try:
    df_customers_merge = df_customer_analytics.select(
        "customer_id", "first_name", "last_name", "email", "phone",
        "date_of_birth", "registration_date", "address", "city", "state",
        "zip_code", "country", "customer_tier", "last_login", "customer_created_timestamp",
        "age", "age_segment", "customer_lifecycle_stage",
        "total_orders", "total_spent", "avg_order_value", "customer_segment"
    ).withColumn("effective_date", F.current_date()) \
     .withColumn("expiry_date", F.lit(None).cast(DateType())) \
     .withColumn("is_current", F.lit(True))

    if spark.catalog.tableExists(customers_target):
        target_customers = DeltaTable.forName(spark, customers_target)

        # Step 1: expire only the *current* version of each incoming
        # customer_id. The is_current filter leaves already-expired history
        # rows (and their original expiry_date) untouched. Merging against the
        # distinct keys avoids collecting every id to the driver. distinct()
        # also guarantees no target row is matched by more than one source row.
        incoming_customer_keys = df_customers_merge.select("customer_id").distinct()
        (
            target_customers.alias("t")
            .merge(
                incoming_customer_keys.alias("s"),
                "t.customer_id = s.customer_id AND t.is_current = true",
            )
            .whenMatchedUpdate(set={
                "expiry_date": F.current_date(),
                "is_current": F.lit(False),
            })
            .execute()
        )

        # Step 2: append the incoming rows as the new current versions.
        # NOTE(review): steps 1 and 2 are separate Delta commits. A failure
        # between them leaves the affected customer_ids with no current row.
        df_customers_merge.write.format("delta").mode("append").saveAsTable(customers_target)
    else:
        df_customers_merge.write.format("delta").saveAsTable(customers_target)
    print("Customers merge completed successfully")
except Exception as e:
    print(f"Error merging customers data: {str(e)}")
    raise


Customers merge completed successfully


In [0]:
# Merge Products Data with SCD2 Logic
#
# Every incoming product row becomes the new current version of its
# product_id:
#   1. the existing *current* version of each incoming product_id is expired
#      (expiry_date = today, is_current = false);
#   2. the incoming rows are appended with effective_date = today,
#      expiry_date = NULL, is_current = true.
# On the first run the target table is simply created from the incoming rows.
try:
    # Prepare products data for merge
    df_products_merge = df_product_analytics.select(
        "product_id", "product_name", "category", "subcategory", "brand",
        "price", "product_currency", "product_stock_quantity", "product_weight_kg", "dimensions_cm",
        "color", "material", "description", "launch_date", "discontinued",
        "product_created_timestamp", "product_price_segment", "product_stock_status",
        "total_orders", "total_revenue", "unique_customers", "performance_category"
    ).withColumn("effective_date", F.current_date()) \
     .withColumn("expiry_date", F.lit(None).cast(DateType())) \
     .withColumn("is_current", F.lit(True))

    if spark.catalog.tableExists(products_target):
        target_products = DeltaTable.forName(spark, products_target)

        # Step 1: expire only the *current* version of each incoming
        # product_id. The is_current filter leaves already-expired history
        # rows (and their original expiry_date) untouched. Merging against the
        # distinct keys avoids collecting every id to the driver. distinct()
        # also guarantees no target row is matched by more than one source row.
        incoming_product_keys = df_products_merge.select("product_id").distinct()
        (
            target_products.alias("t")
            .merge(
                incoming_product_keys.alias("s"),
                "t.product_id = s.product_id AND t.is_current = true",
            )
            .whenMatchedUpdate(set={
                "expiry_date": F.current_date(),
                "is_current": F.lit(False),
            })
            .execute()
        )

        # Step 2: append the incoming rows as the new current versions.
        # NOTE(review): steps 1 and 2 are separate Delta commits. A failure
        # between them leaves the affected product_ids with no current row.
        df_products_merge.write.format("delta").mode("append").saveAsTable(products_target)

    else:
        # Create new table
        df_products_merge.write.format("delta").saveAsTable(products_target)

    print("Products merge completed successfully")

except Exception as e:
    print(f"Error merging products data: {str(e)}")
    raise


Products merge completed successfully
