## Import libraries

In [17]:
# Standard libraries
import pandas as pd
import json
from datetime import datetime
from dateutil import parser
import warnings

# Database connection
import pyodbc  # SQL Server

# Logging
import logging
import os

## Logging to file configuration

In [18]:
# Make sure the directory that receives the log file is present
log_dir = "code_logs"
os.makedirs(log_dir, exist_ok=True)

# Timestamp taken once at start-up; used as the identifier of this ETL run
etl_run_id = datetime.now()

# Full path of the execution log inside the log directory
log_file = os.path.join(log_dir, "sales_etl_code_execution_logs.log")

# Send INFO-and-above records to the log file, each prefixed with a
# "YYYY-MM-DD HH:MM:SS [LEVEL]" header
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    filename=log_file,
)

# Helper function to log and print together
def log_print(message, level="info"):
    """
    Write a message to the log and echo it to stdout.

    Parameters:
    - message: text (or any printable object) to record
    - level: logging level name, case-insensitive: 'debug', 'info',
      'warning', 'error' or 'critical'. An unrecognised name falls back to
      'info' so that a message is never silently dropped.

    'info' messages are printed unchanged; every other level is printed with
    an upper-case level prefix, e.g. "ERROR: <message>".
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    level_name = str(level).lower()
    if level_name not in levels:
        level_name = "info"

    logging.log(levels[level_name], message)
    if level_name == "info":
        print(message)
    else:
        print(f"{level_name.upper()}:", message)

logging.info("ETL job started.")

## SQL Server Connection

In [19]:
# -----------------------------
# SQL Server Connection
# -----------------------------
# ODBC connection string: SalesDB on the local SQL Server instance, using a
# trusted connection (no user name / password in the string).
sql_conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=localhost;"
    "DATABASE=SalesDB;"
    "Trusted_Connection=yes;"
)

# Open the connection and cursor that the later ETL steps share
try:
    conn = pyodbc.connect(sql_conn_str)
    cursor = conn.cursor()
    logging.info("Successfully connected to SalesDB.")
except Exception as e:
    # A failed connection aborts the run, so record it at ERROR level with the
    # traceback, in line with the error handling of the other ETL steps.
    logging.error(f"Error connecting to SQL Server: {e}", exc_info=True)
    raise

## Logging bad data to table

In [20]:
def log_bad_data(df_rows, etl_run_id, source_system, bad_column, issue_type, notes_prefix, conn_str):
    """
    Logs bad data rows into the dbo.BadDataLog table.

    One BadDataLog record is written per row of df_rows. All records are sent
    in a single batch and committed together. When df_rows is empty nothing
    is written and the database is not touched.

    Parameters:
    - df_rows: DataFrame containing rows to log
    - etl_run_id: timestamp of ETL run
    - source_system: e.g., 'python_etl', 'ssis_pkg'
    - bad_column: name of the column with issue, or 'ALL_COLUMNS' for full duplicates
    - issue_type: type of issue (missing, duplicate, invalid_format, etc.)
    - notes_prefix: text stored in the 'notes' column of every logged record
    - conn_str: connection string; accepted for caller compatibility but not
      used -- the insert runs on the module-level pyodbc connection `conn`
    """
    params = []
    for _, row in df_rows.iterrows():
        record_id = row['transaction_id'] if 'transaction_id' in row else None
        # A missing id (NaN/NaT) is stored as SQL NULL rather than being
        # handed to the driver as a float NaN.
        if record_id is not None and pd.api.types.is_scalar(record_id) and pd.isna(record_id):
            record_id = None

        # Tuple order matches the column list of the INSERT statement below
        params.append((
            etl_run_id,
            source_system,
            record_id,
            bad_column,
            issue_type,
            row.to_json(),   # full original record, serialised as JSON
            notes_prefix,
        ))

    if not params:
        return

    cursor = conn.cursor()
    try:
        cursor.executemany("""
            INSERT INTO dbo.BadDataLog
            (etl_run_id, source_system, record_id, bad_column, issue_type, original_record, notes)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    finally:
        # Always release the cursor, even when the insert fails
        cursor.close()

## Load JSON file data into dataframe

In [21]:
try:
    input_file = 'sales_data.json'
    logging.info(f"Loading input JSON file: {input_file}")

    # Read JSON data. The encoding is pinned to UTF-8 (the JSON standard
    # encoding) so the result does not depend on the platform's default
    # locale encoding.
    with open(input_file, 'r', encoding='utf-8') as f:
        raw_data = json.load(f)

    # Convert to pandas DataFrame; nested objects are flattened into dotted
    # column names such as 'product.id' and 'product.price'
    sales_df = pd.json_normalize(raw_data)
    logging.info(f"Total records loaded: {len(sales_df)}")

    # Optional: preview the first 10 rows in the notebook output
    display(sales_df.head(10))

