In [1]:
# Import required libraries
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime, timedelta
import random
import json
from IPython.display import display, HTML

# **Generate Sample Products Data**

In [4]:
def generate_products(n=100):
    """Generate sample product catalog"""

    categories = ['Electronics', 'Clothing', 'Food', 'Home', 'Sports',
                  'Beauty', 'Toys', 'Books', 'Automotive', 'Garden']

    products = []
    for i in range(1, n+1):
        product = {
            'product_id': f'PROD_{i:04d}',
            'product_name': f'{random.choice(["Premium", "Classic", "Deluxe", "Standard"])} '
                           f'{random.choice(categories)} Item {i}',
            'category': random.choice(categories),
            'unit_price': round(random.uniform(5, 500), 2),
            'max_stock_capacity': random.randint(50, 500)
        }
        products.append(product)

    df = pd.DataFrame(products)
    df.to_csv('products.csv', index=False)

    display(df.head(10))

    print("Statistics of the data:")
    print(f"  - Categories: {df['category'].nunique()}")
    print(f"  - Price Range: ${df['unit_price'].min():.2f} - ${df['unit_price'].max():.2f}")
    print(f"  - Avg Max Capacity: {df['max_stock_capacity'].mean():.0f} units")

    return df

products_df = generate_products(100)

Unnamed: 0,product_id,product_name,category,unit_price,max_stock_capacity
0,PROD_0001,Classic Toys Item 1,Garden,426.82,296
1,PROD_0002,Standard Home Item 2,Garden,320.42,243
2,PROD_0003,Deluxe Beauty Item 3,Electronics,440.77,224
3,PROD_0004,Classic Food Item 4,Automotive,325.36,85
4,PROD_0005,Classic Automotive Item 5,Beauty,257.04,362
5,PROD_0006,Deluxe Books Item 6,Toys,127.63,377
6,PROD_0007,Premium Toys Item 7,Food,408.5,424
7,PROD_0008,Classic Books Item 8,Clothing,140.06,149
8,PROD_0009,Standard Automotive Item 9,Clothing,87.9,398
9,PROD_0010,Standard Garden Item 10,Clothing,73.13,339


Statistics of the data:
  - Categories: 10
  - Price Range: $5.74 - $498.79
  - Avg Max Capacity: 276 units


# **Generate Sample Stores Data**

In [6]:
def generate_stores(n=10):

    cities_states = [
        ('New York', 'NY'), ('Los Angeles', 'CA'), ('Chicago', 'IL'),
        ('Houston', 'TX'), ('Phoenix', 'AZ'), ('Philadelphia', 'PA'),
        ('San Antonio', 'TX'), ('San Diego', 'CA'), ('Dallas', 'TX'),
        ('San Jose', 'CA'), ('Austin', 'TX'), ('Miami', 'FL')
    ]

    stores = []
    for i in range(1, n+1):
        city, state = random.choice(cities_states)
        store = {
            'store_id': f'STORE_{i:03d}',
            'store_name': f'{city} Store {i}',
            'city': city,
            'state': state
        }
        stores.append(store)

    df = pd.DataFrame(stores)
    df.to_csv('stores.csv', index=False)

    display(df.head(10))

    print("Statistics of the data:")
    print(f"  - Cities: {df['city'].nunique()}")
    print(f"  - States: {df['state'].nunique()}")

    return df

stores_df = generate_stores(10)

Unnamed: 0,store_id,store_name,city,state
0,STORE_001,Dallas Store 1,Dallas,TX
1,STORE_002,New York Store 2,New York,NY
2,STORE_003,Phoenix Store 3,Phoenix,AZ
3,STORE_004,Austin Store 4,Austin,TX
4,STORE_005,San Antonio Store 5,San Antonio,TX
5,STORE_006,Miami Store 6,Miami,FL
6,STORE_007,Chicago Store 7,Chicago,IL
7,STORE_008,Chicago Store 8,Chicago,IL
8,STORE_009,Miami Store 9,Miami,FL
9,STORE_010,Miami Store 10,Miami,FL


Statistics of the data:
  - Cities: 7
  - States: 5


# **Generate Inventory Snapshot with Data Quality Issues**

