Part 4: Change Data Capture (CDC)

In [3]:
import pandas as pd
from sqlalchemy import create_engine, text
import sqlalchemy
import logging
from datetime import datetime

# -------------------------------
# Logging setup
# -------------------------------
# Route INFO-and-above records to etl_cdc.log, one line per record in the
# form "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="etl_cdc.log",
)

# -------------------------------
# DB connection
# -------------------------------
# SQLite database file sales_dw.db, resolved relative to the working
# directory; statement echoing is left off.
engine = create_engine("sqlite:///sales_dw.db", echo=False)

# -------------------------------
# Ensure history table exists
# -------------------------------
# sales_history mirrors the fact_sales columns written by the CDC step and
# adds ChangeID, ChangeType and ChangeTimestamp. IF NOT EXISTS makes this
# safe to run again against an existing database.
create_history_table = text("""
    CREATE TABLE IF NOT EXISTS sales_history (
        ChangeID INTEGER PRIMARY KEY AUTOINCREMENT,
        ChangeType TEXT,
        Date DATE,
        ProductID INTEGER,
        ProductName TEXT,
        QuantitySold INTEGER,
        Price REAL,
        Category TEXT,
        CustomerID INTEGER,
        TotalSales REAL,
        ChangeTimestamp DATETIME
    )
    """)

with engine.begin() as conn:
    conn.execute(create_history_table)

# -------------------------------
# Load updates file
# -------------------------------
# One row per change; the ChangeType column says which operation to apply.
updates = pd.read_csv("../data/sales_data_updates.csv")

# TotalSales = QuantitySold * Price, filled in for every row that is not a
# DELETE (DELETE rows keep whatever TotalSales they had, or NaN).
updates.loc[updates["ChangeType"].ne("DELETE"), "TotalSales"] = updates[
    "QuantitySold"
].mul(updates["Price"])

# -------------------------------
# Process CDC operations
# -------------------------------
# Each row of `updates` is applied to fact_sales according to its ChangeType:
#   INSERT - add the row to fact_sales.
#   UPDATE - copy the current fact_sales row into sales_history, then
#            overwrite its non-key columns.
#   DELETE - copy the current fact_sales row into sales_history, then
#            remove it.
# A fact_sales row is identified by (Date, ProductID, CustomerID).
#
# The statements are built once, outside the loop, and every value is
# passed as a bound parameter.
insert_fact = text("""
    INSERT INTO fact_sales (Date, ProductID, ProductName, QuantitySold, Price, Category, CustomerID, TotalSales)
    VALUES (:Date, :ProductID, :ProductName, :QuantitySold, :Price, :Category, :CustomerID, :TotalSales)
""")

select_current = text("""
    SELECT * FROM fact_sales
    WHERE Date=:Date AND ProductID=:ProductID AND CustomerID=:CustomerID
""")

insert_history = text("""
    INSERT INTO sales_history (ChangeType, Date, ProductID, ProductName, QuantitySold, Price, Category, CustomerID, TotalSales, ChangeTimestamp)
    VALUES (:ChangeType, :Date, :ProductID, :ProductName, :QuantitySold, :Price, :Category, :CustomerID, :TotalSales, :ts)
""")

update_fact = text("""
    UPDATE fact_sales
    SET ProductName=:ProductName,
        QuantitySold=:QuantitySold,
        Price=:Price,
        Category=:Category,
        TotalSales=:TotalSales
    WHERE Date=:Date AND ProductID=:ProductID AND CustomerID=:CustomerID
""")

delete_fact = text("""
    DELETE FROM fact_sales
    WHERE Date=:Date AND ProductID=:ProductID AND CustomerID=:CustomerID
""")

# The whole batch runs inside one engine.begin() block, i.e. one transaction
# for all rows of the updates file.
with engine.begin() as conn:
    for _, row in updates.iterrows():
        # str() keeps a blank ChangeType cell (NaN after read_csv) from
        # raising on .upper(); such a row ends up in the "unknown" branch.
        change_type = str(row["ChangeType"]).strip().upper()
        key = {
            "Date": row["Date"],
            "ProductID": row["ProductID"],
            "CustomerID": row["CustomerID"],
        }

        # --- INSERT ---
        if change_type == "INSERT":
            conn.execute(insert_fact, row.to_dict())

        # --- UPDATE / DELETE ---
        elif change_type in ("UPDATE", "DELETE"):
            old = conn.execute(select_current, key).fetchone()

            if old is None:
                # Nothing to archive or change; do not report it as processed.
                logging.warning(
                    "Skipped %s for ProductID=%s on %s: no matching fact_sales row",
                    change_type, row["ProductID"], row["Date"],
                )
                continue

            # Save old record to history. Result rows are tuple-like, so
            # the column-name -> value view is taken from Row._mapping
            # rather than dict(old).
            snapshot = dict(old._mapping)
            snapshot["ChangeType"] = change_type
            snapshot["ts"] = datetime.now()
            conn.execute(insert_history, snapshot)

            if change_type == "UPDATE":
                conn.execute(update_fact, row.to_dict())
            else:
                conn.execute(delete_fact, key)

        # --- anything else ---
        else:
            logging.warning(
                "Skipped row with unknown ChangeType %r for ProductID=%s on %s",
                row["ChangeType"], row["ProductID"], row["Date"],
            )
            continue

        logging.info(
            "Processed %s for ProductID=%s on %s",
            change_type, row["ProductID"], row["Date"],
        )

print("CDC ETL complete.")


CDC ETL complete.