except Exception as e:
    logging.error(f"Error loading JSON data: {e}", exc_info=True)
    raise

Unnamed: 0,transaction_id,customer_id,quantity,discount,date,region,product.id,product.name,product.category,product.price
0,T001,C001,2.0,0.05,,North,P01,Laptop,Electronics,999.99
1,T002,C002,5.0,,2023-02-10T14:15:00Z,South,P02,Mouse,Accessories,19.99
2,T003,,-1.0,0.1,03-05-2023,East,P03,Monitor,Electronics,299.5
3,T004,C004,4.0,0.15,2023-04-20,West,P04,Keyboard,Accessories,49.9
4,T005,C001,3.0,0.0,2023/05/12 12:30:00,North,P05,Desk,Furniture,189.0
5,T00006,C064,5.0,-0.05,2023-02-12,West,P03,Monitor 4K,Electronics,299.5
6,T00007,C054,0.0,0.0,2023-03-20T02:12:32Z,West,P03,Monitor 4K,Electronics,299.5
7,T00008,C034,,,2023/09/08 19:26:58,East,P02,Mouse,Accessories,19.99
8,T00009,C065,-1.0,0.1,26-03-2023,South,P01,Laptop Pro+,Electronics,999.99
9,T00010,C051,0.0,0.15,2023-05-26T00:50:45Z,Unknown,P02,Mouse,Accessories,19.99


## Data validation, cleansing, and transformation

In [22]:
# Remove exact duplicate rows
try:
    logging.info("Starting duplicate rows removal step.")

    # Every repeat of an earlier, fully identical row is written to the
    # bad-data log before it is discarded
    is_repeated_row = sales_df.duplicated(keep='first')
    duplicate_rows = sales_df[is_repeated_row]
    log_bad_data(
        df_rows=duplicate_rows,
        etl_run_id=etl_run_id,
        source_system='python_etl',
        bad_column='ALL_COLUMNS',
        issue_type='duplicate_full_row',
        notes_prefix='Dropped this full duplicate row',
        conn_str=sql_conn_str,
    )

    # Keep the first occurrence of each row and work out how many were removed
    initial_count = len(sales_df)
    sales_df = sales_df.drop_duplicates(keep='first')
    removed_count = initial_count - len(sales_df)

    logging.info(f"Dropped {removed_count} exact duplicate rows.")
    logging.info(f"Remaining records after duplicate removal: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during duplicate removal: {e}", exc_info=True)
    raise