In [10]:
# CELL 4: Generate Inventory Snapshot (with intentional data quality issues)
def generate_inventory_snapshot(products_df, stores_df, date=None):

    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')

    print(f"\n Snapshot Date: {date}")

    snapshots = []

    # Each store has 60-80% of products
    for _, store in stores_df.iterrows():
        store_products = products_df.sample(frac=random.uniform(0.6, 0.8))

        for _, product in store_products.iterrows():
            snapshot = {
                'snapshot_date': date,
                'store_id': store['store_id'],
                'product_id': product['product_id'],
                'quantity': random.randint(0, 200)
            }
            snapshots.append(snapshot)

    df = pd.DataFrame(snapshots)
    initial_count = len(df)

    print(f"\n Generated {initial_count} base inventory records")

    # Introduce data quality issues
    print(f"\n Introducing Data Quality Issues:")

    n_issues = int(len(df) * 0.1)

    # 1. Negative stock (2%)
    neg_count = int(n_issues * 0.2)
    neg_indices = df.sample(n=neg_count).index
    df.loc[neg_indices, 'quantity'] = -random.randint(1, 50)
    print(f"  - Negative stock: {neg_count} records")

    # 2. Mismatched product_id (3%)
    mismatch_count = int(n_issues * 0.3)
    mismatch_indices = df.sample(n=mismatch_count).index
    df.loc[mismatch_indices, 'product_id'] = ['PROD_9999', 'INVALID_001', 'MISSING_PROD'][random.randint(0, 2)]
    print(f"  - Invalid product IDs: {mismatch_count} records")

    # 3. Duplicate entries (5%)
    dup_count = int(n_issues * 0.5)
    duplicate_samples = df.sample(n=dup_count)
    df = pd.concat([df, duplicate_samples], ignore_index=True)
    print(f"  - Duplicate entries: {dup_count} records")

    # Shuffle
    df = df.sample(frac=1).reset_index(drop=True)

    filename = f'inventory_snapshot_{date}.csv'
    df.to_csv(filename, index=False)

    print(f"\n Total records (with issues): {len(df)}")
    print(f" Saved to: {filename}")
    print(f"\n Sample Data:")
    display(df.head(15))

    return df

# Execute
today = datetime.now().strftime('%Y-%m-%d')
inventory_df = generate_inventory_snapshot(products_df, stores_df, today)


 Snapshot Date: 2025-12-04

 Generated 667 base inventory records

 Introducing Data Quality Issues:
  - Negative stock: 13 records
  - Invalid product IDs: 19 records
  - Duplicate entries: 33 records

 Total records (with issues): 700
 Saved to: inventory_snapshot_2025-12-04.csv

 Sample Data:


Unnamed: 0,snapshot_date,store_id,product_id,quantity
0,2025-12-04,STORE_005,PROD_0007,180
1,2025-12-04,STORE_006,PROD_0073,79
2,2025-12-04,STORE_003,PROD_0058,82
3,2025-12-04,STORE_003,PROD_0011,104
4,2025-12-04,STORE_008,PROD_0030,91
5,2025-12-04,STORE_007,PROD_0061,0
6,2025-12-04,STORE_001,PROD_0092,38
7,2025-12-04,STORE_008,PROD_0018,27
8,2025-12-04,STORE_003,PROD_0024,190
9,2025-12-04,STORE_003,PROD_0086,172


# **Generate Restock Events with Data Quality Issues**

In [12]:
# CELL 5: Generate Restock Events (with intentional data quality issues)
def generate_restock_events(products_df, stores_df, date=None):

    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')

    print(f"\n Restock Date: {date}")

    restocks = []

    # Generate 50-100 restock events
    n_events = random.randint(50, 100)

    for i in range(n_events):
        store = stores_df.sample(n=1).iloc[0]
        product = products_df.sample(n=1).iloc[0]

        restock = {
            'restock_date': date,
            'store_id': store['store_id'],
            'product_id': product['product_id'],
            'restock_quantity': random.randint(10, 100),
            'damaged_quantity': random.randint(0, 5),
            'expired_quantity': random.randint(0, 3)
        }
        restocks.append(restock)

    df = pd.DataFrame(restocks)
    initial_count = len(df)

    print(f"\n Generated {initial_count} base restock events")

    # Introduce data quality issues
    print(f"\n Introducing Data Quality Issues:")

    n_issues = int(len(df) * 0.08)

    # 1. Restock quantity > logical max (5%)
    excess_count = int(n_issues * 0.6)
    excess_indices = df.sample(n=excess_count).index
    df.loc[excess_indices, 'restock_quantity'] = random.randint(600, 1000)
    print(f"  - Excessive restock quantities: {excess_count} records")

    # 2. Invalid product_id (3%)
    invalid_count = int(n_issues * 0.4)
    invalid_indices = df.sample(n=invalid_count).index
    df.loc[invalid_indices, 'product_id'] = ['INVALID_PROD', 'WRONG_ID', 'PROD_XXXX'][random.randint(0, 2)]
    print(f"  - Invalid product IDs: {invalid_count} records")

    filename = f'restock_events_{date}.csv'
    df.to_csv(filename, index=False)

    print(f"\n Total records (with issues): {len(df)}")
    print(f" Saved to: {filename}")
    print(f"\n Sample Data:")
    display(df.head(15))

    return df

