#  Data Engineering Challenge - Take Home Assignment

As a newly joined Data Engineering Intern at TechCorp, my task is to build a unified data pipeline that can extract, transform, load (ETL), and analyse data from three recently acquired companies.
The data arrives in separate formats with many inconsistencies.

# General approach

Phase 1: 
- Explore each dataset and find data quality issues
- Identify and understand relationships between tables
- Make the ER model
- Have a clear data cleaning strategy

Phase 2: 
- Create data cleaning functions
- Handle edge cases
- Implement data validation
- Design normalised schema
- Load into SQLite DB

Phase 3: 
- Build UI using Streamlit
- Create visualizations
- Have filtering and search

Phase 4: 
- Use Gemini AI to reconcile records
- Documentation

# Initial setup

This cell imports libraries, configures file paths, logging, and Pandas display settings, and logs completion.

In [1]:
import pandas as pd
import numpy as np
import json
import sqlite3
from sqlalchemy import create_engine, text
from datetime import datetime
import re
import os
import logging
from dateutil import parser # For more robust date parsing

# --- Configuration ---
# File paths
DATA_DIR = './' # Base directory: the notebook's folder; the data files live in its data/ subfolder
RECONCILIATION_DATA_CSV = os.path.join(DATA_DIR, 'data/reconciliation_challenge_data.csv')
PRODUCTS_INCONSISTENT_JSON = os.path.join(DATA_DIR, 'data/products_inconsistent_data.json')
CUSTOMERS_MESSY_JSON = os.path.join(DATA_DIR, 'data/customers_messy_data.json')
ORDERS_UNSTRUCTURED_CSV = os.path.join(DATA_DIR, 'data/orders_unstructured_data.csv')

DB_NAME = 'unified_ecommerce.db'
DB_PATH = os.path.join(DATA_DIR, DB_NAME) # Save DB in DATA_DIR (next to the notebook), not inside data/

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[logging.StreamHandler()]) # Outputs to console; can add FileHandler

logger = logging.getLogger(__name__)

# --- Pandas Display Options ---
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', 1000)

logger.info("Setup and configuration complete.")

2025-06-23 08:48:41,939 - INFO - Setup and configuration complete.


# Phase 1: Data Discovery & Analysis (Jupyter Notebook)

# 1.1 Load Datasets

# Customer Data (customers_messy_data.json)

In [2]:
logger.info("Loading Customer data...")
try:
    with open(CUSTOMERS_MESSY_JSON, 'r') as f:
        raw_customers_data = json.load(f)
    df_customers_raw = pd.DataFrame(raw_customers_data)
    logger.info("Customer data loaded successfully.")
except FileNotFoundError:
    logger.error(f"Customer data file not found at {CUSTOMERS_MESSY_JSON}")
    df_customers_raw = pd.DataFrame() # Create empty DataFrame to avoid downstream errors
except json.JSONDecodeError:
    logger.error(f"Error decoding JSON from {CUSTOMERS_MESSY_JSON}")
    df_customers_raw = pd.DataFrame()

2025-06-23 08:48:41,945 - INFO - Loading Customer data...
2025-06-23 08:48:41,956 - INFO - Customer data loaded successfully.


# Product Data (products_inconsistent_data.json)

In [3]:
logger.info("Loading Product data...")
try:
    with open(PRODUCTS_INCONSISTENT_JSON, 'r') as f:
        raw_products_data = json.load(f)
    df_products_raw = pd.DataFrame(raw_products_data)
    logger.info("Product data loaded successfully.")
except FileNotFoundError:
    logger.error(f"Product data file not found at {PRODUCTS_INCONSISTENT_JSON}")
    df_products_raw = pd.DataFrame()
except json.JSONDecodeError:
    logger.error(f"Error decoding JSON from {PRODUCTS_INCONSISTENT_JSON}")
    df_products_raw = pd.DataFrame()

2025-06-23 08:48:41,965 - INFO - Loading Product data...
2025-06-23 08:48:41,975 - INFO - Product data loaded successfully.


# Reconciliation Data (reconciliation_challenge_data.csv)

In [4]:
logger.info("Loading Reconciliation (Order Items Source 1) data...")
try:
    df_reconciliation_raw = pd.read_csv(RECONCILIATION_DATA_CSV)
    logger.info("Reconciliation data loaded successfully.")
except FileNotFoundError:
    logger.error(f"Reconciliation data file not found at {RECONCILIATION_DATA_CSV}")
    df_reconciliation_raw = pd.DataFrame()

2025-06-23 08:48:41,986 - INFO - Loading Reconciliation (Order Items Source 1) data...
2025-06-23 08:48:41,996 - INFO - Reconciliation data loaded successfully.


# Orders Unstructured Data

In [5]:
logger.info("Loading Orders Unstructured (Order Items Source 2) data...")
try:
    df_orders_unstructured_raw = pd.read_csv(ORDERS_UNSTRUCTURED_CSV)
    logger.info("Orders Unstructured data loaded successfully.")
except FileNotFoundError:
    logger.error(f"Orders Unstructured data file not found at {ORDERS_UNSTRUCTURED_CSV}")
    df_orders_unstructured_raw = pd.DataFrame()

2025-06-23 08:48:42,007 - INFO - Loading Orders Unstructured (Order Items Source 2) data...
2025-06-23 08:48:42,021 - INFO - Orders Unstructured data loaded successfully.
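
The two JSON loading cells above repeat the same try/except pattern. As a minimal sketch, that pattern could be factored into one helper. The name `load_json_records` and its `label` parameter are hypothetical and not part of the notebook; the helper returns an empty list on failure so that wrapping the result in `pd.DataFrame(...)` still yields an empty DataFrame.

```python
import json
import logging

logger = logging.getLogger(__name__)

def load_json_records(path, label):
    """Return the parsed JSON at `path`, or an empty list if it cannot be read.

    Hypothetical helper: `label` is only used to make log messages readable.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except FileNotFoundError:
        logger.error("%s data file not found at %s", label, path)
        return []
    except json.JSONDecodeError as exc:
        logger.error("Error decoding %s JSON from %s: %s", label, path, exc)
        return []
    logger.info("%s data loaded successfully (%d records).", label, len(records))
    return records
```

With this in place, a loading cell would reduce to something like `df_customers_raw = pd.DataFrame(load_json_records(CUSTOMERS_MESSY_JSON, "Customer"))`.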


# 1.2 Initial Exploration 

The initial exploration summarizes the structure, missing values, duplicates, and data types of each raw dataset, starting with the customer data. It also inspects key columns with value counts and descriptive statistics to assess data quality and consistency.
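
Every exploration cell in this section builds the same missing-value table. A small sketch of how that step could be shared is shown here; `summarize_missing` is a hypothetical name, and the sample frame is made-up illustration data rather than anything taken from the real files.

```python
import pandas as pd

def summarize_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Count and percentage of nulls per column, limited to columns that have any."""
    counts = df.isnull().sum()
    summary = pd.DataFrame({
        "count": counts,
        "percentage": (counts / len(df) * 100).round(2),
    })
    return summary[summary["count"] > 0].sort_values("count", ascending=False)

# Made-up example data, for illustration only.
example = pd.DataFrame({
    "email": ["a@example.com", None, None, "d@example.com"],
    "phone": [None, "555-0100", "555-0101", "555-0102"],
    "city": ["Austin", "Boston", "Chicago", "Denver"],
})
print(summarize_missing(example))
```

Columns without nulls (`city` here) are left out, which keeps the printed table short for wide datasets.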

# Exploration for df_customers_raw (Customer Data)

In [6]:
logger.info("\n--- Exploring Raw Customer Data (customers_messy_data.json) ---")
if not df_customers_raw.empty:
    print("Shape:", df_customers_raw.shape)
    
    print("\nInfo:")
    df_customers_raw.info()
    
    print("\nFirst 5 rows:")
    print(df_customers_raw.head())
    
    print("\nLast 5 rows:")
    print(df_customers_raw.tail())
    
    print("\nData Types:")
    print(df_customers_raw.dtypes)
    
    print("\nMissing values (count and percentage):")
    missing_values = df_customers_raw.isnull().sum()
    missing_percentage = (df_customers_raw.isnull().sum() / len(df_customers_raw)) * 100
    missing_df = pd.DataFrame({'count': missing_values, 'percentage': missing_percentage})
    print(missing_df[missing_df['count'] > 0])
    
    print("\nDuplicate rows (entire row):", df_customers_raw.duplicated().sum())
    
    print("\nDescriptive statistics (numerical columns):")
    print(df_customers_raw.describe(include=[np.number]))
    
    print("\nDescriptive statistics (object/categorical columns):")
    print(df_customers_raw.describe(include=['object']))
    
    logger.info("Value counts for key categorical/ID columns in Customers:")
    key_customer_cols = ['cust_id', 'customer_name', 'full_name', 'email', 'email_address', 
                         'city', 'state', 'status', 'customer_status', 'gender', 'segment', 'preferred_payment']
    for col in key_customer_cols:
        if col in df_customers_raw.columns:
            print(f"\nValue counts for '{col}':")
            print(df_customers_raw[col].value_counts(dropna=False).head(10)) # Show top 10 + NaN
            print(f"Number of unique values in '{col}': {df_customers_raw[col].nunique(dropna=False)}")
        else:
            logger.warning(f"Column '{col}' not found in raw customer data.")
    
    logger.info("Customer data exploration complete.")
else:
    logger.warning("Customer DataFrame (df_customers_raw) is empty. Skipping exploration.")

2025-06-23 08:48:42,036 - INFO - 
--- Exploring Raw Customer Data (customers_messy_data.json) ---
2025-06-23 08:48:42,126 - INFO - Value counts for key categorical/ID columns in Customers:
2025-06-23 08:48:42,139 - INFO - Customer data exploration complete.


Shape: (500, 25)

Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 25 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   customer_id        500 non-null    object 
 1   cust_id            500 non-null    object 
 2   customer_name      500 non-null    object 
 3   full_name          500 non-null    object 
 4   email              409 non-null    object 
 5   email_address      500 non-null    object 
 6   phone              187 non-null    object 
 7   phone_number       500 non-null    object 
 8   address            500 non-null    object 
 9   city               500 non-null    object 
 10  state              500 non-null    object 
 11  zip_code           361 non-null    object 
 12  postal_code        500 non-null    object 
 13  registration_date  412 non-null    object 
 14  reg_date           500 non-null    object 
 15  status             443 non-null    object 
 16  cu

# Exploration for df_products_raw (Product Data)

In [7]:
logger.info("\n--- Exploring Raw Product Data (products_inconsistent_data.json) ---")
if not df_products_raw.empty:
    print("Shape:", df_products_raw.shape)
    
    print("\nInfo:")
    df_products_raw.info()
    
    print("\nFirst 5 rows:")
    print(df_products_raw.head())

    print("\nLast 5 rows:")
    print(df_products_raw.tail())
        
    print("\nData Types:")
    print(df_products_raw.dtypes)
    
    print("\nMissing values (count and percentage):")
    missing_values = df_products_raw.isnull().sum()
    missing_percentage = (df_products_raw.isnull().sum() / len(df_products_raw)) * 100
    missing_df = pd.DataFrame({'count': missing_values, 'percentage': missing_percentage})
    print(missing_df[missing_df['count'] > 0])
    
    print("\nDuplicate rows (entire row):", df_products_raw.duplicated().sum())
    
    print("\nDescriptive statistics (numerical columns - attempt conversion for relevant ones):")
    # Attempt to convert price-like columns for describe, but be cautious with errors
    numeric_potential_cols = ['price', 'list_price', 'cost', 'weight', 'stock_quantity', 'stock_level', 'reorder_level', 'rating']
    df_products_numeric_describe = df_products_raw.copy()
    for col in numeric_potential_cols:
        if col in df_products_numeric_describe.columns:
            df_products_numeric_describe[col] = pd.to_numeric(df_products_numeric_describe[col], errors='coerce')
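    # Describe only the columns that actually exist, so a missing column cannot raise a KeyError
    present_numeric_cols = [c for c in numeric_potential_cols if c in df_products_numeric_describe.columns]
    print(df_products_numeric_describe[present_numeric_cols].describe())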
    
    print("\nDescriptive statistics (object/categorical columns):")
    print(df_products_raw.describe(include=['object']))

    logger.info("Value counts for key categorical/ID columns in Products:")
    key_product_cols = ['product_id', 'item_id', 'product_name', 'item_name', 'category', 
                        'product_category', 'brand', 'manufacturer', 'supplier_id', 'is_active', 'color', 'size']
    for col in key_product_cols:
        if col in df_products_raw.columns:
            print(f"\nValue counts for '{col}':")
            print(df_products_raw[col].value_counts(dropna=False).head(10))
            print(f"Number of unique values in '{col}': {df_products_raw[col].nunique(dropna=False)}")
        else:
            logger.warning(f"Column '{col}' not found in raw product data.")
            
    logger.info("Product data exploration complete.")
else:
    logger.warning("Product DataFrame (df_products_raw) is empty. Skipping exploration.")

2025-06-23 08:48:42,151 - INFO - 
--- Exploring Raw Product Data (products_inconsistent_data.json) ---
2025-06-23 08:48:42,231 - INFO - Value counts for key categorical/ID columns in Products:
2025-06-23 08:48:42,241 - INFO - Product data exploration complete.


Shape: (200, 24)

Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 24 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   product_id        200 non-null    object
 1   item_id           200 non-null    int64 
 2   product_name      200 non-null    object
 3   item_name         200 non-null    object
 4   description       145 non-null    object
 5   category          200 non-null    object
 6   product_category  200 non-null    object
 7   brand             171 non-null    object
 8   manufacturer      173 non-null    object
 9   price             200 non-null    object
 10  list_price        200 non-null    object
 11  cost              200 non-null    object
 12  weight            123 non-null    object
 13  dimensions        200 non-null    object
 14  color             170 non-null    object
 15  size              170 non-null    object
 16  stock_quantity    200 non-null    int6

# Exploration for df_reconciliation_raw

In [8]:
logger.info("\n--- Exploring Raw Reconciliation Data (reconciliation_challenge_data.csv) ---")
if not df_reconciliation_raw.empty:
    print("Shape:", df_reconciliation_raw.shape)
    
    print("\nInfo:")
    df_reconciliation_raw.info()
    
    print("\nFirst 5 rows:")
    print(df_reconciliation_raw.head())

    print("\nLast 5 rows:")
    print(df_reconciliation_raw.tail())
        
    print("\nData Types:")
    print(df_reconciliation_raw.dtypes)
    
    print("\nMissing values (count and percentage):")
    missing_values = df_reconciliation_raw.isnull().sum()
    missing_percentage = (df_reconciliation_raw.isnull().sum() / len(df_reconciliation_raw)) * 100
    missing_df = pd.DataFrame({'count': missing_values, 'percentage': missing_percentage})
    print(missing_df[missing_df['count'] > 0])
    
    print("\nDuplicate rows (entire row):", df_reconciliation_raw.duplicated().sum())
    
    print("\nDescriptive statistics (numerical columns):")
    # Select columns that are expected to be numeric, skipping any that are absent
    numeric_cols_recon = ['amount_paid', 'quantity_ordered', 'unit_cost', 'total_value', 
                          'discount_applied', 'shipping_fee', 'tax_amount']
    numeric_cols_recon = [c for c in numeric_cols_recon if c in df_reconciliation_raw.columns]
    print(df_reconciliation_raw[numeric_cols_recon].describe())
    
    print("\nDescriptive statistics (object/categorical columns):")
    print(df_reconciliation_raw.describe(include=['object']))

    logger.info("Value counts for key categorical/ID columns in Reconciliation Data:")
    key_recon_cols = ['client_reference', 'transaction_ref', 'item_reference', 'payment_status', 
                      'delivery_status', 'customer_segment', 'region', 'product_line']
    for col in key_recon_cols:
        if col in df_reconciliation_raw.columns:
            print(f"\nValue counts for '{col}':")
            print(df_reconciliation_raw[col].value_counts(dropna=False).head(10))
            print(f"Number of unique values in '{col}': {df_reconciliation_raw[col].nunique(dropna=False)}")
        else:
            logger.warning(f"Column '{col}' not found in raw reconciliation data.")
            
    # Check financial integrity: total_value vs (quantity * unit_cost).
    # The comparison uses a local Series so the raw DataFrame is never modified.
    calculated_total = df_reconciliation_raw['quantity_ordered'] * df_reconciliation_raw['unit_cost']
    is_mismatch = ~np.isclose(df_reconciliation_raw['total_value'].fillna(0), calculated_total.fillna(0))
    discrepancies = df_reconciliation_raw.loc[is_mismatch, ['quantity_ordered', 'unit_cost', 'total_value']].assign(
        calculated_total=calculated_total[is_mismatch]
    )
    print(f"\nNumber of rows where total_value != quantity * unit_cost: {len(discrepancies)}")
    if not discrepancies.empty:
        print("Examples of discrepancies:")
        print(discrepancies.head())

    logger.info("Reconciliation data exploration complete.")
else:
    logger.warning("Reconciliation DataFrame (df_reconciliation_raw) is empty. Skipping exploration.")

2025-06-23 08:48:42,253 - INFO - 
--- Exploring Raw Reconciliation Data (reconciliation_challenge_data.csv) ---
2025-06-23 08:48:42,328 - INFO - Value counts for key categorical/ID columns in Reconciliation Data:
2025-06-23 08:48:42,342 - INFO - Reconciliation data exploration complete.


Shape: (300, 20)

Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 300 entries, 0 to 299
Data columns (total 20 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   client_reference         300 non-null    object 
 1   full_customer_name       300 non-null    object 
 2   contact_email            300 non-null    object 
 3   transaction_ref          300 non-null    object 
 4   item_reference           300 non-null    object 
 5   transaction_date         300 non-null    object 
 6   amount_paid              300 non-null    float64
 7   payment_status           300 non-null    object 
 8   delivery_status          300 non-null    object 
 9   customer_segment         300 non-null    object 
 10  region                   300 non-null    object 
 11  product_line             300 non-null    object 
 12  quantity_ordered         300 non-null    int64  
 13  unit_cost                300 non-null    float64
 14  to

# Exploration for df_orders_unstructured_raw

In [9]:
logger.info("\n--- Exploring Raw Orders Unstructured Data (orders_unstructured_data.csv) ---")
if not df_orders_unstructured_raw.empty:
    print("Shape:", df_orders_unstructured_raw.shape)
    
    print("\nInfo:")
    df_orders_unstructured_raw.info()
    
    print("\nFirst 5 rows:")
    print(df_orders_unstructured_raw.head())

    print("\nLast 5 rows:")
    print(df_orders_unstructured_raw.tail())
        
    print("\nData Types:")
    print(df_orders_unstructured_raw.dtypes)
    
    print("\nMissing values (count and percentage):")
    missing_values = df_orders_unstructured_raw.isnull().sum()
    missing_percentage = (df_orders_unstructured_raw.isnull().sum() / len(df_orders_unstructured_raw)) * 100
    missing_df = pd.DataFrame({'count': missing_values, 'percentage': missing_percentage})
    print(missing_df[missing_df['count'] > 0])
    
    print("\nDuplicate rows (entire row):", df_orders_unstructured_raw.duplicated().sum())
    
    print("\nDescriptive statistics (numerical columns - attempt conversion):")
    numeric_potential_cols_orders = ['quantity', 'qty', 'unit_price', 'price', 'total_amount', 
                                     'order_total', 'shipping_cost', 'tax', 'discount']
    df_orders_numeric_describe = df_orders_unstructured_raw.copy()
    for col in numeric_potential_cols_orders:
        if col in df_orders_numeric_describe.columns:
            df_orders_numeric_describe[col] = pd.to_numeric(df_orders_numeric_describe[col], errors='coerce')
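    # Describe only the columns that actually exist, so a missing column cannot raise a KeyError
    present_numeric_cols_orders = [c for c in numeric_potential_cols_orders if c in df_orders_numeric_describe.columns]
    print(df_orders_numeric_describe[present_numeric_cols_orders].describe())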
    
    print("\nDescriptive statistics (object/categorical columns):")
    print(df_orders_unstructured_raw.describe(include=['object']))

    logger.info("Value counts for key categorical/ID columns in Orders Unstructured Data:")
    key_orders_cols = ['order_id', 'ord_id', 'customer_id', 'cust_id', 'product_id', 'item_id',
                       'status', 'order_status', 'payment_method']
    for col in key_orders_cols:
        if col in df_orders_unstructured_raw.columns:
            print(f"\nValue counts for '{col}':")
            print(df_orders_unstructured_raw[col].value_counts(dropna=False).head(10))
            print(f"Number of unique values in '{col}': {df_orders_unstructured_raw[col].nunique(dropna=False)}")
        else:
            logger.warning(f"Column '{col}' not found in raw orders unstructured data.")
            
    logger.info("Orders Unstructured data exploration complete.")
else:
    logger.warning("Orders Unstructured DataFrame (df_orders_unstructured_raw) is empty. Skipping exploration.")

2025-06-23 08:48:42,351 - INFO - 
--- Exploring Raw Orders Unstructured Data (orders_unstructured_data.csv) ---


Shape: (1000, 23)

Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 23 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   order_id          1000 non-null   object 
 1   ord_id            1000 non-null   int64  
 2   customer_id       1000 non-null   int64  
 3   cust_id           1000 non-null   object 
 4   order_date        914 non-null    object 
 5   order_datetime    795 non-null    object 
 6   product_id        1000 non-null   object 
 7   item_id           1000 non-null   int64  
 8   quantity          1000 non-null   int64  
 9   qty               1000 non-null   int64  
 10  unit_price        1000 non-null   float64
 11  price             1000 non-null   float64
 12  total_amount      1000 non-null   float64
 13  order_total       1000 non-null   float64
 14  shipping_cost     1000 non-null   float64
 15  tax               1000 non-null   float64
 16  discount          

2025-06-23 08:48:42,460 - INFO - Value counts for key categorical/ID columns in Orders Unstructured Data:
2025-06-23 08:48:42,470 - INFO - Orders Unstructured data exploration complete.


      order_id  ord_id  customer_id    cust_id  order_date            order_datetime product_id  item_id  quantity  qty  unit_price   price  total_amount  order_total  shipping_cost    tax  discount      status order_status payment_method shipping_address                          notes tracking_number
995  ORD_00996     996          139  CUST_0139  2023-10-04  2023-01-21T21:18:00.000Z   PROD_198      155         5    4      108.40   43.36        705.49       535.21           0.00  26.67      0.00    returned          NaN  bank_transfer     7790 Main St                            NaN       TRK520098
996  ORD_00997     997          233  CUST_0233  2023-11-06  2023-07-04T06:17:00.000Z   PROD_173      146         5    5       58.79  223.80        990.20       822.90          23.90  29.42      0.00   delivered      pending    credit_card     6580 Oak Ave                            NaN             NaN
997  ORD_00998     998          241  CUST_0241    7/9/2023  2023-03-26T03:58:00.000Z   PROD

# Detailed Data Quality Issues & Relationship Analysis

# 1. customers_messy_data.json (Customer Data)

# Simple Summary of Customer Data Issues (customers_messy_data.json):

- Main ID: Use cust_id as the main customer ID – it's clean and consistent.

- Names: Some are proper names, others are emails or usernames – pick the best and format nicely.

- Emails: Two email fields – combine them, keep valid ones, make lowercase.

- Phone Numbers: Messy formats – merge and clean them up to one standard format.

- Addresses: Clean up street, city, state, and zip codes; make them consistent.

- Dates: Dates are in different formats – combine and convert to a standard one.

- Status Info: Two fields for status – merge and standardize (like ACTIVE or INACTIVE).

- Spending & Age: Fix number formats and make sure age is a proper whole number.

- Other Info: Standardize things like gender, payment method, and customer segment.

- Duplicates: No exact copy-paste duplicates, but similar entries might exist – check after cleaning.

- Linking: cust_id will be used to connect with orders.
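
As an illustration of the "combine them, keep valid ones, make lowercase" rule for emails, here is a minimal sketch. The function name `coalesce_email` and the loose regular expression are assumptions, not the notebook's actual implementation; which field to prefer when both are valid is also a choice, and this sketch simply takes them in argument order.

```python
import re

# Deliberately loose pattern (an assumption): something@something.tld with no whitespace.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def coalesce_email(*candidates):
    """Return the first candidate that looks like an email, lowercased; else None."""
    for candidate in candidates:
        if not isinstance(candidate, str):
            continue  # skips None and float NaN coming from pandas
        cleaned = candidate.strip().lower()
        if EMAIL_PATTERN.match(cleaned):
            return cleaned
    return None

print(coalesce_email(None, "  Jane.Doe@Example.COM "))  # jane.doe@example.com
print(coalesce_email("not-an-email", float("nan")))     # None
```

Applied row-wise, this could look like `df.apply(lambda r: coalesce_email(r["email"], r["email_address"]), axis=1)`.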

# 2. products_inconsistent_data.json (Product Data)

# Simple Summary of Product Data Issues (products_inconsistent_data.json):

- Main ID: Use product_id (like PROD_001) as the main product ID. Keep item_id for reference.

- Names: Two name fields say the same thing – keep product_name.

- Descriptions: 55 are missing – either leave blank or use a placeholder.

- Categories: Messy casing (e.g., "Electronics", "electronics") – merge and standardize.

- Brands: Two brand fields, both messy and incomplete – combine, clean up, and fill missing with 'UNKNOWN'.

- Numbers: Prices, weights, and ratings are stored as text – convert to numbers and handle errors.

- Dimensions: Stored like "10x5x2" – split into length, width, and height. Handle blanks safely.

- Color & Size: Many missing or empty – clean up and fill with 'Unknown' or 'N/A'.

- Stock Info: Fields look fine – keep just one field like stock_quantity.

- Supplier ID: Missing in many – clean up and fill missing with 'UNKNOWN'.

- Dates: Some are missing or in different formats – parse to standard date/time format.

- Active Status: Mixed values like "yes", True, 0 – convert to proper True/False.

- Duplicates: No exact duplicates found.

- Relationships: Use product_id to link with orders. item_id might also be needed for certain datasets.
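
The dimension-splitting step can be sketched as follows, assuming the values follow a `length x width x height` pattern such as "10x5x2". The helper name `parse_dimensions` is hypothetical, and the choice to return three `None` values for anything unparseable (rather than raising) is an assumption made to "handle blanks safely".

```python
import re

# Split on 'x', 'X' or the multiplication sign, allowing surrounding spaces.
_DIMENSION_SPLIT = re.compile(r"\s*[xX×]\s*")

def parse_dimensions(value):
    """Split 'LxWxH' into a (length, width, height) tuple of floats.

    Returns (None, None, None) for missing, blank, or malformed input.
    """
    empty = (None, None, None)
    if not isinstance(value, str) or not value.strip():
        return empty
    parts = _DIMENSION_SPLIT.split(value.strip())
    if len(parts) != 3:
        return empty
    try:
        length, width, height = (float(p) for p in parts)
    except ValueError:
        return empty
    return (length, width, height)

print(parse_dimensions("10x5x2"))  # (10.0, 5.0, 2.0)
print(parse_dimensions(""))        # (None, None, None)
```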

# 3. orders_unstructured_data.csv

# Simple Summary of Order Data Issues (orders_unstructured_data.csv):

- IDs: Two versions for order, customer, and product IDs – need to pick one clean version for each (use order_id, cust_id, product_id).

- Dates: Two date fields – combine them, prefer the full timestamp (order_datetime) when available.

- Quantity & Price: Duplicate fields (quantity vs qty, unit_price vs price) – merge each pair into one.

- Total Amounts: total_amount likely reflects line totals; order_total might be full order value – treat them accordingly.

- Shipping/Tax/Discount: These repeat across line items but belong to the overall order – handle carefully when building summary tables.

- Status: Two status fields – merge and clean to values like ‘DELIVERED’, ‘CANCELLED’, etc.

- Address: One field with full shipping address – can keep as-is or later break into parts.

- Notes & Tracking: Mostly missing – clean and keep if useful.

- Duplicates: None exactly the same, but cleanup is still needed.

- Relationships:
  - Cleaned order_id will group line items.
  - Cleaned cust_id links to the Customers table.
  - Cleaned product_id links to the Products table.
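
To illustrate why order-level charges need care, the sketch below separates line items from order headers. It assumes an order can span several rows that repeat the same shipping and tax values; the sample rows are invented and the output column names (`line_total`, `items_total`) are hypothetical.

```python
import pandas as pd

# Invented rows: ORD_1 has two line items that repeat its shipping and tax.
lines = pd.DataFrame({
    "order_id": ["ORD_1", "ORD_1", "ORD_2"],
    "product_id": ["PROD_001", "PROD_002", "PROD_001"],
    "quantity": [2, 1, 3],
    "unit_price": [10.0, 5.0, 10.0],
    "shipping_cost": [4.99, 4.99, 0.0],
    "tax": [2.0, 2.0, 2.4],
})

# Line-level facts stay at one row per item.
order_items = lines[["order_id", "product_id", "quantity", "unit_price"]].copy()
order_items["line_total"] = order_items["quantity"] * order_items["unit_price"]

# Order-level charges are taken once per order instead of being summed
# across the repeated rows, which would double-count them.
orders = lines.groupby("order_id", as_index=False).agg(
    shipping_cost=("shipping_cost", "first"),
    tax=("tax", "first"),
)
items_total = order_items.groupby("order_id", as_index=False).agg(
    items_total=("line_total", "sum")
)
orders = orders.merge(items_total, on="order_id")
print(orders)
```

Summing `shipping_cost` over the raw rows would report 9.98 for ORD_1 instead of 4.99, which is the kind of error the note about summary tables warns against.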

# 4. reconciliation_challenge_data.csv

# Simple Summary of Reconciliation Data Issues (reconciliation_challenge_data.csv):

- IDs: client_reference, transaction_ref, and item_reference need to be mapped to cleaned customer, order, and product IDs.

- Customer Info: Fields like full_customer_name and contact_email should come from the main Customers table, not be stored here.

- Dates: Convert transaction_date to standard format (YYYY-MM-DD). last_modified_timestamp is already in good format.

- Status Fields: Values for payment_status and delivery_status look consistent – just standardize casing (UPPERCASE).

- Categories: Fields like customer_segment, region, and product_line are fine but may need consistent casing.

- Numbers: Most numeric fields are valid, but total_value doesn’t match quantity_ordered * unit_cost – trust the calculated value over the given one. Use amount_paid as the actual amount spent (assumed per line item unless confirmed otherwise).

- Notes: Mostly missing – clean and keep if useful. Combine if multiple lines per transaction.

- Duplicates: No exact duplicates, but some transactions have multiple items.

- Relationships:
  - transaction_ref = order ID
  - client_reference links to Customers
  - item_reference links to Products
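
The "trust the calculated value" rule can be sketched like this. The rows are invented for illustration, the 0.01 tolerance is an assumption, and the `total_mismatch` flag is a hypothetical addition that keeps the disagreement visible for auditing instead of silently overwriting the provided figure.

```python
import numpy as np
import pandas as pd

# Invented rows: TXN_2's provided total disagrees with 3 * 4.5.
recon = pd.DataFrame({
    "transaction_ref": ["TXN_1", "TXN_2", "TXN_3"],
    "quantity_ordered": [2, 3, 1],
    "unit_cost": [10.0, 4.5, 20.0],
    "total_value": [20.0, 15.0, 20.0],
})

# Keep the source figure under a separate name and compute the trusted one.
cleaned = recon.rename(columns={"total_value": "total_value_provided"})
cleaned["line_item_total_value"] = (
    cleaned["quantity_ordered"] * cleaned["unit_cost"]
).round(2)

# Flag rows where the two differ by more than a cent (assumed tolerance).
cleaned["total_mismatch"] = ~np.isclose(
    cleaned["total_value_provided"],
    cleaned["line_item_total_value"],
    atol=0.01,
)
print(cleaned)
```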











# Phase 2: ETL Pipeline Development

# 2.1 Utility/Helper Functions

# Key Edge Cases
- Missing or Conflicting IDs: Generated fallback canonical IDs for missing customer/product identifiers; ensured referential integrity by dropping unmappable foreign keys with proper logging.

- Inconsistent Formats: Handled messy text fields (e.g., names, addresses, booleans), unparseable dates, and non-numeric price fields using defensive parsing and cleaning functions.

- Duplicate & Redundant Data: Applied coalescing strategies for overlapping fields (e.g., email vs. email_address), and de-duplicated on canonical IDs with prioritization logic.

- Financial Inconsistencies: Normalized line_item_total_value using quantity × unit price; retained original total_value_provided for auditing.

- Empty or Invalid Records: ETL functions safely exit or skip processing when input data is empty or critical fields are missing.

- Ambiguous Mappings: Implemented layered ID resolution logic (e.g., item_id vs product_id) to ensure valid lookups.
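
As one possible reading of "dropping unmappable foreign keys with proper logging", the sketch below filters child rows whose key has no match in the parent table. The function name `drop_unmapped` is hypothetical and the sample frames are invented; the notebook's own implementation may differ.

```python
import logging
import pandas as pd

logger = logging.getLogger(__name__)

def drop_unmapped(child, parent, key):
    """Keep only child rows whose `key` exists in parent[key]; log what was dropped."""
    known = child[key].isin(parent[key])
    dropped = child.loc[~known, key]
    if not dropped.empty:
        logger.warning(
            "Dropping %d row(s) with unmapped %s, e.g. %s",
            len(dropped), key, sorted(dropped.astype(str).unique())[:10],
        )
    return child[known].copy()

# Invented example: one order points at an unknown customer, one has no customer.
customers = pd.DataFrame({"cust_id": ["CUST_0001", "CUST_0002"]})
orders = pd.DataFrame({
    "order_id": ["ORD_1", "ORD_2", "ORD_3"],
    "cust_id": ["CUST_0001", "CUST_0999", None],
})
valid_orders = drop_unmapped(orders, customers, "cust_id")
print(valid_orders)
```

Logging a capped sample of the offending keys keeps the log readable while still leaving a trail of which records were removed.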



In [10]:
DEFAULT_UNKNOWN_CATEGORICAL = 'UNKNOWN'
DEFAULT_UNKNOWN_NUMERIC_INT = 0
DEFAULT_UNKNOWN_NUMERIC_FLOAT = 0.0
DEFAULT_STATUS_UNKNOWN = 'UNKNOWN'

# Gender Standardization
GENDER_MAP = {
    'M': 'MALE', 'F': 'FEMALE', 'MALE': 'MALE', 'FEMALE': 'FEMALE', 
    'OTHER': 'OTHER', '': DEFAULT_UNKNOWN_CATEGORICAL # Handle empty string
}
# Customer Status Standardization
CUSTOMER_STATUS_MAP = {
    'ACTIVE': 'ACTIVE', 'INACTIVE': 'INACTIVE', 'PENDING': 'PENDING',
    'SUSPENDED': 'SUSPENDED', '': DEFAULT_STATUS_UNKNOWN # Handle empty string
}
# Payment Status Standardization (Example - expand based on all sources)
PAYMENT_STATUS_MAP = {
    'COMPLETED': 'COMPLETED', 'PENDING': 'PENDING', 'FAILED': 'FAILED',
    '': DEFAULT_STATUS_UNKNOWN
}
# Delivery/Order Status Standardization (Example - expand based on all sources)
ORDER_DELIVERY_STATUS_MAP = {
    'DELIVERED': 'DELIVERED', 'PENDING': 'PENDING', 'IN_TRANSIT': 'IN_TRANSIT',
    'PROCESSING': 'PROCESSING', 'SHIPPED': 'SHIPPED', 'CANCELLED': 'CANCELLED',
    'RETURNED': 'RETURNED', '': DEFAULT_STATUS_UNKNOWN
}
# State Standardization
STATE_ABBREVIATION_MAP = {
    'California': 'CA', 'New York': 'NY', 'Illinois': 'IL', 'Texas': 'TX',
    'Pennsylvania': 'PA', 'Arizona': 'AZ',
    'CA': 'CA', 'NY': 'NY', 'IL': 'IL', 'TX': 'TX', 'PA': 'PA', 'AZ': 'AZ'
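    # Add all US states and common variations.
    # Note: keys mix title-case names ('California') with upper-case codes ('CA'), so a
    # single case_transform in standardize_categorical cannot match both forms.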
}
# City Standardization (Example - this would be much larger in reality or use a geo-library)
CITY_NORMALIZATION_MAP = {
    'La': 'Los Angeles', 'Losangeles': 'Los Angeles',
    'Nyc': 'New York', 'New York City': 'New York', 'New_York': 'New York',
    'Phila': 'Philadelphia',
    'New_york': 'New York', # From exploration
}

def clean_string(value, case=None, default_if_empty=None):
    """Cleans a string: strips whitespace, optionally converts case, returns default if empty."""
    if pd.isna(value):
        return default_if_empty
    value_str = str(value).strip()
    if not value_str and default_if_empty is not None: # if after stripping it's empty
        return default_if_empty
    if not value_str: # if still empty and no default_if_empty
        return None

    if case == 'lower':
        return value_str.lower()
    elif case == 'upper':
        return value_str.upper()
    elif case == 'title':
        return value_str.title()
    return value_str

def standardize_categorical(value, mapping_dict, default_value=DEFAULT_UNKNOWN_CATEGORICAL, case_transform='upper'):
    """Standardizes categorical values using a mapping dictionary."""
    cleaned_value = clean_string(value, case=case_transform)
    if cleaned_value is None:
        return default_value
    return mapping_dict.get(cleaned_value, default_value)

def parse_date_robustly(date_val, output_format='%Y-%m-%d', error_val=None):
    """Parses various date/datetime formats and returns a string in specified format or error_val."""
    if pd.isna(date_val) or str(date_val).strip() == '':
        return error_val
    try:
        dt_obj = parser.parse(str(date_val))
        return dt_obj.strftime(output_format)
    except (ValueError, TypeError, OverflowError) as e:  # parser.ParserError is a ValueError subclass
        logger.debug(f"Date parsing failed for '{date_val}': {e}")
        return error_val

def to_numeric_safe(value, target_type=float, default_value=None):
    """
    Safely converts a value to a specified numeric type (float or int).
    Removes common non-numeric characters like currency symbols.
    """
    if pd.isna(value):
        return default_value
    
    s_value = str(value).strip()
    if not s_value:
        return default_value
        
    # Remove currency symbols, commas (for thousands), and trailing/leading non-numeric noise common in financial strings
    s_value = re.sub(r'[^\d\.\-eE]', '', s_value) # Allow for scientific notation as well
    
    if not s_value or s_value == '.' or s_value == '-': # Check if only a dot/hyphen remains or empty
        return default_value
    
    try:
        if target_type == int:
            # For int, first convert to float to handle "123.0" cases, then to int
            return int(float(s_value))
        else: # float
            return float(s_value)
    except (ValueError, OverflowError):  # OverflowError: int(float('inf')), e.g. input '1e999'
        logger.debug(f"Could not convert '{value}' (cleaned: '{s_value}') to {target_type.__name__}.")
        return default_value

def standardize_boolean_strict(value, 
                               true_values={'true', 'yes', '1', 'active', 'completed', 'delivered'},
                               false_values={'false', 'no', '0', 'inactive', 'failed', 'pending', 'cancelled', 'returned'}):
    """Standardizes various inputs to Python boolean True/False, or None if ambiguous."""
    if pd.isna(value):
        return None
    
    cleaned_value = str(value).strip().lower()
    if not cleaned_value: # Empty string after strip
        return None

    if cleaned_value in true_values:
        return True
    if cleaned_value in false_values:
        return False
    
    # Handle numeric representations if they weren't in true/false_values
    try:
        num_value = float(cleaned_value)
        if num_value == 1.0: return True
        if num_value == 0.0: return False
    except ValueError:
        pass # Not a number, and not in string lists

    logger.debug(f"Ambiguous boolean value: '{value}'. Returning None.")
    return None # Ambiguous

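def standardize_phone_strict(phone_str):
    """Formats US 10/11-digit phone numbers; otherwise returns the bare digits, or None if there are none."""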
    if pd.isna(phone_str) or str(phone_str).strip() == '':
        return None
    digits = re.sub(r'\D', '', str(phone_str))
    
    if len(digits) == 10: # Standard US 10-digit
        return f"({digits[0:3]}) {digits[3:6]}-{digits[6:10]}"
    elif len(digits) == 11 and digits.startswith('1'): # US 11-digit with country code
        return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:11]}"
    elif len(digits) > 0: # Return cleaned digits if not a standard format
        logger.debug(f"Non-standard phone format for '{phone_str}', returning digits: {digits}")
        return digits 
    return None # No digits found

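def standardize_postal_code(code_str):
    """Returns the 5-digit US ZIP when one can be extracted; otherwise the cleaned input, or None."""
    if pd.isna(code_str) or str(code_str).strip() == '':
        return None
    cleaned_code = clean_string(str(code_str))
    if not cleaned_code:  # Guard: re.match would raise TypeError on None
        return None
    # Try to match a 5-digit code, or the ZIP+4 format.
    match = re.match(r'^(\d{5})(-\d{4})?$', cleaned_code)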
    if match:
        return match.group(1) # Return just the 5-digit part
    # Handle cases where zip might be float string like "12345.0"
    if '.' in cleaned_code:
        cleaned_code = cleaned_code.split('.')[0]
        if re.match(r'^\d{5}$', cleaned_code):
            return cleaned_code
            
    logger.debug(f"Could not standardize postal code: '{code_str}'. Returning cleaned or None.")
    return cleaned_code if cleaned_code else None


def get_current_timestamp_str():
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

logger.info("Utility functions defined.")

2025-06-23 08:48:42,506 - INFO - Utility functions defined.
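
One edge case in `to_numeric_safe` is worth illustrating. Its character filter keeps the letters `e` and `E` to allow scientific notation, so a value such as `EUR 1,234.50` is reduced to `E1234.50`, fails `float()`, and falls back to `default_value`. The sketch below reproduces that filter and compares it with a hypothetical alternative, `extract_number`, which matches the first well-formed number instead of stripping characters. `extract_number` and the sample strings are assumptions made for illustration; they are not part of the notebook or the source data.

```python
import re
from typing import Optional

# Same character filter as to_numeric_safe: keeps digits, '.', '-', 'e' and 'E'.
STRIP_PATTERN = re.compile(r'[^\d\.\-eE]')

# Hypothetical alternative: match the first well-formed number in the string.
NUMBER_PATTERN = re.compile(r'[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?')


def strip_non_numeric(text: str) -> str:
    """Apply the stripping filter used by to_numeric_safe."""
    return STRIP_PATTERN.sub('', text)


def extract_number(text: str) -> Optional[float]:
    """Return the first number found in text (thousands commas removed), or None."""
    match = NUMBER_PATTERN.search(text.replace(',', ''))
    return float(match.group(0)) if match else None


for sample in ['EUR 1,234.50', '$12.99', 'n/a', '-3e2']:
    print(repr(sample), '->', repr(strip_non_numeric(sample)), '|', extract_number(sample))
```

Whether this matters depends on whether the source files contain currency codes with an `E` in them; if they only use symbols such as `$`, the existing filter is sufficient.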


# 2.2 Database Schema Definition and Creation

In [11]:
def create_tables(engine):
    """Creates database tables based on the defined ERD."""
    with engine.connect() as connection:
        try:
            # Drop tables if they exist for a clean slate (idempotency for dev)
            connection.execute(text("DROP TABLE IF EXISTS OrderItems;"))
            connection.execute(text("DROP TABLE IF EXISTS Orders;"))
            connection.execute(text("DROP TABLE IF EXISTS Products;"))
            connection.execute(text("DROP TABLE IF EXISTS Customers;"))
            logger.info("Dropped existing tables (if any).")

            # Customers Table
            connection.execute(text("""
            CREATE TABLE Customers (
                customer_id TEXT PRIMARY KEY, -- Canonical CUST_XXXX
                customer_name TEXT,
                email TEXT, -- Unique via idx_customers_email (created below); NULLs are allowed
                phone TEXT,
                address_street TEXT,
                address_city TEXT,
                address_state TEXT,
                address_postal_code TEXT,
                registration_date DATE,
                status TEXT,
                total_orders INTEGER,
                total_spent REAL,
                loyalty_points INTEGER,
                preferred_payment_method TEXT,
                birth_date DATE,
                age INTEGER,
                gender TEXT,
                segment TEXT,
                source_customer_id_int INTEGER, -- Original int ID
                last_updated_pipeline DATETIME
            );
            """))
            # Email uniqueness is enforced by the idx_customers_email unique index created below
            logger.info("Customers table created.")

            # Products Table
            connection.execute(text("""
            CREATE TABLE Products (
                product_id TEXT PRIMARY KEY, -- Canonical PROD_XXX
                product_name TEXT,
                description TEXT,
                category TEXT,
                brand TEXT,
                manufacturer TEXT,
                price REAL, 
                cost REAL, 
                weight_kg REAL,
                dim_length_cm REAL,
                dim_width_cm REAL,
                dim_height_cm REAL,
                color TEXT,
                size TEXT,
                stock_quantity INTEGER,
                reorder_level INTEGER,
                supplier_id TEXT,
                is_active BOOLEAN,
                rating REAL,
                product_created_date DATE,
                product_last_updated_source DATETIME,
                source_item_id_int INTEGER, -- Original int item_id
                last_updated_pipeline DATETIME
            );
            """))
            logger.info("Products table created.")

            # Orders Table
            connection.execute(text("""
            CREATE TABLE Orders (
                order_id TEXT PRIMARY KEY, -- Canonical ORD_XXXX or TXN_XXXX
                customer_id TEXT,
                order_date DATETIME,
                order_status TEXT,
                payment_method TEXT,
                payment_status TEXT,
                delivery_status TEXT,
                shipping_address_full TEXT,
                shipping_cost_total REAL, -- Changed name for clarity
                tax_total REAL,
                discount_total REAL,
                order_total_value_gross REAL,
                order_total_value_net REAL, 
                amount_paid_total REAL,
                tracking_number TEXT,
                notes TEXT,
                source_order_id_int INTEGER, 
                last_updated_pipeline DATETIME,
                FOREIGN KEY (customer_id) REFERENCES Customers(customer_id)
            );
            """))
            logger.info("Orders table created.")

            # OrderItems Table
            connection.execute(text("""
            CREATE TABLE OrderItems (
                order_item_id INTEGER PRIMARY KEY AUTOINCREMENT,
                order_id TEXT,
                product_id TEXT,
                customer_id TEXT, -- Denormalized for potential direct queries on items per customer
                quantity INTEGER,
                unit_price REAL,
                line_item_total_value REAL,
                line_item_discount REAL,
                line_item_tax REAL,
                line_item_shipping_fee REAL,
                source_file TEXT, 
                original_line_identifier TEXT, -- To trace back to source if needed
                last_updated_pipeline DATETIME,
                FOREIGN KEY (order_id) REFERENCES Orders(order_id),
                FOREIGN KEY (product_id) REFERENCES Products(product_id),
                FOREIGN KEY (customer_id) REFERENCES Customers(customer_id) 
            );
            """))
            logger.info("OrderItems table created.")

            # Create Indexes
            connection.execute(text("CREATE UNIQUE INDEX IF NOT EXISTS idx_customers_email ON Customers(email);")) # Email should ideally be unique
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_orders_customer_id ON Orders(customer_id);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_orders_order_date ON Orders(order_date);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_orderitems_order_id ON OrderItems(order_id);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_orderitems_product_id ON OrderItems(product_id);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_orderitems_customer_id ON OrderItems(customer_id);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_products_category ON Products(category);"))
            connection.execute(text("CREATE INDEX IF NOT EXISTS idx_products_brand ON Products(brand);"))
            logger.info("Indexes created.")
            
            connection.commit()
            logger.info("Database tables and indexes created successfully.")
        except Exception as e:
            logger.error(f"Error creating tables: {e}")
            if connection:
                connection.rollback()
            raise

engine = create_engine(f'sqlite:///{DB_PATH}')
create_tables(engine)

2025-06-23 08:48:42,561 - INFO - Dropped existing tables (if any).
2025-06-23 08:48:42,566 - INFO - Customers table created.
2025-06-23 08:48:42,571 - INFO - Products table created.
2025-06-23 08:48:42,576 - INFO - Orders table created.
2025-06-23 08:48:42,580 - INFO - OrderItems table created.
2025-06-23 08:48:42,601 - INFO - Indexes created.
2025-06-23 08:48:42,602 - INFO - Database tables and indexes created successfully.
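
A note on the `FOREIGN KEY` clauses above: SQLite parses them, but only enforces them on connections where `PRAGMA foreign_keys` is switched on. The following minimal sketch uses the standard-library `sqlite3` module and a cut-down two-table schema (an assumption for brevity, not the full schema) to show the difference. The helper name `orphan_insert_rejected` is hypothetical.

```python
import sqlite3


def orphan_insert_rejected(enforce_fk: bool) -> bool:
    """Insert an order for a non-existent customer; report whether SQLite rejects it."""
    conn = sqlite3.connect(":memory:")
    try:
        # The pragma is per-connection and must be set outside a transaction.
        conn.execute(f"PRAGMA foreign_keys = {'ON' if enforce_fk else 'OFF'}")
        conn.execute("CREATE TABLE Customers (customer_id TEXT PRIMARY KEY)")
        conn.execute(
            "CREATE TABLE Orders ("
            " order_id TEXT PRIMARY KEY,"
            " customer_id TEXT,"
            " FOREIGN KEY (customer_id) REFERENCES Customers(customer_id))"
        )
        try:
            conn.execute("INSERT INTO Orders VALUES ('ORD_1', 'CUST_MISSING')")
        except sqlite3.IntegrityError:
            return True
        return False
    finally:
        conn.close()


print("Rejected with enforcement on: ", orphan_insert_rejected(True))
print("Rejected with enforcement off:", orphan_insert_rejected(False))
```

In this pipeline the ETL functions filter order rows against the known customer and product IDs before loading, so referential integrity is checked in pandas rather than by the database.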


# 2.3 Customers ETL

In [12]:
def etl_customers(df_raw_cust, db_engine):
    logger.info("Starting Customers ETL process...")
    if df_raw_cust.empty:
        logger.warning("Raw customer DataFrame is empty. Skipping ETL.")
        return pd.DataFrame()
        
    df = df_raw_cust.copy()
    pipeline_timestamp = get_current_timestamp_str()

    # 1. ID Unification and Source ID
    # Capture the original integer ID first, before 'customer_id' is overwritten with the canonical value
    df['source_customer_id_int'] = df['customer_id'].apply(lambda x: to_numeric_safe(x, target_type=int)) # Original int customer_id
    df['customer_id'] = df['cust_id'].apply(lambda x: clean_string(x, case='upper'))
    
    # Handle cases where canonical customer_id might still be missing if cust_id was bad
    missing_cust_id_mask = df['customer_id'].isnull()
    df.loc[missing_cust_id_mask, 'customer_id'] = 'CUST_UNKNOWN_' + df.loc[missing_cust_id_mask, 'source_customer_id_int'].astype(str)
    df.loc[missing_cust_id_mask & df['source_customer_id_int'].isnull(), 'customer_id'] = 'CUST_UNKNOWN_INVALID'


    # 2. Name: Coalesce and standardize
    # Prefer customer_name if it's not username-like, else full_name if not username-like, else first non-null
    def choose_name(row):
        cn = str(row['customer_name'])
        fn = str(row['full_name'])
        is_cn_username = bool(re.match(r'^[a-z\._\-0-9@]+$', cn.lower()))
        is_fn_username = bool(re.match(r'^[a-z\._\-0-9@]+$', fn.lower()))

        if pd.notna(row['customer_name']) and not is_cn_username:
            return clean_string(cn, 'title')
        if pd.notna(row['full_name']) and not is_fn_username:
            return clean_string(fn, 'title')
        if pd.notna(row['customer_name']): # Fallback to customer_name even if username-like
             return clean_string(cn, 'title')
        if pd.notna(row['full_name']): # Fallback to full_name even if username-like
             return clean_string(fn, 'title')
        return DEFAULT_UNKNOWN_CATEGORICAL
        
    df['customer_name'] = df.apply(choose_name, axis=1)

    # 3. Email: Coalesce, clean, handle empty.
    df['email_temp'] = df['email'].replace('', pd.NA).apply(lambda x: clean_string(x, 'lower') if pd.notna(x) else None)
    df['email_address_temp'] = df['email_address'].replace('', pd.NA).apply(lambda x: clean_string(x, 'lower') if pd.notna(x) else None)
    df['email'] = df['email_temp'].fillna(df['email_address_temp'])
    # Further validation for email format can be added here if needed.

    # 4. Phone: Coalesce and standardize
    df['phone_temp'] = df['phone'].replace('', pd.NA)
    df['phone_number_temp'] = df['phone_number'].replace('', pd.NA)
    df['phone'] = df['phone_number_temp'].fillna(df['phone_temp']).apply(standardize_phone_strict)

    # 5. Address
    df['address_street'] = df['address'].apply(lambda x: clean_string(x, 'title'))
    df['address_city'] = df['city'].apply(lambda x: standardize_categorical(x, CITY_NORMALIZATION_MAP, default_value=clean_string(x, 'title'), case_transform='title'))
    df['address_state'] = df['state'].apply(lambda x: standardize_categorical(x, STATE_ABBREVIATION_MAP, default_value=clean_string(x, 'upper'), case_transform='title'))
    
    df['postal_code_temp'] = df['postal_code'].replace('', pd.NA).fillna(df['zip_code'].replace('', pd.NA))
    df['address_postal_code'] = df['postal_code_temp'].apply(standardize_postal_code)

    # 6. Dates
    df['reg_date_temp'] = df['reg_date'].replace('', pd.NA)
    df['registration_date_temp'] = df['registration_date'].replace('', pd.NA)
    df['registration_date'] = df['registration_date_temp'].fillna(df['reg_date_temp']).apply(parse_date_robustly)
    df['birth_date'] = df['birth_date'].apply(parse_date_robustly)
    
    # 7. Status
    df['status_temp'] = df['status'].replace('', pd.NA)
    df['customer_status_temp'] = df['customer_status'].replace('', pd.NA)
    df['status'] = df['customer_status_temp'].fillna(df['status_temp']).apply(lambda x: standardize_categorical(x, CUSTOMER_STATUS_MAP, default_value=DEFAULT_STATUS_UNKNOWN))


    # 8. Numeric
    df['total_spent'] = df['total_spent'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df['total_orders'] = df['total_orders'].apply(lambda x: to_numeric_safe(x, target_type=int, default_value=0))
    df['loyalty_points'] = df['loyalty_points'].apply(lambda x: to_numeric_safe(x, target_type=int, default_value=0))

    # 9. Age (recalculate if birth_date is available, otherwise use provided age)
    def calculate_age(birth_date_str):
        if pd.isna(birth_date_str): return None
        try:
            birth_dt = datetime.strptime(birth_date_str, '%Y-%m-%d')
            today = datetime.today()
            return today.year - birth_dt.year - ((today.month, today.day) < (birth_dt.month, birth_dt.day))
        except (ValueError, TypeError): return None
            
    df['age_calculated'] = df['birth_date'].apply(calculate_age)
    df['age'] = df['age_calculated'].fillna(df['age'].apply(lambda x: to_numeric_safe(x, target_type=int)))


    # 10. Gender
    df['gender'] = df['gender'].apply(lambda x: standardize_categorical(x, GENDER_MAP, default_value=DEFAULT_UNKNOWN_CATEGORICAL, case_transform='title')) # Title case for initial match

    # 11. Segment & Payment Method
    df['segment'] = df['segment'].apply(lambda x: clean_string(x, 'upper', DEFAULT_UNKNOWN_CATEGORICAL))
    df['preferred_payment_method'] = df['preferred_payment'].apply(lambda x: clean_string(x, 'lower', DEFAULT_UNKNOWN_CATEGORICAL))

    # Select and rename columns for the target schema
    df_final_customers = df[[
        'customer_id', 'customer_name', 'email', 'phone',
        'address_street', 'address_city', 'address_state', 'address_postal_code',
        'registration_date', 'status', 'total_orders', 'total_spent',
        'loyalty_points', 'preferred_payment_method', 'birth_date', 'age', 'gender', 
        'segment', 'source_customer_id_int'
    ]]
    
    # Handle records with missing primary key after cleaning
    df_final_customers = df_final_customers.dropna(subset=['customer_id'])
    logger.info(f"Customers after cleaning and selection: {df_final_customers.shape[0]} rows.")
    
    # Deduplication, step 1: keep the first row per canonical customer_id.
    # In production, a merge based on the latest registration_date (or similar logic) may be preferable.
    df_final_customers = df_final_customers.drop_duplicates(subset=['customer_id'], keep='first')
    logger.info(f"Customers after dropping duplicates on customer_id: {df_final_customers.shape[0]} rows.")

    # Deduplicate based on email if it's meant to be unique and isn't None
    df_final_customers_with_email = df_final_customers[df_final_customers['email'].notna()]
    df_final_customers_null_email = df_final_customers[df_final_customers['email'].isna()]
    df_final_customers_with_email = df_final_customers_with_email.drop_duplicates(subset=['email'], keep='first')
    df_final_customers = pd.concat([df_final_customers_with_email, df_final_customers_null_email], ignore_index=True)
    logger.info(f"Customers after additionally dropping duplicates on email (if present): {df_final_customers.shape[0]} rows.")

    df_final_customers['last_updated_pipeline'] = pipeline_timestamp
    
    # Load into SQLite
    try:
        df_final_customers.to_sql('Customers', db_engine, if_exists='append', index=False)
        logger.info(f"{len(df_final_customers)} cleaned customer records loaded/appended into Customers table.")
    except Exception as e:
        logger.error(f"Error loading cleaned customer data to SQLite: {e}")
        # Consider what to do on error: raise, log and continue, etc.
        # For this exercise, we log and potentially skip loading this batch if fatal.

    logger.info("Customers ETL process finished.")
    return df_final_customers

# Run Customer ETL
df_customers_final_loaded = etl_customers(df_customers_raw, engine)

2025-06-23 08:48:42,643 - INFO - Starting Customers ETL process...
2025-06-23 08:48:42,725 - INFO - Customers after cleaning and selection: 500 rows.
2025-06-23 08:48:42,726 - INFO - Customers after dropping duplicates on customer_id: 500 rows.
2025-06-23 08:48:42,728 - INFO - Customers after additionally dropping duplicates on email (if present): 500 rows.
2025-06-23 08:48:42,749 - INFO - 500 cleaned customer records loaded/appended into Customers table.
2025-06-23 08:48:42,750 - INFO - Customers ETL process finished.
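
The email deduplication above sets rows with a missing email aside and only deduplicates the rest. This matches how SQLite treats the `idx_customers_email` unique index: NULL values are considered distinct from one another, so any number of customers without an email can coexist, while a repeated non-NULL email is rejected. A minimal sketch with the standard-library `sqlite3` module and a two-column stand-in for the Customers table (the helper name and sample emails are hypothetical):

```python
import sqlite3
from typing import Iterable, Optional


def count_accepted_inserts(emails: Iterable[Optional[str]]) -> int:
    """Insert one customer per email and count how many rows the unique index accepts."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE Customers (customer_id TEXT PRIMARY KEY, email TEXT)")
        conn.execute("CREATE UNIQUE INDEX idx_customers_email ON Customers(email)")
        accepted = 0
        for i, email in enumerate(emails):
            try:
                conn.execute(
                    "INSERT INTO Customers VALUES (?, ?)", (f"CUST_{i:04d}", email)
                )
                accepted += 1
            except sqlite3.IntegrityError:
                pass  # Duplicate non-NULL email
        return accepted
    finally:
        conn.close()


print(count_accepted_inserts([None, None, "a@example.com"]))
print(count_accepted_inserts(["a@example.com", "a@example.com"]))
```

Without the pandas-side deduplication, a single duplicate email would be expected to make the `to_sql` append fail, and the `except` branch in `etl_customers` would only log the error.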


# 2.4 Products ETL

In [13]:
def etl_products(df_raw_prod, db_engine):
    logger.info("Starting Products ETL process...")
    if df_raw_prod.empty:
        logger.warning("Raw product DataFrame is empty. Skipping ETL.")
        return pd.DataFrame()
        
    df = df_raw_prod.copy()
    pipeline_timestamp = get_current_timestamp_str()

    df['product_id'] = df['product_id'].apply(lambda x: clean_string(x, 'upper'))
    df['source_item_id_int'] = df['item_id'].apply(lambda x: to_numeric_safe(x, target_type=int))
    
    df['product_name'] = df['product_name'].fillna(df['item_name']).apply(lambda x: clean_string(x, 'title'))
    df['description'] = df['description'].apply(lambda x: clean_string(x) if pd.notna(x) else "No description available")

    # Category: Coalesce and standardize. Example: prefer 'category' then 'product_category'
    df['category_temp'] = df['category'].fillna(df['product_category']).apply(lambda x: clean_string(x, 'title'))
    # If you have a predefined list of categories, map to that. Otherwise, just clean.
    df['category'] = df['category_temp'].fillna(DEFAULT_UNKNOWN_CATEGORICAL)

    # Brand/Manufacturer: Coalesce, clean, standardize.
    df['brand_temp'] = df['brand'].replace('', pd.NA).fillna(df['manufacturer'].replace('', pd.NA))
    df['brand'] = df['brand_temp'].apply(lambda x: clean_string(x, 'upper', DEFAULT_UNKNOWN_CATEGORICAL))
    
    # Fall back to the raw coalesced brand (brand_temp); df['brand'] already holds the cleaned/defaulted value
    df['manufacturer_temp'] = df['manufacturer'].replace('', pd.NA).fillna(df['brand_temp'])
    df['manufacturer'] = df['manufacturer_temp'].apply(lambda x: clean_string(x, 'upper', DEFAULT_UNKNOWN_CATEGORICAL))


    df['price'] = df['price'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=DEFAULT_UNKNOWN_NUMERIC_FLOAT))
    df['cost'] = df['cost'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=DEFAULT_UNKNOWN_NUMERIC_FLOAT))
    df['weight_kg'] = df['weight'].apply(lambda x: to_numeric_safe(x, target_type=float)) # Null if unparseable
    df['rating'] = df['rating'].apply(lambda x: to_numeric_safe(x, target_type=float))

    def parse_dimensions_strict(dim_str):
        if pd.isna(dim_str) or str(dim_str).strip() == '':
            return None, None, None
        parts = str(dim_str).lower().split('x')
        if len(parts) == 3:
            length = to_numeric_safe(parts[0], target_type=float)
            width = to_numeric_safe(parts[1], target_type=float)
            height = to_numeric_safe(parts[2], target_type=float)
            return length, width, height
        return None, None, None

    dims = df['dimensions'].apply(parse_dimensions_strict)
    df['dim_length_cm'] = dims.apply(lambda x: x[0] if x else None)
    df['dim_width_cm'] = dims.apply(lambda x: x[1] if x else None)
    df['dim_height_cm'] = dims.apply(lambda x: x[2] if x else None)

    df['color'] = df['color'].replace('', pd.NA).apply(lambda x: clean_string(x, 'title', DEFAULT_UNKNOWN_CATEGORICAL))
    df['size'] = df['size'].replace('', pd.NA).apply(lambda x: clean_string(x, 'upper', 'N/A'))

    df['stock_quantity'] = df['stock_quantity'].fillna(df['stock_level']).apply(lambda x: to_numeric_safe(x, target_type=int, default_value=DEFAULT_UNKNOWN_NUMERIC_INT))
    df['reorder_level'] = df['reorder_level'].apply(lambda x: to_numeric_safe(x, target_type=int, default_value=DEFAULT_UNKNOWN_NUMERIC_INT))
    
    df['supplier_id'] = df['supplier_id'].apply(lambda x: clean_string(x, 'upper', DEFAULT_UNKNOWN_CATEGORICAL))
    
    df['is_active'] = df['is_active'].apply(standardize_boolean_strict) # Returns True, False, or None

    df['product_created_date'] = df['created_date'].apply(lambda x: parse_date_robustly(x))
    df['product_last_updated_source'] = df['last_updated'].apply(lambda x: parse_date_robustly(x, output_format='%Y-%m-%d %H:%M:%S'))

    target_product_cols = [
        'product_id', 'product_name', 'description', 'category', 'brand', 'manufacturer',
        'price', 'cost', 'weight_kg', 'dim_length_cm', 'dim_width_cm', 'dim_height_cm',
        'color', 'size', 'stock_quantity', 'reorder_level', 'supplier_id', 'is_active',
        'rating', 'product_created_date', 'product_last_updated_source', 'source_item_id_int'
    ]
    df_final_products = df[target_product_cols].copy()  # Copy so later column assignment does not target a slice of df
    
    df_final_products = df_final_products.dropna(subset=['product_id'])
    logger.info(f"Products after cleaning and selection: {df_final_products.shape[0]} rows.")
    
    df_final_products = df_final_products.drop_duplicates(subset=['product_id'], keep='first')
    logger.info(f"Products after dropping duplicates on product_id: {df_final_products.shape[0]} rows.")
    
    df_final_products['last_updated_pipeline'] = pipeline_timestamp

    try:
        df_final_products.to_sql('Products', db_engine, if_exists='append', index=False)
        logger.info(f"{len(df_final_products)} cleaned product records loaded/appended into Products table.")
    except Exception as e:
        logger.error(f"Error loading cleaned product data to SQLite: {e}")
        
    logger.info("Products ETL process finished.")
    return df_final_products

# Run Product ETL
df_products_final_loaded = etl_products(df_products_raw, engine)

# --- Build Product ID Mapping for Orders ---
# This map will be crucial for OrderItems ETL
# It maps source integer item_id (from products.json) to canonical PROD_XXX product_id
# It also maps PROD_XXX to itself for direct lookups from orders_unstructured
product_id_mapping_dict = {}
if not df_products_final_loaded.empty:
    # Map from source_item_id_int (original item_id from JSON) to canonical product_id
    for _, row in df_products_final_loaded.iterrows():
        if pd.notna(row['source_item_id_int']):
            product_id_mapping_dict[str(int(row['source_item_id_int']))] = row['product_id']
        # Also map canonical product_id to itself for direct lookups
        if pd.notna(row['product_id']):
            product_id_mapping_dict[str(row['product_id'])] = row['product_id']
    logger.info(f"Product ID mapping dictionary created with {len(product_id_mapping_dict)} entries.")
else:
    logger.warning("Product mapping dictionary could not be built as df_products_final_loaded is empty.")

# Fetch existing canonical IDs for FK checks (now that tables are populated)
existing_customer_ids_set = set(pd.read_sql_query("SELECT DISTINCT customer_id FROM Customers", engine)['customer_id'])
existing_product_ids_set = set(pd.read_sql_query("SELECT DISTINCT product_id FROM Products", engine)['product_id'])
logger.info(f"Fetched {len(existing_customer_ids_set)} existing customer IDs and {len(existing_product_ids_set)} existing product IDs from DB.")

2025-06-23 08:48:42,775 - INFO - Starting Products ETL process...
2025-06-23 08:48:42,825 - INFO - Products after cleaning and selection: 200 rows.
2025-06-23 08:48:42,826 - INFO - Products after dropping duplicates on product_id: 200 rows.
2025-06-23 08:48:42,841 - INFO - 200 cleaned product records loaded/appended into Products table.
2025-06-23 08:48:42,842 - INFO - Products ETL process finished.
2025-06-23 08:48:42,857 - INFO - Product ID mapping dictionary created with 400 entries.
2025-06-23 08:48:42,862 - INFO - Fetched 500 existing customer IDs and 200 existing product IDs from DB.


# 2.5 Orders & OrderItems ETL

In [14]:
# Cell 2.5.1: Process reconciliation_challenge_data.csv (Order Items Source 1)

def etl_order_items_from_reconciliation(df_raw, existing_cust_ids_set, existing_prod_ids_set, prod_id_int_map):
    logger.info("Starting ETL for Order Items from reconciliation_challenge_data...")
    if df_raw.empty:
        logger.warning("Raw reconciliation_challenge_data DataFrame is empty. Skipping.")
        return pd.DataFrame()
        
    df = df_raw.copy()
    pipeline_timestamp = get_current_timestamp_str()

    # Rename columns to align with target OrderItems and intermediate needs
    df.rename(columns={
        'client_reference': 'customer_id_source', 
        'transaction_ref': 'order_id_source',
        'item_reference': 'product_id_source_raw', 
        'transaction_date': 'order_date_source',
        'amount_paid': 'line_item_amount_paid_source', 
        'payment_status': 'payment_status_source',
        'delivery_status': 'delivery_status_source', 
        'quantity_ordered': 'quantity',
        'unit_cost': 'unit_price_source', 
        'total_value': 'total_value_provided',
        'discount_applied': 'line_item_discount_source', 
        'shipping_fee': 'line_item_shipping_fee_source',
        'tax_amount': 'line_item_tax_source', 
        'notes_comments': 'line_item_notes_original' # Rename original to avoid conflict before cleaning
    }, inplace=True)

    df['order_id'] = df['order_id_source'].apply(lambda x: clean_string(x, 'upper'))
    
    def map_recon_customer_id(client_ref_val, canonical_customer_ids_set_local):
        if pd.isna(client_ref_val): return None
        cleaned_client_ref = clean_string(str(client_ref_val), 'upper')
        if cleaned_client_ref and cleaned_client_ref.startswith('CLI_'):
            potential_cust_id = cleaned_client_ref.replace('CLI_', 'CUST_')
            if potential_cust_id in canonical_customer_ids_set_local:
                return potential_cust_id
        logger.debug(f"Could not map client_reference: {client_ref_val} to a known customer_id.")
        return None

    df['customer_id'] = df['customer_id_source'].apply(
        lambda x: map_recon_customer_id(x, existing_cust_ids_set)
    )
    
    def map_recon_product_id_corrected(item_ref_val, product_int_to_canonical_map, canonical_prod_ids_set_local):
        if pd.isna(item_ref_val): return None
        cleaned_item_ref = clean_string(item_ref_val, 'upper') 
        if not cleaned_item_ref: return None  # Guard: re.search would raise TypeError on None
        if cleaned_item_ref in canonical_prod_ids_set_local:
            return cleaned_item_ref
        match = re.search(r'\d+$', cleaned_item_ref) 
        if match:
            item_num_str = str(int(match.group(0))) 
            if item_num_str in product_int_to_canonical_map:
                return product_int_to_canonical_map[item_num_str]
        logger.debug(f"Could not map product_id_source_raw from reconciliation: {item_ref_val}")
        return None
            
    df['product_id'] = df['product_id_source_raw'].apply(
        lambda x: map_recon_product_id_corrected(x, prod_id_int_map, existing_prod_ids_set)
    )

    mapped_cust_count = df['customer_id'].notna().sum()
    logger.info(f"Customer ID mapping in reconciliation: {mapped_cust_count} of {len(df_raw)} initially attempted.")
    mapped_prod_count = df['product_id'].notna().sum()
    logger.info(f"Product ID mapping in reconciliation: {mapped_prod_count} of {len(df_raw)} initially attempted.")
    
    # Log unmapped samples if any (for debugging)
    if mapped_cust_count < len(df_raw):
        unmapped_cust_sample = df[df['customer_id'].isnull()]['customer_id_source'].value_counts().head(5)
        if not unmapped_cust_sample.empty: logger.warning(f"Sample unmapped customer_id_source values:\n{unmapped_cust_sample}")
    if mapped_prod_count < len(df_raw):
        unmapped_prod_sample = df[df['product_id'].isnull()]['product_id_source_raw'].value_counts().head(5)
        if not unmapped_prod_sample.empty: logger.warning(f"Sample unmapped product_id_source_raw values:\n{unmapped_prod_sample}")

    df.dropna(subset=['order_id', 'customer_id', 'product_id'], inplace=True)
    logger.info(f"Shape after dropping NA in key IDs: {df.shape}")
    
    # Defensive re-check: the mapping helpers only return known IDs, so these filters should normally be no-ops
    df = df[df['customer_id'].isin(existing_cust_ids_set)] 
    logger.info(f"Shape after filtering by existing_cust_ids: {df.shape}")
    
    df = df[df['product_id'].isin(existing_prod_ids_set)]
    logger.info(f"Shape after filtering by existing_prod_ids: {df.shape}")
    
    if df.empty:
        logger.warning("No valid records after ID mapping/filtering in reconciliation data.")
        return pd.DataFrame()

    df['order_date'] = df['order_date_source'].apply(lambda x: parse_date_robustly(x, output_format='%Y-%m-%d %H:%M:%S'))
    df['quantity'] = df['quantity'].apply(lambda x: to_numeric_safe(x, target_type=int, default_value=1))
    df['unit_price'] = df['unit_price_source'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df['line_item_total_value'] = df['quantity'] * df['unit_price']
    
    df['total_value_provided_numeric'] = df['total_value_provided'].apply(to_numeric_safe)
    discrepancy_check = ~np.isclose(df['line_item_total_value'], df['total_value_provided_numeric'].fillna(df['line_item_total_value']))
    if discrepancy_check.any():
        logger.warning(f"{discrepancy_check.sum()} reconciliation items show discrepancy between calculated total_value and provided total_value.")

    df['line_item_discount'] = df['line_item_discount_source'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df['line_item_shipping_fee'] = df['line_item_shipping_fee_source'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df['line_item_tax'] = df['line_item_tax_source'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df['line_item_amount_paid_final'] = df['line_item_amount_paid_source'].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0)) 
    
    df['payment_status_derived'] = df['payment_status_source'].apply(lambda x: standardize_categorical(x, PAYMENT_STATUS_MAP))
    df['delivery_status_derived'] = df['delivery_status_source'].apply(lambda x: standardize_categorical(x, ORDER_DELIVERY_STATUS_MAP))
    
    # Clean the original notes column and assign it to 'line_item_notes'
    if 'line_item_notes_original' in df.columns:
         df['line_item_notes'] = df['line_item_notes_original'].apply(lambda x: clean_string(x))
    else: # Should not happen if 'notes_comments' existed in raw
        df['line_item_notes'] = None


    df['original_line_identifier'] = df['order_id_source'].astype(str) + "_" + df['product_id_source_raw'].astype(str)
    df['source_file'] = 'reconciliation_challenge_data.csv'
    df['last_updated_pipeline'] = pipeline_timestamp

    # Ensure all columns needed for df_final are present in df
    final_cols_for_df1 = [
        'order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price',
        'line_item_total_value', 'line_item_discount', 'line_item_shipping_fee', 'line_item_tax',
        'payment_status_derived', 'delivery_status_derived', 'line_item_notes', 
        'line_item_amount_paid_final', 'original_line_identifier', 'source_file', 'last_updated_pipeline'
    ]
    for col in final_cols_for_df1:
        if col not in df.columns:
            df[col] = None # Add if missing, though should be there
            logger.warning(f"Column '{col}' was unexpectedly missing in df_final preparation for reconciliation data and was added as None.")

    df_final = df[final_cols_for_df1].copy()  # independent copy, so later edits do not touch df
    
    logger.info(f"Finished ETL for Order Items from reconciliation_challenge_data. Shape: {df_final.shape}")
    return df_final

# --- Re-run the execution for this specific ETL step ---
logger.info("Re-running ETL for reconciliation_challenge_data with corrected product AND customer ID mapping AND notes handling.")
if 'df_reconciliation_raw' in globals() and not df_reconciliation_raw.empty:
    if 'existing_customer_ids_set' not in globals() or 'existing_product_ids_set' not in globals() or 'product_id_mapping_dict' not in globals():
        logger.error("CRITICAL: Prerequisite ID sets or mapping dictionary for reconciliation ETL not found.")
        df_order_items_source1 = pd.DataFrame()
    else:
        df_order_items_source1 = etl_order_items_from_reconciliation(
            df_reconciliation_raw, 
            existing_customer_ids_set, 
            existing_product_ids_set,
            product_id_mapping_dict 
        )
        if df_order_items_source1 is not None and not df_order_items_source1.empty:
            logger.info(f"Successfully processed reconciliation order items. Shape: {df_order_items_source1.shape}")
            print("\n--- Processed Order Items from Reconciliation Data (Sample after fixes) ---")
            print(df_order_items_source1.head())
        else:
            logger.warning("ETL for reconciliation order items resulted in an empty or None DataFrame AFTER CORRECTIONS.")
else:
    logger.warning("Raw reconciliation data (df_reconciliation_raw) is empty or not defined.")
    df_order_items_source1 = pd.DataFrame()

2025-06-23 08:48:42,894 - INFO - Re-running ETL for reconciliation_challenge_data with corrected product AND customer ID mapping AND notes handling.
2025-06-23 08:48:42,895 - INFO - Starting ETL for Order Items from reconciliation_challenge_data...
2025-06-23 08:48:42,901 - INFO - Customer ID mapping in reconciliation: 300 of 300 initially attempted.
2025-06-23 08:48:42,903 - INFO - Product ID mapping in reconciliation: 300 of 300 initially attempted.
2025-06-23 08:48:42,906 - INFO - Shape after dropping NA in key IDs: (300, 23)
2025-06-23 08:48:42,908 - INFO - Shape after filtering by existing_cust_ids: (300, 23)
2025-06-23 08:48:42,910 - INFO - Shape after filtering by existing_prod_ids: (300, 23)
2025-06-23 08:48:42,942 - INFO - Finished ETL for Order Items from reconciliation_challenge_data. Shape: (300, 17)
2025-06-23 08:48:42,943 - INFO - Successfully processed reconciliation order items. Shape: (300, 17)



--- Processed Order Items from Reconciliation Data (Sample after fixes) ---
    order_id customer_id product_id           order_date  quantity  unit_price  line_item_total_value  line_item_discount  line_item_shipping_fee  line_item_tax payment_status_derived delivery_status_derived line_item_notes  line_item_amount_paid_final original_line_identifier                        source_file last_updated_pipeline
0  TXN_00119   CUST_0280   PROD_076  2023-02-12 00:00:00         2      159.48                 318.96               68.82                    7.84          16.62                 FAILED               DELIVERED            None                       664.50        TXN_00119_ITM_076  reconciliation_challenge_data.csv   2025-06-23 08:48:42
1  TXN_00522   CUST_0360   PROD_015  2023-03-14 00:00:00         6      214.26                1285.56               96.22                   28.67          32.03                PENDING                 PENDING            None                       897.23 
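
The discrepancy check in the reconciliation ETL treats a missing provided total as agreeing with the computed one, because the missing value is filled with the computed value before the comparison. A minimal sketch of that behaviour, using made-up numbers rather than rows from the dataset (only the two column names are taken from the function above):

```python
import numpy as np
import pandas as pd

# Hypothetical sample: the computed line total next to the total supplied by the source.
df = pd.DataFrame({
    "line_item_total_value": [318.96, 1285.56, 50.00],
    "total_value_provided_numeric": [318.96, 1300.00, np.nan],
})

# A missing provided total is replaced by the computed one, so it can never be flagged.
provided = df["total_value_provided_numeric"].fillna(df["line_item_total_value"])

# np.isclose tolerates floating-point noise; the mask marks genuine mismatches only.
discrepancy_mask = ~np.isclose(df["line_item_total_value"], provided)

print(f"{int(discrepancy_mask.sum())} row(s) flagged")
```

Only the second row is flagged here. The third row has no provided total and is therefore counted as consistent, which is worth remembering when reading the warning count in the log.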

# 2.5.2 Process orders_unstructured_data.csv

In [15]:
def etl_order_items_from_unstructured(df_raw, existing_cust_ids, existing_prod_ids, prod_id_map):
    logger.info("Starting ETL for Order Items from orders_unstructured_data...")
    if df_raw.empty:
        logger.warning("Raw orders_unstructured_data DataFrame is empty. Skipping.")
        return pd.DataFrame()
        
    df_working = df_raw.copy() 
    pipeline_timestamp = get_current_timestamp_str()

    # --- ID Coalescing & Standardization ---
    df_working['order_id'] = df_raw['order_id'].fillna(df_raw['ord_id'].astype(str)).apply(lambda x: clean_string(x, 'upper'))
    
    df_working['source_order_id_int'] = df_raw['ord_id'].fillna(
        df_raw['order_id'].apply(lambda x: to_numeric_safe(re.sub(r'\D', '', str(x)), target_type=int) if pd.notna(x) else None)
    ).apply(lambda x: to_numeric_safe(x, target_type=int))
    
    customer_id_str_source = df_raw['cust_id'].astype(str).apply(lambda x: clean_string(x, 'upper'))
    customer_id_int_source = df_raw['customer_id'].apply(lambda x: to_numeric_safe(x, target_type=int))
    df_working['customer_id'] = customer_id_str_source.fillna(
        customer_id_int_source.apply(lambda x: f"CUST_{str(x).zfill(4)}" if pd.notna(x) else None)
    )

    def resolve_unstructured_product_id(row_from_raw_data, product_id_lookup_map, canonical_product_id_set):
        prod_id_val = clean_string(row_from_raw_data['product_id'], 'upper')
        item_id_val_str = str(int(row_from_raw_data['item_id'])) if pd.notna(row_from_raw_data['item_id']) else None

        if prod_id_val and prod_id_val in canonical_product_id_set:
            return prod_id_val
        if item_id_val_str and item_id_val_str in product_id_lookup_map:
            return product_id_lookup_map[item_id_val_str]
        
        # Fallback: the stringified item_id is itself a canonical product ID
        if item_id_val_str and item_id_val_str in canonical_product_id_set:
            return item_id_val_str
        
        logger.debug(f"Could not map product_id/item_id: {row_from_raw_data['product_id']}/{row_from_raw_data['item_id']}")
        return None
        
    df_working['product_id'] = df_raw.apply(
        lambda row: resolve_unstructured_product_id(row, prod_id_map, existing_prod_ids), axis=1
    )
    
    logger.info(f"Shape after initial ID mapping and before filtering: {df_working.shape}")
    logger.info(f"Sample of resolved IDs before filtering:\n{df_working[['order_id', 'customer_id', 'product_id']].head()}")

    # --- Initial Filtering based on resolved IDs ---
    original_count_before_dropna = len(df_working)
    df_working.dropna(subset=['order_id', 'customer_id', 'product_id'], inplace=True)
    logger.info(f"Rows after dropping NA in key IDs: {len(df_working)}. Dropped: {original_count_before_dropna - len(df_working)}")
    
    original_count_before_cust_filter = len(df_working)
    df_working = df_working[df_working['customer_id'].isin(existing_cust_ids)]
    logger.info(f"Rows after filtering by existing_cust_ids: {len(df_working)}. Dropped: {original_count_before_cust_filter - len(df_working)}")

    original_count_before_prod_filter = len(df_working)
    df_working = df_working[df_working['product_id'].isin(existing_prod_ids)]
    logger.info(f"Rows after filtering by existing_prod_ids: {len(df_working)}. Dropped: {original_count_before_prod_filter - len(df_working)}")


    if df_working.empty:
        logger.warning("No valid records after ID mapping/filtering in unstructured orders data.")
        return pd.DataFrame()

    # --- Apply transformations using .loc[df_working.index] on df_raw to ensure correct alignment ---
    df_working['order_date'] = df_raw['order_datetime'].fillna(df_raw['order_date']).loc[df_working.index].apply(
        lambda x: parse_date_robustly(x, output_format='%Y-%m-%d %H:%M:%S')
    )

    df_working['quantity'] = df_raw['quantity'].fillna(df_raw['qty']).loc[df_working.index].apply(
        lambda x: to_numeric_safe(x, target_type=int, default_value=1)
    )
    df_working['unit_price'] = df_raw['unit_price'].fillna(df_raw['price']).loc[df_working.index].apply(
        lambda x: to_numeric_safe(x, target_type=float, default_value=0.0)
    )
    
    df_working['calculated_line_total'] = df_working['quantity'] * df_working['unit_price']
    df_working['line_item_total_value'] = df_raw['total_amount'].loc[df_working.index].apply(
        lambda x: to_numeric_safe(x, target_type=float)
    ).fillna(df_working['calculated_line_total'])
    
    df_working['line_item_discount'] = df_raw['discount'].loc[df_working.index].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df_working['line_item_tax'] = df_raw['tax'].loc[df_working.index].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    df_working['line_item_shipping_fee'] = df_raw['shipping_cost'].loc[df_working.index].apply(lambda x: to_numeric_safe(x, target_type=float, default_value=0.0))
    
    df_working['line_item_amount_paid'] = (df_working['line_item_total_value'] - 
                                          df_working['line_item_discount'] + 
                                          df_working['line_item_tax'] + 
                                          df_working['line_item_shipping_fee'])

    status_temp = df_raw['status'].replace('',pd.NA).loc[df_working.index]
    order_status_temp = df_raw['order_status'].replace('',pd.NA).loc[df_working.index]
    df_working['line_item_status_source'] = order_status_temp.fillna(status_temp)
    df_working['line_item_status'] = df_working['line_item_status_source'].apply(
        lambda x: standardize_categorical(x, ORDER_DELIVERY_STATUS_MAP, default_value=DEFAULT_STATUS_UNKNOWN)
    )

    df_working['payment_method'] = df_raw['payment_method'].loc[df_working.index].apply(lambda x: clean_string(x, 'lower', DEFAULT_UNKNOWN_CATEGORICAL))
    df_working['shipping_address_full'] = df_raw['shipping_address'].loc[df_working.index].apply(clean_string)
    df_working['notes'] = df_raw['notes'].loc[df_working.index].apply(clean_string)
    df_working['tracking_number'] = df_raw['tracking_number'].loc[df_working.index].apply(clean_string)
    
    df_working['original_line_identifier'] = df_working['order_id'].astype(str) + "_" + \
                                             df_working['product_id'].astype(str) + "_" + \
                                             df_raw['item_id'].loc[df_working.index].astype(str)

    # .copy() makes df_final an independent frame, so the column assignments below
    # do not trigger pandas' SettingWithCopyWarning.
    df_final = df_working[[
        'order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price',
        'line_item_total_value', 'line_item_discount', 'line_item_shipping_fee', 'line_item_tax',
        'line_item_status', 'payment_method', 'shipping_address_full', 'notes', 'tracking_number',
        'source_order_id_int', 'line_item_amount_paid', 'original_line_identifier'
    ]].copy()
    
    df_final['source_file'] = 'orders_unstructured_data.csv'
    df_final['last_updated_pipeline'] = pipeline_timestamp
    
    logger.info(f"Finished ETL for Order Items from orders_unstructured_data. Shape: {df_final.shape}")
    return df_final

# --- Actual Execution Block for this ETL step ---
# Ensure all prerequisite DataFrames and variables are defined and populated from previous cells
if 'df_orders_unstructured_raw' in globals() and not df_orders_unstructured_raw.empty:
    if 'existing_customer_ids_set' not in globals() or 'existing_product_ids_set' not in globals() or 'product_id_mapping_dict' not in globals():
        logger.error("Prerequisite ID sets or mapping dictionary not found. Ensure Customer and Product ETL ran successfully.")
        df_order_items_source2 = pd.DataFrame() # Initialize as empty
    else:
        logger.info("Prerequisites found. Calling etl_order_items_from_unstructured.")
        df_order_items_source2 = etl_order_items_from_unstructured(
            df_orders_unstructured_raw, 
            existing_customer_ids_set, 
            existing_product_ids_set,
            product_id_mapping_dict 
        )
        
        if df_order_items_source2 is not None and not df_order_items_source2.empty:
            logger.info(f"Successfully processed unstructured order items. Shape: {df_order_items_source2.shape}")
            print("\n--- Processed Order Items from Unstructured Data (Sample) ---")
            print(df_order_items_source2.head())
            # print(df_order_items_source2.info()) # Use .info() for more detail if needed during debug
        else:
            logger.warning("ETL for unstructured order items resulted in an empty or None DataFrame.")
else:
    logger.warning("Raw orders unstructured data (df_orders_unstructured_raw) is empty or not defined. Skipping ETL for this source.")
    df_order_items_source2 = pd.DataFrame() # Ensure it's defined as empty if not processed

2025-06-23 08:48:42,971 - INFO - Prerequisites found. Calling etl_order_items_from_unstructured.
2025-06-23 08:48:42,974 - INFO - Starting ETL for Order Items from orders_unstructured_data...
2025-06-23 08:48:43,008 - INFO - Shape after initial ID mapping and before filtering: (1000, 24)
2025-06-23 08:48:43,011 - INFO - Sample of resolved IDs before filtering:
    order_id customer_id product_id
0  ORD_00001   CUST_0295   PROD_140
1  ORD_00002   CUST_0087   PROD_033
2  ORD_00003   CUST_0297   PROD_172
3  ORD_00004   CUST_0018   PROD_006
4  ORD_00005   CUST_0035   PROD_010
2025-06-23 08:48:43,014 - INFO - Rows after dropping NA in key IDs: 1000. Dropped: 0
2025-06-23 08:48:43,016 - INFO - Rows after filtering by existing_cust_ids: 1000. Dropped: 0
2025-06-23 08:48:43,018 - INFO - Rows after filtering by existing_prod_ids: 1000. Dropped: 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the doc


--- Processed Order Items from Unstructured Data (Sample) ---
    order_id customer_id product_id           order_date  quantity  unit_price  line_item_total_value  line_item_discount  line_item_shipping_fee  line_item_tax line_item_status payment_method shipping_address_full                          notes tracking_number  source_order_id_int  line_item_amount_paid original_line_identifier                   source_file last_updated_pipeline
0  ORD_00001   CUST_0295   PROD_140  2023-06-01 00:39:00         6      381.29                 774.00                9.27                    6.90          50.40          SHIPPED     debit_card          4136 Main St  Special delivery instructions            None                    1                 822.03   ORD_00001_PROD_140_116  orders_unstructured_data.csv   2025-06-23 08:48:42
1  ORD_00002   CUST_0087   PROD_033  2023-11-22 17:08:00         8      153.77                 152.92               36.07                   16.45          27.77        DEL
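
The unstructured source stores the same field under alternative column names, and the amount paid is derived rather than read. A minimal sketch of both steps, assuming a tiny hypothetical frame in place of `df_raw`; the first row reuses the figures from the sample output above, the second is invented:

```python
import pandas as pd

# Hypothetical raw rows: quantity arrives as either 'quantity' or 'qty',
# and 'total_amount' may be missing.
raw = pd.DataFrame({
    "quantity": [6, None],
    "qty": [None, 8],
    "unit_price": [381.29, 10.00],
    "total_amount": [774.00, None],
    "discount": [9.27, 5.00],
    "tax": [50.40, 2.00],
    "shipping_cost": [6.90, 3.00],
})

# Coalesce the two quantity columns: take 'quantity', fall back to 'qty'.
quantity = raw["quantity"].fillna(raw["qty"]).astype(int)

# Prefer the provided total; fall back to quantity * unit_price when it is missing.
line_total = raw["total_amount"].fillna(quantity * raw["unit_price"])

# amount paid = total - discount + tax + shipping
amount_paid = (line_total - raw["discount"] + raw["tax"] + raw["shipping_cost"]).round(2)

print(amount_paid.tolist())
```

For the first row this gives 774.00 - 9.27 + 50.40 + 6.90 = 822.03, which matches `line_item_amount_paid` for ORD_00001. The provided total (774.00) wins over quantity × unit price (6 × 381.29), so the two can disagree without any warning being logged.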

# 2.5.3 Combine Order Item Data and Create Final OrderItems & Orders Tables

In [16]:
# Cell 2.5.3: Combine Order Item Data and Create Final OrderItems & Orders Tables (Focus on amount_paid_total)

def etl_combine_orders_and_load(df_items1_input, df_items2_input, db_engine, cust_ids_set, prod_ids_set):
    logger.info("Starting to combine and finalize order items and derive orders...")
    pipeline_timestamp = get_current_timestamp_str()

    # --- Prepare df_items1 (from reconciliation_challenge_data) ---
    df1_processed = pd.DataFrame()
    if df_items1_input is not None and not df_items1_input.empty:
        df1 = df_items1_input.copy()
        # These renames should align with what etl_order_items_from_reconciliation produces
        df1.rename(columns={
            'payment_status': 'payment_status_derived', 
            'delivery_status': 'delivery_status_derived',
            'notes': 'line_item_notes',
            # 'line_item_amount_paid' from its source function is already 'line_item_amount_paid_final' effectively
        }, inplace=True, errors='ignore') 
        
        # Ensure 'line_item_amount_paid_final' exists (it should from the source function)
        if 'line_item_amount_paid_final' not in df1.columns:
            logger.warning("Col 'line_item_amount_paid_final' missing in df1 from reconciliation! Adding as 0.0")
            df1['line_item_amount_paid_final'] = 0.0
        df1_processed = df1
        logger.info(f"df1_processed (from reconciliation) shape: {df1_processed.shape}. Has 'line_item_amount_paid_final': {'line_item_amount_paid_final' in df1_processed.columns}")
        if 'line_item_amount_paid_final' in df1_processed.columns:
            logger.info(f"Sample of line_item_amount_paid_final from df1_processed:\n{df1_processed[['order_id', 'line_item_amount_paid_final']].head()}")

    else:
        logger.info("df_items1_input is empty or None.")

    # --- Prepare df_items2 (from orders_unstructured_data) ---
    df2_processed = pd.DataFrame()
    if df_items2_input is not None and not df_items2_input.empty:
        df2 = df_items2_input.copy()
        # These renames should align with what etl_order_items_from_unstructured produces
        df2.rename(columns={
            'line_item_status': 'overall_item_status_derived',
            'payment_method': 'payment_method_source',
            'shipping_address_full': 'shipping_address_full_source',
            'notes': 'line_item_notes',
            'tracking_number': 'tracking_number_source',
            'source_order_id_int': 'source_order_id_int_val',
            'line_item_amount_paid': 'line_item_amount_paid_final' # This rename is key
        }, inplace=True, errors='ignore')
        
        # Ensure 'line_item_amount_paid_final' exists after rename
        if 'line_item_amount_paid_final' not in df2.columns:
            logger.warning("Col 'line_item_amount_paid_final' missing in df2 from unstructured after rename! Adding as 0.0")
            df2['line_item_amount_paid_final'] = 0.0 # Should not happen if 'line_item_amount_paid' was in df_items2_input
        df2_processed = df2
        logger.info(f"df2_processed (from unstructured) shape: {df2_processed.shape}. Has 'line_item_amount_paid_final': {'line_item_amount_paid_final' in df2_processed.columns}")
        if 'line_item_amount_paid_final' in df2_processed.columns:
            logger.info(f"Sample of line_item_amount_paid_final from df2_processed:\n{df2_processed[['order_id', 'line_item_amount_paid_final']].head()}")
    else:
        logger.info("df_items2_input is empty or None.")

    # --- Concatenate ---
    if df1_processed.empty and df2_processed.empty:
        logger.warning("Both processed order item DataFrames are empty. Cannot proceed.")
        return pd.DataFrame(), pd.DataFrame()
    
    # Define a superset of columns you expect in df_all_order_items
    # This helps ensure alignment before concat.
    # These are columns that might exist in one but not the other, or common ones.
    superset_cols = [
        'order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price',
        'line_item_total_value', 'line_item_discount', 'line_item_shipping_fee', 'line_item_tax',
        'payment_status_derived', 'delivery_status_derived', 'overall_item_status_derived', 
        'line_item_notes', 'line_item_amount_paid_final', 'payment_method_source', 
        'shipping_address_full_source', 'tracking_number_source', 'source_order_id_int_val',
        'source_file', 'original_line_identifier', 'last_updated_pipeline'
    ]

    df1_aligned = pd.DataFrame()
    if not df1_processed.empty:
        for col in superset_cols: # Ensure all columns from superset are present
            if col not in df1_processed.columns:
                df1_processed[col] = None
        df1_aligned = df1_processed[superset_cols] # Select in defined order
        
    df2_aligned = pd.DataFrame()
    if not df2_processed.empty:
        for col in superset_cols: # Ensure all columns from superset are present
            if col not in df2_processed.columns:
                df2_processed[col] = None
        df2_aligned = df2_processed[superset_cols]

    # ignore_index=True already yields a fresh 0..n-1 index, so no separate reset_index is needed
    df_all_order_items = pd.concat([df1_aligned, df2_aligned], ignore_index=True)
    
    if 'order_item_id' not in df_all_order_items.columns:
        df_all_order_items.insert(0, 'order_item_id', range(1, 1 + len(df_all_order_items)))
    elif df_all_order_items['order_item_id'].isnull().any():
        df_all_order_items['order_item_id'] = range(1, 1 + len(df_all_order_items))

    logger.info(f"Combined order items. Shape: {df_all_order_items.shape}")
    logger.info(f"Columns in df_all_order_items after concat: {df_all_order_items.columns.tolist()}")
    if 'line_item_amount_paid_final' in df_all_order_items.columns:
        logger.info(f"Describe line_item_amount_paid_final in df_all_order_items:\n{df_all_order_items['line_item_amount_paid_final'].describe()}")
        logger.info(f"Sample of line_item_amount_paid_final in df_all_order_items where source is unstructured:\n{df_all_order_items[df_all_order_items['source_file'] == 'orders_unstructured_data.csv'][['order_id', 'line_item_amount_paid_final']].head(10)}")
        logger.info(f"Sample of line_item_amount_paid_final in df_all_order_items where source is reconciliation:\n{df_all_order_items[df_all_order_items['source_file'] == 'reconciliation_challenge_data.csv'][['order_id', 'line_item_amount_paid_final']].head(10)}")
    else:
        logger.error("'line_item_amount_paid_final' column is MISSING in df_all_order_items!")


    # --- Derive Orders Table ---
    if df_all_order_items.empty:
        logger.warning("df_all_order_items is unexpectedly empty before deriving Orders table.")
        return df_all_order_items, pd.DataFrame()

    df_all_order_items['order_level_status_candidate'] = \
        df_all_order_items['delivery_status_derived'].fillna(
        df_all_order_items['overall_item_status_derived']).fillna(
        df_all_order_items['payment_status_derived'] 
    ).fillna(DEFAULT_STATUS_UNKNOWN)
    
    aggregation_defaults = { # Ensure these match column names in df_all_order_items
        'customer_id': None, 'order_date': None, 'order_level_status_candidate': DEFAULT_STATUS_UNKNOWN,
        'payment_method_source': DEFAULT_UNKNOWN_CATEGORICAL, 
        'payment_status_derived': DEFAULT_STATUS_UNKNOWN,
        'delivery_status_derived': DEFAULT_STATUS_UNKNOWN,
        'shipping_address_full_source': None,
        'line_item_shipping_fee': 0.0, 'line_item_tax': 0.0, 'line_item_discount': 0.0,
        'line_item_total_value': 0.0, 'line_item_amount_paid_final': 0.0,
        'tracking_number_source': None, 'line_item_notes': None, 'source_order_id_int_val': None
    }
    for col, default_val in aggregation_defaults.items(): # Check existence before groupby
        if col not in df_all_order_items.columns:
            df_all_order_items[col] = default_val
            logger.info(f"Added missing aggregation source column '{col}' to df_all_order_items for groupby.")

    df_orders = df_all_order_items.groupby('order_id', as_index=False).agg(
        customer_id=('customer_id', 'first'),
        order_date=('order_date', 'min'),
        order_status=('order_level_status_candidate', lambda x: x.mode()[0] if not x.mode().empty else DEFAULT_STATUS_UNKNOWN),
        payment_method=('payment_method_source', lambda x: x.dropna().iloc[0] if not x.dropna().empty else DEFAULT_UNKNOWN_CATEGORICAL),
        payment_status=('payment_status_derived', lambda x: x.dropna().iloc[0] if not x.dropna().empty else DEFAULT_STATUS_UNKNOWN),
        delivery_status=('delivery_status_derived', lambda x: x.dropna().iloc[0] if not x.dropna().empty else DEFAULT_STATUS_UNKNOWN),
        shipping_address_full=('shipping_address_full_source', 'first'),
        shipping_cost_total=('line_item_shipping_fee', 'sum'),
        tax_total=('line_item_tax', 'sum'),
        discount_total=('line_item_discount', 'sum'),
        order_total_value_gross=('line_item_total_value', 'sum'),
        amount_paid_total=('line_item_amount_paid_final', 'sum'), # This should now sum correctly
        tracking_number=('tracking_number_source', 'first'),
        notes=('line_item_notes', lambda x: '; '.join(sorted(list(x.dropna().astype(str).unique()))) if not x.dropna().empty and x.dropna().astype(str).str.len().sum() > 0 else None),
        source_order_id_int=('source_order_id_int_val', 'first')
    )
    
    df_orders['order_total_value_net'] = df_orders['order_total_value_gross'] - df_orders['discount_total']
    df_orders['last_updated_pipeline'] = pipeline_timestamp
    
    df_orders = df_orders[df_orders['customer_id'].isin(cust_ids_set)]
    if df_orders.empty :
        logger.warning("No valid orders after customer_id foreign key check for Orders table. Returning empty DataFrames.")
        return pd.DataFrame(columns=df_all_order_items.columns), pd.DataFrame()

    logger.info(f"Derived Orders table. Shape: {df_orders.shape}")
    if not df_orders.empty:
        logger.info(f"Sample of derived Orders with amount_paid_total:\n{df_orders[['order_id', 'amount_paid_total', 'order_status']].head()}")
    
    final_order_item_db_cols = [
        'order_item_id', 'order_id', 'product_id', 'customer_id', 
        'quantity', 'unit_price', 'line_item_total_value', 'line_item_discount', 
        'line_item_tax', 'line_item_shipping_fee', 'source_file', 
        'original_line_identifier', 'last_updated_pipeline'
    ]
    
    for col in final_order_item_db_cols:
        if col not in df_all_order_items.columns:
            df_all_order_items[col] = None 
            logger.warning(f"Column '{col}' for final OrderItems was missing and added as None.")
            
    df_order_items_for_db = df_all_order_items[final_order_item_db_cols].copy()
    
    df_order_items_for_db = df_order_items_for_db[df_order_items_for_db['order_id'].isin(df_orders['order_id'])]
    logger.info(f"Final OrderItems for DB after filtering by valid orders. Shape: {df_order_items_for_db.shape}")

    if not df_orders.empty:
        try:
            # engine.begin() opens a transaction that commits on success and rolls back on error
            with db_engine.begin() as connection:
                connection.execute(text("DELETE FROM OrderItems;"))
                connection.execute(text("DELETE FROM Orders;"))
            logger.info("Cleared existing Orders and OrderItems data from DB for fresh load.")
            df_orders.to_sql('Orders', db_engine, if_exists='append', index=False)
            logger.info(f"{len(df_orders)} records loaded/appended into Orders table.")
        except Exception as e:
            logger.error(f"Error loading Orders data to SQLite: {e}. OrderItems will not be loaded.")
            return pd.DataFrame(), pd.DataFrame() 
    else:
        logger.warning("Derived Orders DataFrame is empty. Nothing to load to Orders table.")
        return df_order_items_for_db, df_orders 

    if not df_order_items_for_db.empty:
        try:
            df_order_items_for_db.to_sql('OrderItems', db_engine, if_exists='append', index=False)
            logger.info(f"{len(df_order_items_for_db)} records loaded/appended into OrderItems table.")
        except Exception as e:
            logger.error(f"Error loading OrderItems data to SQLite: {e}")
    else:
        logger.warning("Final OrderItems DataFrame is empty. Nothing to load to OrderItems table.")
        
    return df_order_items_for_db, df_orders

logger.info("Preparing to call etl_combine_orders_and_load...")

items1_for_combine = df_order_items_source1 if 'df_order_items_source1' in globals() and isinstance(df_order_items_source1, pd.DataFrame) else pd.DataFrame()
items2_for_combine = df_order_items_source2 if 'df_order_items_source2' in globals() and isinstance(df_order_items_source2, pd.DataFrame) else pd.DataFrame()

if items1_for_combine.empty:
    logger.warning("df_order_items_source1 is empty or not a DataFrame going into combine step.")
else:
    logger.info(f"df_order_items_source1 to be combined has shape: {items1_for_combine.shape} and columns: {items1_for_combine.columns.tolist()}")

if items2_for_combine.empty:
    logger.warning("df_order_items_source2 is empty or not a DataFrame going into combine step.")
else:
    logger.info(f"df_order_items_source2 to be combined has shape: {items2_for_combine.shape} and columns: {items2_for_combine.columns.tolist()}")


if 'existing_customer_ids_set' not in globals() or 'existing_product_ids_set' not in globals():
    logger.error("CRITICAL: existing_customer_ids_set or existing_product_ids_set not found. Re-run previous ETL steps for Customers and Products.")
    existing_customer_ids_set = set() 
    existing_product_ids_set = set()

df_final_order_items_loaded, df_final_orders_loaded = etl_combine_orders_and_load(
    items1_for_combine,
    items2_for_combine,
    engine,
    existing_customer_ids_set, 
    existing_product_ids_set
)

if df_final_orders_loaded is not None and not df_final_orders_loaded.empty:
    logger.info(f"Orders table loaded successfully. Final shape: {df_final_orders_loaded.shape}")
    print("\n--- Final Orders Data (Sample from DB load) ---")
    print(df_final_orders_loaded.head())
else:
    logger.warning("Final Orders DataFrame is empty or None after ETL combine step. No orders loaded.")

if df_final_order_items_loaded is not None and not df_final_order_items_loaded.empty:
    logger.info(f"OrderItems table loaded successfully. Final shape: {df_final_order_items_loaded.shape}")
    print("\n--- Final OrderItems Data (Sample from DB load) ---")
    print(df_final_order_items_loaded.head())
else:
    logger.warning("Final OrderItems DataFrame is empty or None after ETL combine step. No order items loaded.")

logger.info("--- ETL Pipeline (Combine Orders & OrderItems) Complete ---")

2025-06-23 08:48:43,160 - INFO - Preparing to call etl_combine_orders_and_load...
2025-06-23 08:48:43,161 - INFO - df_order_items_source1 to be combined has shape: (300, 17) and columns: ['order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price', 'line_item_total_value', 'line_item_discount', 'line_item_shipping_fee', 'line_item_tax', 'payment_status_derived', 'delivery_status_derived', 'line_item_notes', 'line_item_amount_paid_final', 'original_line_identifier', 'source_file', 'last_updated_pipeline']
2025-06-23 08:48:43,162 - INFO - df_order_items_source2 to be combined has shape: (1000, 20) and columns: ['order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price', 'line_item_total_value', 'line_item_discount', 'line_item_shipping_fee', 'line_item_tax', 'line_item_status', 'payment_method', 'shipping_address_full', 'notes', 'tracking_number', 'source_order_id_int', 'line_item_amount_paid', 'original_line_identifier', 'source_file', 'last_upda


--- Final Orders Data (Sample from DB load) ---
    order_id customer_id           order_date order_status payment_method payment_status delivery_status shipping_address_full  shipping_cost_total  tax_total  discount_total  order_total_value_gross  amount_paid_total tracking_number                          notes source_order_id_int  order_total_value_net last_updated_pipeline
0  ORD_00001   CUST_0295  2023-06-01 00:39:00      SHIPPED     debit_card        UNKNOWN         UNKNOWN          4136 Main St                 6.90      50.40            9.27                   774.00             822.03            None  Special delivery instructions                   1                 764.73   2025-06-23 08:48:43
1  ORD_00002   CUST_0087  2023-11-22 17:08:00    DELIVERED  bank_transfer        UNKNOWN         UNKNOWN          3133 Oak Ave                16.45      27.77           36.07                   152.92             161.07            None                           None                   2    
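
The Orders table is derived from the combined line items by grouping on `order_id` and reducing each column with its own rule. A minimal sketch of that pattern using pandas named aggregation, on a small invented frame; the `status` column and the sample IDs are placeholders, not data from the pipeline:

```python
import pandas as pd

# Hypothetical line items: ORD_A has two lines, ORD_B has one.
items = pd.DataFrame({
    "order_id": ["ORD_A", "ORD_A", "ORD_B"],
    "customer_id": ["CUST_0001", "CUST_0001", "CUST_0002"],
    "order_date": ["2023-06-02 10:00:00", "2023-06-01 09:00:00", "2023-07-15 12:00:00"],
    "status": ["SHIPPED", "SHIPPED", "PENDING"],
    "line_item_total_value": [100.0, 50.0, 20.0],
    "line_item_discount": [10.0, 0.0, 5.0],
})

def most_common(series: pd.Series) -> str:
    """Return the most frequent value, or 'UNKNOWN' when the group has none."""
    modes = series.mode()
    return modes.iloc[0] if not modes.empty else "UNKNOWN"

# One output row per order; each keyword is (source column, reducer).
orders = items.groupby("order_id", as_index=False).agg(
    customer_id=("customer_id", "first"),
    order_date=("order_date", "min"),  # earliest line date stands in for the order date
    order_status=("status", most_common),
    order_total_value_gross=("line_item_total_value", "sum"),
    discount_total=("line_item_discount", "sum"),
)
orders["order_total_value_net"] = orders["order_total_value_gross"] - orders["discount_total"]

print(orders)
```

Taking `min` of `order_date` works on these strings only because the `%Y-%m-%d %H:%M:%S` format sorts chronologically; with mixed formats the column would need to be parsed to datetimes first.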

# 2.5.4 AI-Assisted Schema Reconciliation

In [17]:
# Cell 2.5.4: AI-Assisted Schema Reconciliation 
import os
import re
import json
import google.generativeai as genai
import getpass # For securely getting API key in a notebook environment

# --- AI Reconciliation Configuration ---
# Global variable to store the Gemini model instance once initialized
gemini_model = None
gemini_api_key_provided = False

def configure_gemini():
    """
    Configures the Gemini API with a user-provided key.
    Returns the initialized model or None if configuration fails.
    """
    global gemini_model, gemini_api_key_provided
    
    if gemini_model is not None: # Already configured
        logger.info("Gemini model already configured.")
        return gemini_model

    try:
        # Attempt to get API key from environment variable first (more secure for repeated runs)
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            logger.info("GEMINI_API_KEY environment variable not found.")
            api_key_input = getpass.getpass("Enter your Gemini API Key (or press Enter to skip AI features): ")
            if not api_key_input:
                logger.warning("No Gemini API Key provided. AI-assisted features will be skipped.")
                gemini_api_key_provided = False
                return None
            api_key = api_key_input
        
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-pro-latest') # Using the specified model
        gemini_model = model
        gemini_api_key_provided = True
        logger.info("Gemini 1.5 Pro model configured successfully.")
        return model
    except Exception as e:
        logger.error(f"Error configuring Gemini: {e}. AI-assisted features may not work.")
        gemini_api_key_provided = False
        return None

def get_ai_schema_mapping_suggestions(source_name_a, columns_a, source_name_b, columns_b, target_schema_dict):
    """
    Uses Gemini to suggest schema mappings.
    target_schema_dict should be like: {'table_name': ['col1', 'col2', ...]}
    """
    global gemini_model
    if not gemini_model:
        logger.warning("Gemini model not configured. Skipping AI schema mapping.")
        return None

    prompt_parts = [
        "You are a data engineering assistant specializing in schema mapping and reconciliation.",
        "Given column names from two different source dataframes and a target canonical database schema, ",
        "your task is to suggest the most likely mappings from each source's columns to the target schema's columns.",
        "Consider common naming conventions, abbreviations, and semantic similarities (e.g., 'cust_id' and 'customer_identifier' might both map to 'customer_id').",
        "\n--- Source A Details ---",
        f"Source A Name: {source_name_a}",
        f"Source A Columns: {', '.join(columns_a)}",
        "\n--- Source B Details ---",
        f"Source B Name: {source_name_b}",
        f"Source B Columns: {', '.join(columns_b)}",
        "\n--- Target Canonical Schema ---"
    ]
    for table, cols in target_schema_dict.items():
        prompt_parts.append(f"Target Table '{table}': {', '.join(cols)}")

    prompt_parts.append("\n--- Instructions ---")
    prompt_parts.append(f"1. For '{source_name_a}', provide mappings to any relevant target table and column.")
    prompt_parts.append(f"2. For '{source_name_b}', provide mappings to any relevant target table and column.")
    prompt_parts.append("3. If a source column does not seem to map to any target column, indicate with 'NO_CLEAR_TARGET'.")
    prompt_parts.append("4. If a source column could map to multiple targets, list the most probable or note the ambiguity.")
    prompt_parts.append("Provide the output strictly as a single JSON object. Do not include any text or markdown formatting before or after the JSON block.")
    prompt_parts.append("\nExample JSON output format:")
    prompt_parts.append("""
    {
      "source_a_mappings": {
        "client_ref": "Customers.customer_id",
        "purchase_dt": "Orders.order_date",
        "item_sku": "Products.product_id",
        "weird_legacy_col": "NO_CLEAR_TARGET"
      },
      "source_b_mappings": {
        "CustomerID": "Customers.customer_id",
        "orderTimestamp": "Orders.order_date",
        "productCode": "Products.product_id"
      }
    }
    """)
    
    prompt = "\n".join(prompt_parts)
    logger.info(f"Sending schema mapping prompt to Gemini (first 500 chars):\n{prompt[:500]}...")
    
    try:
        response = gemini_model.generate_content(prompt)
        # Attempt to parse the response as JSON
        try:
            # Gemini might wrap JSON in ```json ... ``` (or a bare ``` ... ``` fence), so we extract the body
            match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", response.text)
            if match:
                json_str = match.group(1)
            else:
                json_str = response.text.strip() # Assume it's raw JSON
            
            suggestions = json.loads(json_str)
            logger.info("Successfully received and parsed schema mapping suggestions from Gemini.")
            return suggestions
        except json.JSONDecodeError:
            logger.error(f"Gemini response was not valid JSON. Raw response:\n{response.text}")
            return {"error": "Invalid JSON response", "raw_text": response.text}
        except Exception as e_parse:
            logger.error(f"Error processing Gemini response: {e_parse}. Raw response:\n{response.text}")
            return {"error": str(e_parse), "raw_text": response.text}

    except Exception as e:
        logger.error(f"Error calling Gemini API for schema mapping: {e}")
        # Note: API errors such as the 429 quota error seen in the logged run are caught here.
        # That is an API usage limit (e.g. the free-tier quota being reached), not a code bug.
        return None

# --- Define Target Schema for AI Mapping (simplified for demonstration) ---
target_schema_for_ai = {
    "Customers": ['customer_id', 'customer_name', 'email', 'phone', 'address_street', 'address_city', 'address_state', 'address_postal_code', 'registration_date', 'status', 'segment'],
    "Products": ['product_id', 'product_name', 'description', 'category', 'brand', 'price', 'cost', 'stock_quantity'],
    "Orders": ['order_id', 'customer_id', 'order_date', 'order_status', 'payment_method', 'shipping_cost_total', 'tax_total', 'discount_total', 'order_total_value_net'],
    "OrderItems": ['order_item_id', 'order_id', 'product_id', 'quantity', 'unit_price', 'line_item_total_value']
}

# --- Attempt to Configure Gemini ---
# This will prompt for API key if not set in environment
# NOTE: This relies on the first cell in the notebook (which sets up logging) having been run.
configure_gemini() 

# --- Execute AI Schema Mapping Suggestion (if API key was provided) ---
if gemini_api_key_provided and gemini_model:
    logger.info("Attempting AI-assisted schema mapping...")
    
    cust_raw_cols = df_customers_raw.columns.tolist() if 'df_customers_raw' in globals() and not df_customers_raw.empty else []
    prod_raw_cols = df_products_raw.columns.tolist() if 'df_products_raw' in globals() and not df_products_raw.empty else []
    
    if cust_raw_cols and prod_raw_cols:
        ai_schema_suggestions = get_ai_schema_mapping_suggestions(
            source_name_a="Customers Messy JSON",
            columns_a=cust_raw_cols,
            source_name_b="Products Inconsistent JSON",
            columns_b=prod_raw_cols,
            target_schema_dict=target_schema_for_ai
        )

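        if ai_schema_suggestions and "error" not in ai_schema_suggestions:
            print("\n--- AI Schema Mapping Suggestions ---")
            print(json.dumps(ai_schema_suggestions, indent=2))
        elif ai_schema_suggestions:
            # The helper returns {"error": ..., "raw_text": ...} when the response could not be parsed
            logger.warning(f"AI schema mapping returned an unusable response: {ai_schema_suggestions['error']}")
        else:
            logger.warning("AI schema mapping did not return suggestions (this may be due to an API error like rate limiting).")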
    else:
        logger.warning("Raw customer or product dataframes are not available for AI schema mapping.")
else:
    logger.info("Gemini AI features skipped as API key was not provided or model initialization failed.")

2025-06-23 08:48:44,656 - INFO - GEMINI_API_KEY environment variable not found.


Enter your Gemini API Key (or press Enter to skip AI features):  ········


2025-06-23 08:48:46,674 - INFO - Gemini 1.5 Pro model configured successfully.
2025-06-23 08:48:46,677 - INFO - Attempting AI-assisted schema mapping...
2025-06-23 08:48:46,679 - INFO - Sending schema mapping prompt to Gemini (first 500 chars):
You are a data engineering assistant specializing in schema mapping and reconciliation.
Given column names from two different source dataframes and a target canonical database schema, 
your task is to suggest the most likely mappings from each source's columns to the target schema's columns.
Consider common naming conventions, abbreviations, and semantic similarities (e.g., 'cust_id' and 'customer_identifier' might both map to 'customer_id').

--- Source A Details ---
Source A Name: Customers Me...
2025-06-23 08:48:47,364 - ERROR - Error calling Gemini API for schema mapping: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-lim
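
The 429 response above is a quota error, which the cell logs before returning `None`. One way to soften transient rate limits is to retry with exponential backoff; a minimal sketch follows. `call_with_backoff` and `is_rate_limit_error` are hypothetical helpers, not part of this notebook or of the Gemini SDK, and detecting the error by looking for "429" in the message is an assumption. Retrying will not help if a daily quota is exhausted.

```python
import time

def is_rate_limit_error(exc):
    """Hypothetical check: treat any exception whose message mentions 429 as a rate limit."""
    return "429" in str(exc)

def call_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func(); on a rate-limit error wait base_delay * 2**attempt, then retry.

    Re-raises the exception if it is not a rate limit or if all attempts are used up.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            is_last_attempt = attempt == max_attempts - 1
            if is_last_attempt or not is_rate_limit_error(exc):
                raise
            sleep(base_delay * (2 ** attempt))
```

Inside `get_ai_schema_mapping_suggestions`, the call could then be wrapped as `call_with_backoff(lambda: gemini_model.generate_content(prompt))`. The `sleep` parameter exists only so the delay can be replaced in tests.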

# Phase 5: Conclusion & Summary

The project started with Phase 1: Data Discovery & Analysis, where I explored four messy datasets to identify data quality issues, understand the meaning of each field, and map the relationships between them. These insights guided the design of the ETL pipelines in Phase 2.

I built ETL pipelines for the Customers and Products datasets and for the OrderItems data, which came from two different sources (reconciliation_challenge_data.csv and orders_unstructured_data.csv). These pipelines used custom utility functions to clean, standardize (e.g., names, dates, numbers, IDs), and transform the data. The cleaned data was then stored in a normalized SQLite database (unified_ecommerce.db) with a schema that defines primary keys, foreign keys, and indexes. I also created unified Orders and OrderItems tables from the combined sources.
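
As a rough illustration of what "primary keys, foreign keys, and indexes" mean in SQLite, here is a simplified sketch. The DDL is hypothetical and much smaller than the schema the pipeline actually creates; the column names are borrowed from the target schema dictionary used in the AI mapping cell, and the customer name is a placeholder.

```python
import sqlite3

# Simplified, hypothetical DDL: three tables with primary keys, foreign keys, and one index.
DDL = """
CREATE TABLE Customers (
    customer_id   TEXT PRIMARY KEY,
    customer_name TEXT
);
CREATE TABLE Orders (
    order_id    TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL REFERENCES Customers(customer_id),
    order_date  TEXT
);
CREATE TABLE OrderItems (
    order_item_id INTEGER PRIMARY KEY,
    order_id      TEXT NOT NULL REFERENCES Orders(order_id),
    product_id    TEXT,
    quantity      INTEGER,
    unit_price    REAL
);
CREATE INDEX idx_orders_customer_id ON Orders(customer_id);
"""

def build_demo_db():
    """Create an in-memory database with the demo schema."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is switched on per connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(DDL)
    return conn

conn = build_demo_db()
conn.execute("INSERT INTO Customers VALUES ('CUST_0295', 'Placeholder Name')")
conn.execute("INSERT INTO Orders VALUES ('ORD_00001', 'CUST_0295', '2023-06-01 00:39:00')")

# An order that points at a customer that does not exist should be rejected.
try:
    conn.execute("INSERT INTO Orders VALUES ('ORD_99999', 'CUST_MISSING', '2023-06-01')")
    fk_enforced = False
except sqlite3.IntegrityError:
    fk_enforced = True
```

The `PRAGMA foreign_keys = ON` line matters: without it SQLite accepts the orphaned order silently, so the foreign key declarations document intent but do not protect the data.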

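In Phase 4: Advanced Challenge, I proposed and implemented a basic framework that integrates Google Gemini’s API. It shows how AI could be used to suggest schema mappings that reconcile mismatched fields across sources. In the run recorded above, the request was rejected with a 429 quota error, so no suggestions were returned.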
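
Because model output can name columns that do not exist, AI suggestions are best treated as proposals to be checked. The sketch below assumes the response follows the JSON shape requested in the prompt (`"source_a_mappings"`, `"source_b_mappings"`, values of the form `Table.column` or `NO_CLEAR_TARGET`). `validate_mapping_suggestions` is a hypothetical helper, and the sample input is made up for illustration.

```python
def validate_mapping_suggestions(suggestions, target_schema):
    """Split AI-suggested mappings into accepted and rejected entries.

    A suggestion is accepted only if it names an existing 'Table.column' in
    target_schema or is the explicit 'NO_CLEAR_TARGET' marker.
    """
    valid_targets = {
        f"{table}.{col}" for table, cols in target_schema.items() for col in cols
    }
    accepted, rejected = {}, {}
    for source_key, mappings in suggestions.items():
        if not isinstance(mappings, dict):
            continue  # e.g. the "error"/"raw_text" strings returned on a parse failure
        for source_col, target in mappings.items():
            is_valid = target in valid_targets or target == "NO_CLEAR_TARGET"
            bucket = accepted if is_valid else rejected
            bucket[(source_key, source_col)] = target
    return accepted, rejected

# Hypothetical input in the format the prompt asks for.
demo_schema = {
    "Customers": ["customer_id", "email"],
    "Orders": ["order_id", "order_date"],
}
demo_suggestions = {
    "source_a_mappings": {
        "client_ref": "Customers.customer_id",
        "purchase_dt": "Orders.order_date",
        "weird_legacy_col": "NO_CLEAR_TARGET",
        "item_sku": "Products.sku_code",  # not in demo_schema, so it is rejected
    }
}
accepted, rejected = validate_mapping_suggestions(demo_suggestions, demo_schema)
```

Rejected entries can then be reviewed by hand rather than being applied to the pipeline automatically.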



Key Challenges Faced and Solutions
- Multiple Inconsistent ID Fields
- Unstructured and Messy Text Fields
- Incorrect Financial Totals

This was a great opportunity to learn and apply core data engineering skills and to work on a real-world problem. The AI reconciliation part was particularly interesting.
Thank you for the opportunity.