In [23]:
# ---- Clean 'transaction_id' column ----
try:
    logging.info("Starting 'transaction_id' cleaning step.")

    # Normalise every id to trimmed text
    sales_df['transaction_id'] = sales_df['transaction_id'].astype(str).str.strip()
    logging.info("Transaction IDs converted to string and stripped.")

    # An id counts as missing when it is null, empty, or the text form of a
    # null value ('nan' / 'none', compared case-insensitively)
    txn_ids = sales_df['transaction_id']
    txn_id_is_null = txn_ids.isna()
    txn_id_is_empty = txn_ids.str.strip() == ''
    txn_id_is_placeholder = txn_ids.str.lower().isin(['nan', 'none'])
    missing_or_blank_mask = txn_id_is_null | txn_id_is_empty | txn_id_is_placeholder
    missing_or_blank_rows = sales_df[missing_or_blank_mask]

    if not missing_or_blank_rows.empty:
        log_bad_data(
            df_rows=missing_or_blank_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='transaction_id',
            issue_type='missing_or_blank',
            notes_prefix='Dropped row due to missing or blank transaction_id',
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged and dropped {len(missing_or_blank_rows)} rows with missing/blank transaction_id.")

    # Keep only the rows that carry a usable transaction id
    sales_df = sales_df[~missing_or_blank_mask]

    # A (transaction_id, product.id) pair may appear once; later repeats are
    # logged and then removed
    pair_columns = ['transaction_id', 'product.id']
    duplicate_mask = sales_df.duplicated(subset=pair_columns, keep='first')
    duplicate_rows = sales_df[duplicate_mask]

    if not duplicate_rows.empty:
        log_bad_data(
            df_rows=duplicate_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='transaction_id',
            issue_type='duplicate_transactionid+productid',
            notes_prefix='Dropped duplicate transactionid+productid pairs',
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged and dropped {len(duplicate_rows)} duplicate transactionid + productid pairs rows.")

    sales_df = sales_df.drop_duplicates(subset=pair_columns, keep='first')

    # Log completion
    logging.info(f"Remaining records after transaction_id cleaning: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during 'transaction_id' cleaning: {e}", exc_info=True)
    print(f"❌ Error during 'transaction_id' cleaning: {e}")
    raise

In [24]:
# ---- Clean 'customer_id' column ----
try:
    logging.info("Starting 'customer_id' cleaning step.")

    # Convert to string and strip whitespace
    sales_df['customer_id'] = sales_df['customer_id'].astype(str).str.strip()
    logging.info(f"Total records after stripping whitespace: {len(sales_df)}")

    # Treat empty strings or placeholders as missing.
    # An explicit isin + mask is used instead of replace([...], None): with a
    # list and value=None, older pandas releases treat the value as "not
    # given" and forward-fill from the previous row, which would silently
    # assign another customer's id to the record.
    placeholder_mask = sales_df['customer_id'].isin(['', 'None', 'nan'])
    sales_df['customer_id'] = sales_df['customer_id'].mask(placeholder_mask)
    logging.info("Replaced empty/placeholder values in 'customer_id' with None.")

    # Identify missing customer_id rows before filling
    missing_customer_rows = sales_df[sales_df['customer_id'].isna()]

    if not missing_customer_rows.empty:
        log_bad_data(
            df_rows=missing_customer_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='customer_id',
            issue_type='missing',
            notes_prefix='customer_id missing or blank; replaced with "Unknown"',
            conn_str=sql_conn_str
        )
        logging.info(f"Logged {len(missing_customer_rows)} records with missing customer_id before replacement.")

    # Fill missing customer_id with placeholder 'Unknown'
    sales_df['customer_id'] = sales_df['customer_id'].fillna('Unknown')
    num_unknown = (sales_df['customer_id'] == 'Unknown').sum()
    logging.info(f"Number of records with missing customer_id replaced by Unknown: {num_unknown}")

    # Rows are only relabelled in this step, never dropped; log the total so
    # that can be verified from the log file
    logging.info(f"Total records after 'customer_id' cleaning: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during 'customer_id' cleaning: {e}", exc_info=True)
    raise

In [25]:
# ---- Clean 'product' columns ----
try:
    logging.info("Starting 'product' columns cleaning steps.")

    # Text forms that a missing value takes once a column has been cast to str
    blank_tokens = ['', 'None', 'nan']
    blank_category_tokens = ['', 'None', 'nan', 'NaN']

    # --- Product ID ---
    # Trimmed text copy of the nested 'product.id' field
    sales_df['product_id'] = sales_df['product.id'].astype(str).str.strip()

    # Record rows with a blank/placeholder product_id before overwriting it
    product_id_is_blank = sales_df['product_id'].isin(blank_tokens)
    missing_product_id_rows = sales_df[product_id_is_blank]
    if not missing_product_id_rows.empty:
        log_bad_data(
            df_rows=missing_product_id_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='product_id',
            issue_type='missing',
            notes_prefix="product_id missing or blank; replaced with 'Unknown'",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(missing_product_id_rows)} missing product_id rows.")

    sales_df['product_id'] = sales_df['product_id'].replace(blank_tokens, 'Unknown')
    num_unknown_product = sales_df['product_id'].eq('Unknown').sum()
    logging.info(f"Number of records with missing product_id replaced with 'Unknown': {num_unknown_product}")

    # --- Product Name ---
    sales_df['product_name'] = sales_df['product.name'].astype(str).str.strip()

    product_name_is_blank = sales_df['product_name'].isin(blank_tokens)
    missing_pname_rows = sales_df[product_name_is_blank]
    if not missing_pname_rows.empty:
        log_bad_data(
            df_rows=missing_pname_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='product_name',
            issue_type='missing',
            notes_prefix="product_name missing or blank; replaced with 'Unknown'",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(missing_pname_rows)} missing product_name rows.")

    sales_df['product_name'] = sales_df['product_name'].replace(blank_tokens, 'Unknown')
    num_unknown_pname = sales_df['product_name'].eq('Unknown').sum()
    logging.info(f"Number of records with missing/invalid product name replaced with 'Unknown': {num_unknown_pname}")

    # --- Category ---
    sales_df['category'] = sales_df['product.category'].astype(str).str.strip()

    category_is_blank = sales_df['category'].isin(blank_category_tokens)
    missing_category_rows = sales_df[category_is_blank]
    if not missing_category_rows.empty:
        log_bad_data(
            df_rows=missing_category_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='category',
            issue_type='missing',
            notes_prefix="category missing or blank; replaced with 'Unknown'",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(missing_category_rows)} missing category rows.")

    sales_df['category'] = sales_df['category'].replace(blank_category_tokens, 'Unknown')
    num_unknown_category = sales_df['category'].eq('Unknown').sum()
    logging.info(f"Number of records with missing/invalid category replaced with 'Unknown': {num_unknown_category}")

    # --- Price ---
    # Anything that cannot be read as a number becomes NaN
    sales_df['price'] = pd.to_numeric(sales_df['product.price'], errors='coerce')

    # A price is bad when it is missing/non-numeric (NaN) or below zero
    price_is_missing = sales_df['price'].isna()
    price_is_negative = sales_df['price'] < 0
    bad_price_rows = sales_df[price_is_missing | price_is_negative]
    if not bad_price_rows.empty:
        log_bad_data(
            df_rows=bad_price_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='price',
            issue_type='invalid_value',
            notes_prefix="price missing, non-numeric, or negative; replaced with 0",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(bad_price_rows)} invalid/missing price rows.")

    # Missing prices first, then negative ones, are overwritten with 0
    sales_df['price'] = sales_df['price'].fillna(0)
    still_negative = sales_df['price'] < 0
    neg_price_count = still_negative.sum()
    sales_df.loc[still_negative, 'price'] = 0
    logging.info(f"Replaced {neg_price_count} negative price values with 0")

    logging.info(f"Total records after 'product' columns cleaning: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during product data cleaning: {e}", exc_info=True)
    raise

In [26]:
# ---- Clean 'quantity' and 'discount' columns ----
try:
    logging.info("Starting 'quantity' and 'discount' columns cleaning steps.")

    # --- Quantity ---
    # Non-numeric quantities become NaN
    sales_df['quantity'] = pd.to_numeric(sales_df['quantity'], errors='coerce')

    # Flag missing and negative quantities while the original values are
    # still in place
    quantity_is_missing = sales_df['quantity'].isna()
    quantity_is_negative = sales_df['quantity'] < 0
    bad_quantity_rows = sales_df[quantity_is_missing | quantity_is_negative]
    if not bad_quantity_rows.empty:
        log_bad_data(
            df_rows=bad_quantity_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='quantity',
            issue_type='invalid_value',
            notes_prefix="quantity missing, non-numeric, or negative; replaced with 0",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(bad_quantity_rows)} bad quantity rows.")

    # Count first, then overwrite both kinds of bad value with 0
    missing_quantity_count = quantity_is_missing.sum()
    negative_quantity_count = quantity_is_negative.sum()
    sales_df['quantity'] = sales_df['quantity'].fillna(0)
    sales_df.loc[quantity_is_negative, 'quantity'] = 0

    # Log counts
    logging.info(f"Replaced {missing_quantity_count} missing quantity values with 0")
    logging.info(f"Replaced {negative_quantity_count} negative quantity values with 0")

    # --- Discount ---
    # Non-numeric discounts become NaN
    sales_df['discount'] = pd.to_numeric(sales_df['discount'], errors='coerce')

    # Flag missing and negative discounts while the original values are
    # still in place
    discount_is_missing = sales_df['discount'].isna()
    discount_is_negative = sales_df['discount'] < 0
    bad_discount_rows = sales_df[discount_is_missing | discount_is_negative]
    if not bad_discount_rows.empty:
        log_bad_data(
            df_rows=bad_discount_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='discount',
            issue_type='invalid_value',
            notes_prefix="discount missing, non-numeric, or negative; replaced with 0",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(bad_discount_rows)} bad discount rows.")

    # Count first, then overwrite both kinds of bad value with 0
    missing_discount_count = discount_is_missing.sum()
    negative_discount_count = discount_is_negative.sum()
    sales_df['discount'] = sales_df['discount'].fillna(0)
    sales_df.loc[discount_is_negative, 'discount'] = 0

    # Log counts
    logging.info(f"Replaced {missing_discount_count} missing discount values with 0")
    logging.info(f"Replaced {negative_discount_count} negative discount values with 0")

    logging.info(f"Total records after 'quantity & discounts' columns cleaning: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during quantity/discount cleaning: {e}", exc_info=True)
    raise

In [27]:
# Clean 'date' column
try:
    logging.info("Starting date column cleaning and parsing.")

    # Placeholder date substituted for every missing or unparseable value
    default_date = pd.Timestamp('19900101')

    # Initial info
    null_count_before = sales_df['date'].isna().sum()
    logging.info(f"Number of null/missing values in original 'date' column: {null_count_before}")

    # Identify bad date rows before parsing: null or blank values
    missing_date_mask = (
        sales_df['date'].isna() |
        (sales_df['date'].astype(str).str.strip() == '')
    )
    bad_date_rows = sales_df[missing_date_mask].copy()

    # Log bad date rows (missing/blank)
    if not bad_date_rows.empty:
        log_bad_data(
            df_rows=bad_date_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='date',
            issue_type='missing_value',
            notes_prefix="date missing or blank; replaced with 1990-01-01",
            conn_str=sql_conn_str
        )
        logging.info(f"Logged {len(bad_date_rows)} missing/blank date rows to BadDataLog.")

    # Define parsing function
    def parse_date(val, default=default_date):
        """
        Parse a single raw date value into a pandas Timestamp.

        Parameters:
        - val: raw value from the 'date' column
        - default: value returned when val is missing/blank or cannot be
          parsed (1990-01-01 unless overridden)
        """
        if pd.isna(val) or str(val).strip() == '':
            return default  # missing -> default
        # Try the explicit formats first, in this order, so that a value such
        # as 03-05-2023 is read as day-month-year.
        for fmt in ('%d-%m-%Y', '%Y/%m/%d', '%d %b %Y', '%Y-%m-%d'):
            try:
                return pd.to_datetime(val, format=fmt)
            except Exception:
                # Format did not match; try the next one. (Exception rather
                # than a bare except, so Ctrl-C / SystemExit still propagate.)
                continue
        # fallback to automatic parsing
        try:
            return pd.to_datetime(val)
        except Exception:
            return default

    # Count missing/blank before parsing
    missing_count = null_count_before + (sales_df['date'].astype(str).str.strip() == '').sum()
    logging.info(f"Number of dates to replace with 19900101 due to missing/blank: {missing_count}")

    # Parse all dates once, keeping failures as NaT. Comparing against the
    # 1990-01-01 placeholder instead would mis-report a genuine 1990-01-01
    # date as invalid and would miss values that parse to NaT.
    raw_parsed_dates = sales_df['date'].apply(lambda v: parse_date(v, default=pd.NaT))

    # Identify rows that had a value but could not be parsed
    invalid_date_mask = ~missing_date_mask & raw_parsed_dates.isna()
    invalid_date_rows = sales_df[invalid_date_mask].copy()

    # Log invalid date format rows
    if not invalid_date_rows.empty:
        log_bad_data(
            df_rows=invalid_date_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='date',
            issue_type='invalid_format',
            notes_prefix="invalid date format; replaced with 19900101",
            conn_str=sql_conn_str
        )
        logging.info(f"Logged {len(invalid_date_rows)} invalid-format date rows to BadDataLog.")

    # Substitute the placeholder date wherever parsing produced no value
    parsed_dates = raw_parsed_dates.apply(lambda ts: default_date if pd.isna(ts) else ts)

    # Check if any nulls remain after parsing and substitution
    null_after_parsing = parsed_dates.isna().sum()
    logging.info(f"Number of null values after parsing: {null_after_parsing}")

    # Update 'date' column with cleaned YYYYMMDD strings
    sales_df['date'] = parsed_dates.apply(lambda x: x.strftime('%Y%m%d') if pd.notnull(x) else '19900101')

    logging.info("Date column cleaning and parsing completed successfully.")

except Exception as e:
    logging.error(f"Error during date column cleaning/parsing: {e}", exc_info=True)
    raise

In [28]:
# --- Clean 'region' column ----
try:
    logging.info("Starting 'region' data cleaning steps.")

    # Normalise regions to trimmed text
    sales_df['region'] = sales_df['region'].astype(str).str.strip()

    # Text forms that stand for "no region" after the cast to str
    blank_region_tokens = ['', 'None', 'nan', 'NaN']

    # Record the affected rows before the placeholder overwrites them
    region_is_blank = sales_df['region'].isin(blank_region_tokens)
    missing_region_rows = sales_df[region_is_blank]
    if not missing_region_rows.empty:
        log_bad_data(
            df_rows=missing_region_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='region',
            issue_type='missing',
            notes_prefix="region missing or blank; replaced with 'Unknown'",
            conn_str=sql_conn_str,
        )
        logging.info(f"Logged {len(missing_region_rows)} missing region rows.")

    # Blank regions are relabelled 'Unknown'; the count includes any rows
    # that already carried that label
    sales_df['region'] = sales_df['region'].replace(blank_region_tokens, 'Unknown')
    num_unknown_region = sales_df['region'].eq('Unknown').sum()
    logging.info(f"Number of records with missing/invalid region replaced with 'Unknown': {num_unknown_region}")

    logging.info(f"Total records after 'region' column cleaning: {len(sales_df)}")

except Exception as e:
    logging.error(f"Error during region data cleaning: {e}", exc_info=True)
    raise


## Dim Product incremental data load

In [29]:
try:
    logging.info("Starting incremental load for 'DimProduct'.")

    # Prepare product data from cleaned sales_df
    # ---------------------------------------
    product_df = (
        sales_df[['product_id', 'product_name', 'category', 'price']]
        .query("product_id != 'Unknown'")                 # remove Unknown records
        .drop_duplicates()                                # remove full duplicate rows
        .drop_duplicates(subset=['product_id'])           # keep one record per product_id
        .copy()
    )
    logging.info(f"Extracted {len(product_df)} unique products from source dataframe.")

    # Apply type casting to match DimProduct schema
    # ---------------------------------------
    product_df['product_id'] = product_df['product_id'].astype(str)
    product_df['product_name'] = product_df['product_name'].astype(str)
    product_df['category'] = product_df['category'].astype(str)
    product_df['price'] = product_df['price'].astype(float)

    logging.info("Applied type casting on product_df columns.")

    # Fetch existing DimProduct data
    # ---------------------------------------
    # pandas warns when read_sql is given a raw DBAPI connection instead of
    # SQLAlchemy; the warning is suppressed because pyodbc is used on purpose.
    warnings.filterwarnings("ignore", message="pandas only support SQLAlchemy")
    existing_products = pd.read_sql("SELECT product_id, product_name, category, price FROM dbo.DimProduct", conn)
    logging.info(f"Fetched {len(existing_products)} existing products from DimProduct.")

    # Bring the stored price to the same float type as product_df before
    # comparing. If the column is DECIMAL/NUMERIC the driver returns
    # decimal.Decimal values, and Decimal('999.99') != 999.99 (float), which
    # would flag every product as changed on every run.
    existing_products['price'] = pd.to_numeric(existing_products['price'], errors='coerce')

    # Identify new and updated records
    # ---------------------------------------
    current_time = datetime.now()

    new_products = product_df[~product_df['product_id'].isin(existing_products['product_id'])]
    # Inner join: only products already present in DimProduct are compared
    merged = product_df.merge(existing_products, on='product_id', suffixes=('', '_old'))

    changed_products = merged[
        (merged['product_name'] != merged['product_name_old']) |
        (merged['category'] != merged['category_old']) |
        (merged['price'] != merged['price_old'])
    ][['product_id', 'product_name', 'category', 'price']]

    logging.info(f"New: {len(new_products)} | Updated: {len(changed_products)}")

    # Insert new products
    # ---------------------------------------
    if not new_products.empty:
        insert_sql = """
            INSERT INTO dbo.DimProduct (product_id, product_name, category, price, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """
        insert_params = [
            (product_id, product_name, category, price, current_time, current_time)
            for product_id, product_name, category, price
            in new_products.itertuples(index=False, name=None)
        ]
        cursor.executemany(insert_sql, insert_params)
        logging.info(f"Inserted {len(new_products)} new products.")

    # Update existing products (if changed)
    # ---------------------------------------
    if not changed_products.empty:
        update_sql = """
            UPDATE dbo.DimProduct
            SET product_name = ?, category = ?, price = ?, updated_at = ?
            WHERE product_id = ?
        """
        update_params = [
            (product_name, category, price, current_time, product_id)
            for product_id, product_name, category, price
            in changed_products.itertuples(index=False, name=None)
        ]
        cursor.executemany(update_sql, update_params)
        logging.info(f"Updated {len(changed_products)} existing products.")

    # Commit
    # ---------------------------------------
    conn.commit()
    logging.info("DimProduct incremental load committed successfully.")

except Exception as e:
    logging.error(f"Error during DimProduct incremental load: {e}", exc_info=True)
    # Discard any inserts/updates already sent, so a later commit on the
    # shared connection cannot persist a half-finished load.
    try:
        conn.rollback()
    except Exception as rollback_error:
        logging.error(f"Rollback after DimProduct failure also failed: {rollback_error}")
    raise

## Dim Date incremental data load

In [30]:
try:
    logging.info("Starting 'DimDate' population process...")

    # Define date range
    # -----------------------------
    start_date = "2010-01-01"
    end_date = "2030-12-31"
    date_range = pd.date_range(start=start_date, end=end_date)

    # Build DimDate DataFrame: one row per calendar day in the range
    # -----------------------------
    dim_date_df = pd.DataFrame({
        "full_date": date_range
    })

    # date_key is the integer form of YYYYMMDD
    dim_date_df["date_key"] = dim_date_df["full_date"].dt.strftime("%Y%m%d").astype(int)
    dim_date_df["year"] = dim_date_df["full_date"].dt.year
    dim_date_df["month"] = dim_date_df["full_date"].dt.month
    dim_date_df["month_name"] = dim_date_df["full_date"].dt.strftime("%B")
    dim_date_df["day"] = dim_date_df["full_date"].dt.day
    dim_date_df["quarter"] = dim_date_df["full_date"].dt.quarter

    logging.info(f"Generated DimDate DataFrame with {len(dim_date_df)} from {start_date} to {end_date}")

    # Check if DimDate already populated
    # -----------------------------
    cursor.execute("SELECT COUNT(*) FROM dbo.DimDate")
    existing_count = cursor.fetchone()[0]

    # NOTE(review): a table holding a single row is still treated as "not
    # populated" -- presumably a pre-seeded placeholder row (e.g. for the
    # 1990-01-01 default date); confirm against the table setup script.
    if existing_count > 1:
        logging.info(f"DimDate already contains {existing_count} records. Skipping population.")
    else:
        logging.info("Populating DimDate table for the first time...")

        insert_sql = """
            INSERT INTO dbo.DimDate (
                date_key,
                full_date,
                year,
                month,
                month_name,
                day,
                quarter
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """

        # Plain Python values, in the same order as the INSERT column list
        rows = [
            (
                int(row.date_key),
                row.full_date.date(),  # convert Timestamp -> date
                int(row.year),
                int(row.month),
                str(row.month_name),
                int(row.day),
                int(row.quarter)
            )
            for row in dim_date_df.itertuples(index=False)
        ]

        # Insert in batches
        # -----------------------------
        batch_size = 1000
        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            cursor.executemany(insert_sql, batch)
            logging.info(f"Inserted {i + len(batch)} of {len(rows)} DimDate rows so far...")

        # Commit once, after every batch has gone through. Committing per
        # batch would let a mid-load failure leave a partial table that the
        # "already populated" check above then skips on every later run.
        conn.commit()

        logging.info("DimDate population completed successfully.")

        # Log summary stats
        min_date = dim_date_df["full_date"].min().date()
        max_date = dim_date_df["full_date"].max().date()
        logging.info(f"Inserted dates from {min_date} to {max_date}")

except Exception as e:
    logging.error(f"Error during DimDate population: {e}", exc_info=True)
    # Undo any batches already sent so DimDate is either fully populated or
    # left untouched.
    try:
        conn.rollback()
    except Exception as rollback_error:
        logging.error(f"Rollback after DimDate failure also failed: {rollback_error}")
    raise

## Fact Sales incremental data load

In [31]:
try:
    logging.info("Starting incremental 'FactSales' population process...")

    # Select required columns & clean duplicates
    # ----------------------------------------------------
    # Column order here is the parameter order of the INSERT further down.
    fact_sales_df = (
        sales_df[
            [
                "transaction_id",
                "product_id",
                "customer_id",
                "date",
                "quantity",
                "discount",
                "region"
            ]
        ]
        .drop_duplicates()   # drop exact duplicate rows
        .copy()
    )

    logging.info(f"FactSales DataFrame with {len(fact_sales_df)} rows after dropping duplicates.")

    # Handle composite key duplicates (transaction_id + product_id)
    # ----------------------------------------------------
    duplicate_mask = fact_sales_df.duplicated(subset=["transaction_id", "product_id"], keep="first")
    duplicate_count = duplicate_mask.sum()

    if duplicate_count > 0:
        bad_duplicate_rows = fact_sales_df[duplicate_mask].copy()
        fact_sales_df = fact_sales_df[~duplicate_mask].copy()

        logging.warning(f"Found {duplicate_count} duplicate composite key rows (transaction_id, product_id). Dropping duplicates.")

        # Log to bad data table
        log_bad_data(
            df_rows=bad_duplicate_rows,
            etl_run_id=etl_run_id,
            source_system='python_etl',
            bad_column='composite_key',
            issue_type='duplicate_key',
            notes_prefix=f"Duplicate (transaction_id, product_id) combination found; dropped {duplicate_count} records.",
            conn_str=sql_conn_str
        )

        logging.info(f"Logged {duplicate_count} duplicate composite key rows to bad data table.")
    else:
        logging.info("No duplicate composite keys found in FactSales input data.")

    # Apply data type casting
    # ----------------------------------------------------
    fact_sales_df["transaction_id"] = fact_sales_df["transaction_id"].astype(str)
    fact_sales_df["product_id"] = fact_sales_df["product_id"].astype(str)
    fact_sales_df["customer_id"] = fact_sales_df["customer_id"].astype(str)
    fact_sales_df["date"] = fact_sales_df["date"].astype(int)   # 'YYYYMMDD' text -> integer date_key
    fact_sales_df["quantity"] = fact_sales_df["quantity"].astype(int)
    fact_sales_df["discount"] = fact_sales_df["discount"].astype(float)
    fact_sales_df["region"] = fact_sales_df["region"].astype(str)

    logging.info("Applied column type casting for FactSales DataFrame.")

    # Ensure customer_id exists in DimCustomer
    # ----------------------------------------------------
    logging.info("Validating customer_id values against DimCustomer...")

    dim_customers = pd.read_sql("SELECT customer_id FROM dbo.DimCustomer", conn)
    valid_customers = set(dim_customers["customer_id"].astype(str).tolist())

    # Replace missing or invalid customer_ids with 'Unknown'
    fact_sales_df["customer_id"] = fact_sales_df["customer_id"].where(
        fact_sales_df["customer_id"].isin(valid_customers), "Unknown"
    )

    logging.info("Replaced non-existing customer_id values with 'Unknown'.")

    # Load existing keys from target FactSales table
    # ----------------------------------------------------
    logging.info("Fetching existing (transaction_id, product_id) keys from FactSales...")

    cursor.execute("SELECT transaction_id, product_id FROM dbo.FactSales;")
    existing_keys = set(tuple(row) for row in cursor.fetchall())
    logging.info(f"Fetched {len(existing_keys)} existing FactSales records.")

    # Filter new records (incremental load)
    # ----------------------------------------------------
    # One set lookup per (transaction_id, product_id) pair. Wrapping the
    # flags in an index-aligned boolean Series keeps the filter well-defined
    # even when there are no rows at all.
    is_new_key = pd.Series(
        [
            key not in existing_keys
            for key in zip(fact_sales_df["transaction_id"], fact_sales_df["product_id"])
        ],
        index=fact_sales_df.index,
        dtype=bool,
    )
    new_rows = fact_sales_df[is_new_key].copy()

    logging.info(f"{len(new_rows)} new FactSales records identified for insertion.")

    if new_rows.empty:
        logging.info("No new FactSales records to insert. Table is already up to date.")
    else:
        # Insert new records in batches
        # ----------------------------------------------------
        insert_sql = """
            INSERT INTO dbo.FactSales (
                transaction_id,
                product_id,
                customer_id,
                date_key,
                quantity,
                discount,
                region
            )
            VALUES (?, ?, ?, ?, ?, ?, ?);
        """

        rows = list(new_rows.itertuples(index=False, name=None))
        batch_size = 1000

        # Each batch is committed on its own; a re-run skips keys that are
        # already in FactSales and inserts the remainder.
        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            cursor.executemany(insert_sql, batch)
            conn.commit()
            logging.info(f"Inserted {i + len(batch)} of {len(rows)} new FactSales rows so far...")

        logging.info("FactSales incremental load completed successfully.")

except Exception as e:
    logging.error(f"Error during FactSales population: {e}", exc_info=True)
    # Discard the rows of a failed, uncommitted batch so that a later commit
    # on the shared connection cannot persist part of it.
    try:
        conn.rollback()
    except Exception as rollback_error:
        logging.error(f"Rollback after FactSales failure also failed: {rollback_error}")
    raise

In [32]:
# Release the database cursor and connection opened at the start of the run.
# A failure to close is logged but does not turn a finished run into an error.
for _db_resource in (cursor, conn):
    try:
        _db_resource.close()
    except Exception as close_error:
        logging.warning(f"Could not close database resource cleanly: {close_error}")

logging.info("ETL process completed.")