# Execute
restock_df = generate_restock_events(products_df, stores_df, today)


 Restock Date: 2025-12-04

 Generated 71 base restock events

 Introducing Data Quality Issues:
  - Excessive restock quantities: 3 records
  - Invalid product IDs: 2 records

 Total records (with issues): 71
 Saved to: restock_events_2025-12-04.csv

 Sample Data:


Unnamed: 0,restock_date,store_id,product_id,restock_quantity,damaged_quantity,expired_quantity
0,2025-12-04,STORE_010,PROD_0069,67,4,1
1,2025-12-04,STORE_003,PROD_0064,98,0,0
2,2025-12-04,STORE_005,PROD_0079,40,0,3
3,2025-12-04,STORE_007,PROD_0034,86,1,1
4,2025-12-04,STORE_005,WRONG_ID,944,1,1
5,2025-12-04,STORE_004,PROD_0045,77,3,0
6,2025-12-04,STORE_010,WRONG_ID,28,2,2
7,2025-12-04,STORE_006,PROD_0078,87,4,1
8,2025-12-04,STORE_010,PROD_0075,70,3,3
9,2025-12-04,STORE_005,PROD_0015,56,1,0


# **Create Database Schema**

In [33]:
# CELL 6: Create SQLite Database and Tables
def create_database():

    conn = sqlite3.connect('retail_inventory.db')
    cursor = conn.cursor()

    # Drop existing tables if they exist
    tables_to_drop = [
        'raw_inventory_snapshot',
        'raw_restock_events',
        'curated_inventory_fact',
        'quarantine_inventory',
        'products',
        'stores',
        'pipeline_logs'
    ]

    print("\n Dropping existing tables (if any)...")
    for table in tables_to_drop:
        cursor.execute(f'DROP TABLE IF EXISTS {table}')

    print("\n Creating tables...")

    # 1. Products table
    cursor.execute('''
        CREATE TABLE products (
            product_id TEXT PRIMARY KEY,
            product_name TEXT NOT NULL,
            category TEXT,
            unit_price REAL,
            max_stock_capacity INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" products")

    # 2. Stores table
    cursor.execute('''
        CREATE TABLE stores (
            store_id TEXT PRIMARY KEY,
            store_name TEXT NOT NULL,
            city TEXT,
            state TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" stores")

    # 3. RAW Inventory Snapshot
    cursor.execute('''
        CREATE TABLE raw_inventory_snapshot (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_date DATE NOT NULL,
            store_id TEXT NOT NULL,
            product_id TEXT NOT NULL,
            quantity INTEGER,
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" raw_inventory_snapshot")

    # 4. RAW Restock Events
    cursor.execute('''
        CREATE TABLE raw_restock_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            restock_date DATE NOT NULL,
            store_id TEXT NOT NULL,
            product_id TEXT NOT NULL,
            restock_quantity INTEGER,
            damaged_quantity INTEGER DEFAULT 0,
            expired_quantity INTEGER DEFAULT 0,
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" raw_restock_events")

    # 5. Curated Inventory Fact Table
    cursor.execute('''
        CREATE TABLE curated_inventory_fact (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_date DATE NOT NULL,
            store_id TEXT NOT NULL,
            product_id TEXT NOT NULL,
            snapshot_quantity INTEGER NOT NULL,
            incoming_restock INTEGER DEFAULT 0,
            damaged_quantity INTEGER DEFAULT 0,
            expired_quantity INTEGER DEFAULT 0,
            effective_stock_level INTEGER NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (store_id) REFERENCES stores(store_id),
            FOREIGN KEY (product_id) REFERENCES products(product_id),
            UNIQUE(snapshot_date, store_id, product_id)
        )
    ''')
    print(" curated_inventory_fact")

    # 6. Quarantine Table
    cursor.execute('''
        CREATE TABLE quarantine_inventory (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_date DATE,
            store_id TEXT,
            product_id TEXT,
            quantity INTEGER,
            restock_quantity INTEGER,
            error_type TEXT NOT NULL,
            error_description TEXT,
            raw_data TEXT,
            quarantined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" quarantine_inventory")

    # 7. Pipeline Logs
    cursor.execute('''
        CREATE TABLE pipeline_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_run_id TEXT NOT NULL,
            step_name TEXT NOT NULL,
            status TEXT NOT NULL,
            records_processed INTEGER DEFAULT 0,
            records_passed INTEGER DEFAULT 0,
            records_failed INTEGER DEFAULT 0,
            error_message TEXT,
            execution_time_seconds REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    print(" pipeline_logs")

    conn.commit()
    conn.close()

    print("\n Database created successfully: retail_inventory.db")

# Execute
create_database()


 Dropping existing tables (if any)...

 Creating tables...
 products
 stores
 raw_inventory_snapshot
 raw_restock_events
 curated_inventory_fact
 quarantine_inventory
 pipeline_logs

 Database created successfully: retail_inventory.db


# **Load Master Data into Database**

In [34]:
# CELL 7: Load Master Data (Products & Stores) into Database
def load_master_data():
    # """Load products and stores into database"""
    # print("=" * 60)
    # print("LOADING MASTER DATA INTO DATABASE")
    # print("=" * 60)

    conn = sqlite3.connect('retail_inventory.db')

    # Load Products
    # print("\n Loading products...")
    products_df = pd.read_csv('products.csv')
    products_df.to_sql('products', conn, if_exists='append', index=False)
    # print(f"  ✓ Loaded {len(products_df)} products")

    # Load Stores
    # print("\n Loading stores...")
    stores_df = pd.read_csv('stores.csv')
    stores_df.to_sql('stores', conn, if_exists='append', index=False)
    # print(f"  ✓ Loaded {len(stores_df)} stores")

    conn.close()

    # print("\n Master data loaded successfully!")
    # print("=" * 60)

# Execute
load_master_data()

# **Ingest CSV Files into RAW Layer**

In [36]:
# CELL 8: Ingest CSV Files into RAW Layer
def ingest_raw_data(snapshot_date):

    conn = sqlite3.connect('retail_inventory.db')

    # Ingest Inventory Snapshot
    # print(f"\n Ingesting inventory_snapshot_{snapshot_date}.csv...")
    snapshot_df = pd.read_csv(f'inventory_snapshot_{snapshot_date}.csv')
    snapshot_df.to_sql('raw_inventory_snapshot', conn, if_exists='append', index=False)
    # print(f"  ✓ Ingested {len(snapshot_df)} records into raw_inventory_snapshot")

    # Ingest Restock Events
    # print(f"\n Ingesting restock_events_{snapshot_date}.csv...")
    restock_df = pd.read_csv(f'restock_events_{snapshot_date}.csv')
    restock_df.to_sql('raw_restock_events', conn, if_exists='append', index=False)
    # print(f"  ✓ Ingested {len(restock_df)} records into raw_restock_events")

    conn.close()

    # print("\n Raw data ingestion completed!")
    # print("=" * 60)

    return len(snapshot_df), len(restock_df)

# Execute
snapshot_count, restock_count = ingest_raw_data(today)

# **Data Validation and Cleaning**

In [39]:
# CELL 9: Data Validation and Cleaning Logic
class InventoryValidator:
    """Validates and cleans inventory data"""

    def __init__(self, db_path='retail_inventory.db'):
        self.db_path = db_path
        self.validation_results = {
            'negative_stock': [],
            'invalid_product_id': [],
            'duplicate_entries': [],
            'excessive_restock': [],
            'valid_records': []
        }

    def validate_inventory_snapshot(self, snapshot_date):
        """Validate inventory snapshot data"""
        # print("=" * 60)
        # print("STEP 2: VALIDATING INVENTORY SNAPSHOT")
        # print("=" * 60)

        conn = sqlite3.connect(self.db_path)

        # Read raw data
        query = f"""
            SELECT * FROM raw_inventory_snapshot
            WHERE snapshot_date = '{snapshot_date}'
        """
        df = pd.read_sql_query(query, conn)

        # print(f"\n Total records to validate: {len(df)}")

        # Get valid product IDs
        valid_products = pd.read_sql_query("SELECT product_id FROM products", conn)
        valid_product_ids = set(valid_products['product_id'])

        # Get valid store IDs
        valid_stores = pd.read_sql_query("SELECT store_id FROM stores", conn)
        valid_store_ids = set(valid_stores['store_id'])

        quarantine_records = []

        # print("\n🔍 Running validation checks...")

        # Check 1: Negative stock
        negative_stock = df[df['quantity'] < 0]
        # print(f"    Negative stock: {len(negative_stock)} records")
        for _, row in negative_stock.iterrows():
            quarantine_records.append({
                'snapshot_date': row['snapshot_date'],
                'store_id': row['store_id'],
                'product_id': row['product_id'],
                'quantity': row['quantity'],
                'restock_quantity': None,
                'error_type': 'negative_stock',
                'error_description': f"Quantity is negative: {row['quantity']}",
                'raw_data': json.dumps(row.to_dict())
            })

        # Check 2: Invalid product_id
        invalid_products = df[~df['product_id'].isin(valid_product_ids)]
        # print(f"    Invalid product IDs: {len(invalid_products)} records")
        for _, row in invalid_products.iterrows():
            quarantine_records.append({
                'snapshot_date': row['snapshot_date'],
                'store_id': row['store_id'],
                'product_id': row['product_id'],
                'quantity': row['quantity'],
                'restock_quantity': None,
                'error_type': 'invalid_product_id',
                'error_description': f"Product ID not found in catalog: {row['product_id']}",
                'raw_data': json.dumps(row.to_dict())
            })

        # Check 3: Duplicate entries (keep first, quarantine rest)
        duplicates = df[df.duplicated(subset=['snapshot_date', 'store_id', 'product_id'], keep='first')]
        # print(f"    Duplicate entries: {len(duplicates)} records")
        for _, row in duplicates.iterrows():
            quarantine_records.append({
                'snapshot_date': row['snapshot_date'],
                'store_id': row['store_id'],
                'product_id': row['product_id'],
                'quantity': row['quantity'],
                'restock_quantity': None,
                'error_type': 'duplicate_entry',
                'error_description': "Duplicate snapshot record for same date/store/product",
                'raw_data': json.dumps(row.to_dict())
            })

        # Get IDs of invalid records
        invalid_ids = set(negative_stock.index) | set(invalid_products.index) | set(duplicates.index)

        # Valid records
        valid_df = df[~df.index.isin(invalid_ids)]
        # print(f"   Valid records: {len(valid_df)}")

        # Save quarantine records
        if quarantine_records:
            quarantine_df = pd.DataFrame(quarantine_records)
            quarantine_df.to_sql('quarantine_inventory', conn, if_exists='append', index=False)
            print(f"\n Quarantined {len(quarantine_records)} records")

        conn.close()

        # print("\n Inventory snapshot validation completed!")
        print("=" * 60)

        return valid_df, len(quarantine_records)

    def validate_restock_events(self, restock_date):
        """Validate restock events data"""
        print("=" * 60)
        print("STEP 3: VALIDATING RESTOCK EVENTS")
        print("=" * 60)

        conn = sqlite3.connect(self.db_path)

        # Read raw data
        query = f"""
            SELECT * FROM raw_restock_events
            WHERE restock_date = '{restock_date}'
        """
        df = pd.read_sql_query(query, conn)

        # print(f"\n Total records to validate: {len(df)}")

        # Get valid product IDs and their max capacity
        products_df = pd.read_sql_query("SELECT product_id, max_stock_capacity FROM products", conn)
        valid_product_ids = set(products_df['product_id'])
        product_capacity = dict(zip(products_df['product_id'], products_df['max_stock_capacity']))

        quarantine_records = []

        # print("\n Running validation checks...")

        # Check 1: Invalid product_id
        invalid_products = df[~df['product_id'].isin(valid_product_ids)]
        # print(f"    Invalid product IDs: {len(invalid_products)} records")
        for _, row in invalid_products.iterrows():
            quarantine_records.append({
                'snapshot_date': row['restock_date'],
                'store_id': row['store_id'],
                'product_id': row['product_id'],
                'quantity': None,
                'restock_quantity': row['restock_quantity'],
                'error_type': 'invalid_product_id',
                'error_description': f"Product ID not found in catalog: {row['product_id']}",
                'raw_data': json.dumps(row.to_dict())
            })

        # Check 2: Restock quantity > logical max
        excessive_restock = []
        for _, row in df.iterrows():
            if row['product_id'] in product_capacity:
                max_capacity = product_capacity[row['product_id']]
                if row['restock_quantity'] > max_capacity:
                    excessive_restock.append(row)
                    quarantine_records.append({
                        'snapshot_date': row['restock_date'],
                        'store_id': row['store_id'],
                        'product_id': row['product_id'],
                        'quantity': None,
                        'restock_quantity': row['restock_quantity'],
                        'error_type': 'excessive_restock',
                        'error_description': f"Restock quantity ({row['restock_quantity']}) exceeds max capacity ({max_capacity})",
                        'raw_data': json.dumps(row.to_dict())
                    })

        # print(f"    Excessive restock quantities: {len(excessive_restock)} records")

        # Get IDs of invalid records
        invalid_ids = set(invalid_products.index) | set(pd.DataFrame(excessive_restock).index if excessive_restock else [])

        # Valid records
        valid_df = df[~df.index.isin(invalid_ids)]
        # print(f"   Valid records: {len(valid_df)}")

        # Save quarantine records
        if quarantine_records:
            quarantine_df = pd.DataFrame(quarantine_records)
            quarantine_df.to_sql('quarantine_inventory', conn, if_exists='append', index=False)
            # print(f"\n Quarantined {len(quarantine_records)} records")

        conn.close()

        # print("\n Restock events validation completed!")
        # print("=" * 60)

        return valid_df, len(quarantine_records)

# Execute validation
validator = InventoryValidator()
valid_snapshot_df, snapshot_quarantine_count = validator.validate_inventory_snapshot(today)
valid_restock_df, restock_quarantine_count = validator.validate_restock_events(today)

# print(f"\n📊 VALIDATION SUMMARY:")
# print(f"  - Snapshot: {len(valid_snapshot_df)} valid, {snapshot_quarantine_count} quarantined")
# print(f"  - Restock: {len(valid_restock_df)} valid, {restock_quarantine_count} quarantined")


📋 Quarantined 808 records
STEP 3: VALIDATING RESTOCK EVENTS


# **Calculate Effective Stock and Load to Curated Table**

In [43]:
# CELL 10: Calculate Effective Stock Level and Load Curated Data
def create_curated_inventory(snapshot_date):
    """Calculate effective stock and create curated inventory fact table"""
    print("=" * 60)
    print("STEP 4: CREATING CURATED INVENTORY FACT TABLE")
    print("=" * 60)

    conn = sqlite3.connect('retail_inventory.db')

    # Get validated snapshot data
    snapshot_query = f"""
        SELECT
            ris.snapshot_date,
            ris.store_id,
            ris.product_id,
            ris.quantity as snapshot_quantity
        FROM raw_inventory_snapshot ris
        INNER JOIN products p ON ris.product_id = p.product_id
        INNER JOIN stores s ON ris.store_id = s.store_id
        WHERE ris.snapshot_date = '{snapshot_date}'
            AND ris.quantity >= 0
        GROUP BY ris.snapshot_date, ris.store_id, ris.product_id
        HAVING ris.id = MIN(ris.id)
    """

    snapshot_df = pd.read_sql_query(snapshot_query, conn)
    print(f"\n Valid snapshot records: {len(snapshot_df)}")

    # Get validated restock data (aggregate by store/product)
    restock_query = f"""
        SELECT
            rre.store_id,
            rre.product_id,
            SUM(rre.restock_quantity) as total_restock,
            SUM(rre.damaged_quantity) as total_damaged,
            SUM(rre.expired_quantity) as total_expired
        FROM raw_restock_events rre
        INNER JOIN products p ON rre.product_id = p.product_id
        WHERE rre.restock_date = '{snapshot_date}'
            AND rre.restock_quantity <= p.max_stock_capacity
        GROUP BY rre.store_id, rre.product_id
    """

    restock_df = pd.read_sql_query(restock_query, conn)
    print(f" Valid restock records: {len(restock_df)}")

    # Merge snapshot with restock data
    curated_df = snapshot_df.merge(
        restock_df,
        on=['store_id', 'product_id'],
        how='left'
    )

    # Fill NaN values with 0 for stores with no restock
    curated_df['total_restock'] = curated_df['total_restock'].fillna(0).astype(int)
    curated_df['total_damaged'] = curated_df['total_damaged'].fillna(0).astype(int)
    curated_df['total_expired'] = curated_df['total_expired'].fillna(0).astype(int)

    # Calculate effective stock level
    print("\n Calculating effective stock levels...")
    print("   Formula: effective_stock = snapshot + restock - damaged - expired")

    curated_df['effective_stock_level'] = (
        curated_df['snapshot_quantity'] +
        curated_df['total_restock'] -
        curated_df['total_damaged'] -
        curated_df['total_expired']
    )

    # Rename columns to match table schema
    curated_df = curated_df.rename(columns={
        'total_restock': 'incoming_restock',
        'total_damaged': 'damaged_quantity',
        'total_expired': 'expired_quantity'
    })

    # Load to curated table
    print(f"\n Loading {len(curated_df)} records to curated_inventory_fact...")
    curated_df.to_sql('curated_inventory_fact', conn, if_exists='append', index=False)

    # Display statistics
    print("\n CURATED DATA STATISTICS:")
    print(f"  - Total records: {len(curated_df)}")
    print(f"  - Average snapshot quantity: {curated_df['snapshot_quantity'].mean():.2f}")
    print(f"  - Average incoming restock: {curated_df['incoming_restock'].mean():.2f}")
    # print(f"  - Average effective stock: {curated_df['effective_stock_level'].mean():.2f

    display(curated_df.head(10))
    conn.close()
    return curated_df

In [46]:
# CELL 11: View Results and Generate Reports
def generate_pipeline_report():
    """Generate comprehensive pipeline execution report"""
    print("=" * 60)
    print("PIPELINE EXECUTION REPORT")
    print("=" * 60)

    conn = sqlite3.connect('retail_inventory.db')

    # 1. Curated Inventory Summary
    print("\n CURATED INVENTORY SUMMARY")
    print("-" * 60)
    curated_query = """
        SELECT
            COUNT(*) as total_records,
            COUNT(DISTINCT store_id) as total_stores,
            COUNT(DISTINCT product_id) as total_products,
            SUM(snapshot_quantity) as total_snapshot_stock,
            SUM(incoming_restock) as total_incoming,
            SUM(damaged_quantity) as total_damaged,
            SUM(expired_quantity) as total_expired,
            SUM(effective_stock_level) as total_effective_stock,
            AVG(effective_stock_level) as avg_stock_per_item
        FROM curated_inventory_fact
    """
    curated_summary = pd.read_sql_query(curated_query, conn)
    display(curated_summary)

    # 2. Quarantine Summary
    print("\n  QUARANTINE SUMMARY")
    print("-" * 60)
    quarantine_query = """
        SELECT
            error_type,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM quarantine_inventory), 2) as percentage
        FROM quarantine_inventory
        GROUP BY error_type
        ORDER BY count DESC
    """
    quarantine_summary = pd.read_sql_query(quarantine_query, conn)
    display(quarantine_summary)

    # 3. Top 10 Stores by Inventory Value
    print("\n TOP 10 STORES BY INVENTORY")
    print("-" * 60)
    top_stores_query = """
        SELECT
            s.store_name,
            s.city,
            s.state,
            COUNT(cif.product_id) as product_count,
            SUM(cif.effective_stock_level) as total_inventory
        FROM curated_inventory_fact cif
        JOIN stores s ON cif.store_id = s.store_id
        GROUP BY s.store_id
        ORDER BY total_inventory DESC
        LIMIT 10
    """
    top_stores = pd.read_sql_query(top_stores_query, conn)
    display(top_stores)

    # 4. Products Needing Restock (Low Stock Alert)
    print("\n LOW STOCK ALERTS (Stock < 10)")
    print("-" * 60)
    low_stock_query = """
        SELECT
            p.product_name,
            p.category,
            s.store_name,
            cif.effective_stock_level
        FROM curated_inventory_fact cif
        JOIN products p ON cif.product_id = p.product_id
        JOIN stores s ON cif.store_id = s.store_id
        WHERE cif.effective_stock_level < 10 AND cif.effective_stock_level > 0
        ORDER BY cif.effective_stock_level ASC
        LIMIT 15
    """
    low_stock = pd.read_sql_query(low_stock_query, conn)
    display(low_stock)

    # 5. Category-wise Inventory
    print("\n INVENTORY BY CATEGORY")
    print("-" * 60)
    category_query = """
        SELECT
            p.category,
            COUNT(DISTINCT cif.product_id) as unique_products,
            SUM(cif.effective_stock_level) as total_stock,
            AVG(cif.effective_stock_level) as avg_stock_per_product
        FROM curated_inventory_fact cif
        JOIN products p ON cif.product_id = p.product_id
        GROUP BY p.category
        ORDER BY total_stock DESC
    """
    category_summary = pd.read_sql_query(category_query, conn)
    display(category_summary)

    conn.close()

    print("\n Report generation completed!")
    print("=" * 60)

# Execute
generate_pipeline_report()

PIPELINE EXECUTION REPORT

📊 CURATED INVENTORY SUMMARY
------------------------------------------------------------


Unnamed: 0,total_records,total_stores,total_products,total_snapshot_stock,total_incoming,total_damaged,total_expired,total_effective_stock,avg_stock_per_item
0,0,0,0,,,,,,



⚠️  QUARANTINE SUMMARY
------------------------------------------------------------


Unnamed: 0,error_type,count,percentage
0,duplicate_entry,2232,90.51
1,invalid_product_id,126,5.11
2,negative_stock,78,3.16
3,excessive_restock,30,1.22



🏪 TOP 10 STORES BY INVENTORY
------------------------------------------------------------


Unnamed: 0,store_name,city,state,product_count,total_inventory



🚨 LOW STOCK ALERTS (Stock < 10)
------------------------------------------------------------


Unnamed: 0,product_name,category,store_name,effective_stock_level



📦 INVENTORY BY CATEGORY
------------------------------------------------------------


Unnamed: 0,category,unique_products,total_stock,avg_stock_per_product



✅ Report generation completed!


In [47]:
# CELL 12: Examine Quarantine Records in Detail
def view_quarantine_details():
    """View detailed quarantine records for diagnostics"""
    print("=" * 60)
    print("QUARANTINE RECORDS DETAIL VIEW")
    print("=" * 60)

    conn = sqlite3.connect('retail_inventory.db')

    # Get all quarantine records
    query = """
        SELECT
            id,
            snapshot_date,
            store_id,
            product_id,
            quantity,
            restock_quantity,
            error_type,
            error_description,
            quarantined_at
        FROM quarantine_inventory
        ORDER BY quarantined_at DESC
    """

    quarantine_df = pd.read_sql_query(query, conn)

    print(f"\n Total Quarantined Records: {len(quarantine_df)}")

    if len(quarantine_df) > 0:
        print("\n Sample Quarantine Records:")
        display(quarantine_df.head(20))

        # Group by error type
        # print("\n Records by Error Type:")
        error_counts = quarantine_df['error_type'].value_counts()
        print(error_counts)
    else:
        print("\ No records in quarantine!")

    conn.close()

    # print("\n" + "=" * 60)

# Execute
view_quarantine_details()

QUARANTINE RECORDS DETAIL VIEW

📋 Total Quarantined Records: 2466

🔍 Sample Quarantine Records:


Unnamed: 0,id,snapshot_date,store_id,product_id,quantity,restock_quantity,error_type,error_description,quarantined_at
0,1645,2025-12-04,STORE_006,PROD_0042,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
1,1646,2025-12-04,STORE_006,PROD_0035,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
2,1647,2025-12-04,STORE_010,PROD_0093,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
3,1648,2025-12-04,STORE_003,PROD_0060,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
4,1649,2025-12-04,STORE_007,PROD_0077,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
5,1650,2025-12-04,STORE_002,PROD_0080,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
6,1651,2025-12-04,STORE_004,PROD_0100,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
7,1652,2025-12-04,STORE_001,PROD_0027,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
8,1653,2025-12-04,STORE_003,PROD_0067,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19
9,1654,2025-12-04,STORE_010,PROD_0052,-43.0,,negative_stock,Quantity is negative: -43,2025-12-04 13:20:19



📊 Records by Error Type:
error_type
duplicate_entry       2232
invalid_product_id     126
negative_stock          78
excessive_restock       30
Name: count, dtype: int64



In [50]:
# CELL 13: Export Results to CSV files
def export_results():
    """Export curated data and reports to CSV files"""
    # print("=" * 60)
    print("EXPORTING RESULTS")
    # print("=" * 60)

    conn = sqlite3.connect('retail_inventory.db')

    # Export curated inventory
    # print("\n Exporting curated inventory...")
    curated_df = pd.read_sql_query("SELECT * FROM curated_inventory_fact", conn)
    curated_df.to_csv('curated_inventory_output.csv', index=False)
    # print(f"  ✓ Saved: curated_inventory_output.csv ({len(curated_df)} records)")

    # Export quarantine records
    # print("\n Exporting quarantine records...")
    quarantine_df = pd.read_sql_query("SELECT * FROM quarantine_inventory", conn)
    quarantine_df.to_csv('quarantine_records_output.csv', index=False)
    print(f"  ✓ Saved: quarantine_records_output.csv ({len(quarantine_df)} records)")

    conn.close()

    # print("\n Export completed!")
    # print("=" * 60)

    # Download files in Colab
    from google.colab import files

    # print("\n Downloading files...")
    files.download('curated_inventory_output.csv')
    files.download('quarantine_records_output.csv')

# Execute
export_results()

EXPORTING RESULTS
  ✓ Saved: quarantine_records_output.csv (2466 records)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>