---
## 1. Setup

In [24]:
import sqlite3
import pandas as pd
import sys
import os

from datetime import datetime

# Display options: show every column, but cap printed rows at 50.
pd.options.display.max_columns = None
pd.options.display.max_rows = 50

# SQLite files for the transactional source and the analytical target.
OLTP_DB = 'OLTP/ecommerce-OLTP.db'
OLAP_DB = 'OLAP/ecommerce-OLAP.db'

# Business date handed to the data generators and the ETL run,
# kept both as text and as a datetime.date.
ETL_DATE_STRING = '2026-02-02'
ETL_DATE_DATE = datetime.strptime(ETL_DATE_STRING, '%Y-%m-%d').date()

# Volumes requested from the data generators in the cells below.
NUM_USERS, NUM_PRODUCTS = 100, 25
NUM_TRANSACTIONS = 250
NUM_INVALID_RECORDS = 30

---
## 2. Create Fresh Databases

In [15]:
# Start from a clean slate: drop any database files left over from a previous run.
stale_db_files = [path for path in (OLTP_DB, OLAP_DB) if os.path.exists(path)]
for path in stale_db_files:
    os.remove(path)

# Imported only after the old files are gone, preserving the original ordering.
from OLTP.scripts.create_db import main as create_oltp_db
from OLAP.scripts.create_db import main as create_olap_db
from ETL.scripts.create_db import main as create_etl_db

# Build the schemas in the same order as before: OLTP, then OLAP, then ETL.
for create_schema in (create_oltp_db, create_olap_db, create_etl_db):
    create_schema()

print("Database schemas created successfully")

Database schemas created successfully


---
## 3. Generate Valid OLTP Data

In [16]:
from OLTP.scripts.populate import main as populate_data
from OLTP.scripts.populate import create_new_transactions

# Seed the OLTP database with the configured number of users and products.
populate_data(num_users=NUM_USERS, num_products=NUM_PRODUCTS)

oltp_tables = ['users', 'products', 'transactions']

conn_oltp = sqlite3.connect(OLTP_DB)
try:
    # Request NUM_TRANSACTIONS transactions for ETL_DATE_DATE.
    # NOTE(review): status_weights presumably maps to the success/failed split - confirm in populate.py.
    # NOTE(review): the recorded output shows 240 rows for 250 requested - confirm whether
    # the generator is expected to drop some.
    create_new_transactions(conn_oltp, NUM_TRANSACTIONS, ETL_DATE_DATE, status_weights=[0.85, 0.15])

    # One COUNT(*) per table; table names are fixed literals, so formatting them in is safe.
    data_summary = pd.DataFrame({
        'Table': oltp_tables,
        'Count': [
            conn_oltp.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
            for table in oltp_tables
        ],
    })
finally:
    # Release the connection even if generation or a query raises.
    conn_oltp.close()

print("OLTP Data Summary:")
display(data_summary)

OLTP database populated successfully
OLTP Data Summary:


Unnamed: 0,Table,Count
0,users,100
1,products,25
2,transactions,240


In [17]:
# Baseline row counts, taken before any invalid records are injected.
oltp_tables = ['users', 'products', 'transactions']

conn_oltp = sqlite3.connect(OLTP_DB)
try:
    # One COUNT(*) per table; table names are fixed literals, so formatting them in is safe.
    before_counts = pd.DataFrame({
        'Table': oltp_tables,
        'Count': [
            conn_oltp.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
            for table in oltp_tables
        ],
    })
finally:
    # Release the connection even if a query raises.
    conn_oltp.close()

print("OLTP Data Before Error Injection:")
display(before_counts)

OLTP Data Before Error Injection:


Unnamed: 0,Table,Count
0,users,100
1,products,25
2,transactions,240


In [18]:
print("Dirty OLTP Data Preview:")

conn_oltp = sqlite3.connect(OLTP_DB)
try:
    # Uses the same user filter as the post-injection inspection (empty name,
    # malformed email, or missing join_date) so the two views are comparable.
    invalid_users = pd.read_sql_query('''
        SELECT user_id, name, email, join_date FROM users
        WHERE name = '' OR email NOT LIKE '%@%.%' OR join_date IS NULL
        LIMIT 5
    ''', conn_oltp)

    print()
    print("Invalid Users Sample:")
    # Show a placeholder row instead of an empty table when nothing matches.
    display(invalid_users if not invalid_users.empty else pd.DataFrame({'Message': ['No invalid users found']}))

    # Products count as invalid here when stock is negative or price is 10000 or more.
    invalid_products = pd.read_sql_query('''
        SELECT product_id, name, price, stock FROM products
        WHERE stock < 0 OR price >= 10000
        LIMIT 5
    ''', conn_oltp)

    print()
    print("Invalid Products Sample:")
    display(invalid_products if not invalid_products.empty else pd.DataFrame({'Message': ['No invalid products found']}))
finally:
    # Release the connection even if a query raises.
    conn_oltp.close()

Dirty OLTP Data Preview:

Invalid Users Sample:


Unnamed: 0,Message
0,No invalid users found



Invalid Products Sample:


Unnamed: 0,Message
0,No invalid products found


---
## 4. Inject Data Quality Issues

Injecting realistic errors that occur in production systems:
- Orphan transactions (referencing non-existent users/products)
- Invalid emails, empty names
- Negative quantities and stock
- Prices exceeding limits
- Invalid payment types and statuses
- Duplicate transaction IDs

In [19]:
from OLTP.scripts.generate_invalid import main as generate_invalid_data

# Insert NUM_INVALID_RECORDS deliberately invalid records for the ETL date.
generate_invalid_data(count=NUM_INVALID_RECORDS, today=ETL_DATE_STRING)

oltp_tables = ['users', 'products', 'transactions']

conn_oltp = sqlite3.connect(OLTP_DB)
try:
    # One COUNT(*) per table; table names are fixed literals, so formatting them in is safe.
    after_counts = pd.DataFrame({
        'Table': oltp_tables,
        'Count': [
            conn_oltp.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
            for table in oltp_tables
        ],
    })
finally:
    # Release the connection even if a query raises.
    conn_oltp.close()

print("OLTP Data After Error Injection:")
display(after_counts)

Generating 30 invalid records for date: 2026-02-02
Querying OLTP database for valid reference IDs...
  Found 100 users: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] ...
  Found 25 products: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] ...

Inserting invalid records into OLTP database...
  Successfully inserted 30 records into database
OLTP Data After Error Injection:


Unnamed: 0,Table,Count
0,users,103
1,products,30
2,transactions,262


In [20]:
conn_oltp = sqlite3.connect(OLTP_DB)
try:
    # Users: empty name, malformed email, or missing join date.
    print("Invalid Users:")
    invalid_users = pd.read_sql_query('''
        SELECT * FROM users
        WHERE name = '' OR email NOT LIKE '%@%.%' OR join_date IS NULL
        LIMIT 10
    ''', conn_oltp)
    display(invalid_users)

    # Products: negative stock, or price of 10000 or more.
    print()
    print("Invalid Products:")
    invalid_products = pd.read_sql_query('''
        SELECT * FROM products
        WHERE stock < 0 OR price >= 10000
        LIMIT 10
    ''', conn_oltp)
    display(invalid_products)

    # Transactions: non-positive quantity, or a status / payment type outside the allowed sets.
    # Only the first 10 matches are shown.
    print()
    print("Invalid Transactions:")
    invalid_tx = pd.read_sql_query('''
        SELECT * FROM transactions
        WHERE quantity <= 0
           OR status NOT IN ('success', 'failed')
           OR payment_type NOT IN ('visa', 'mastercard', 'wire transfer', 'other')
        LIMIT 10
    ''', conn_oltp)
    display(invalid_tx)
finally:
    # Release the connection even if a query or display call raises.
    conn_oltp.close()

Invalid Users:


Unnamed: 0,user_id,name,email,join_date
0,101,User 87,user101@example.com,
1,102,User 14,user102@example.com,
2,103,,user103@example.com,2025-01-15



Invalid Products:


Unnamed: 0,product_id,name,category,price,stock
0,26,Luxury Item 67,Accessories,49582.21,2
1,27,Luxury Item 97,Accessories,25619.73,9
2,28,Luxury Item 64,Accessories,41680.93,1
3,29,Luxury Item 7,Accessories,48954.82,8
4,30,Product 43,Accessories,90.4,-15



Invalid Transactions:


Unnamed: 0,transaction_id,date,user_id,product_id,quantity,price,payment_type,status
0,2400,2026-02-02,89,19,-5,10.89,mastercard,success
1,3536,2026-02-02,79,20,3,59.22,wire transfer,refunded
2,6961,2026-02-02,55,22,3,56.36,crypto,failed
3,5592,2026-02-02,77,5,0,52.38,visa,failed
4,8842,2026-02-02,85,9,1,93.19,other,processing
5,9983,2026-02-02,7,10,4,56.52,check,failed
6,9679,2026-02-02,49,1,3,96.6,wire transfer,pending
7,8717,2026-02-02,74,18,2,83.62,wire transfer,pending
8,4588,2026-02-02,68,12,5,70.23,PayPal,failed
9,2001,2026-02-02,58,14,-4,17.31,visa,success


### Run the ETL on the Dirty Data

In [21]:
from ETL.etl import main as run_etl

# Execute the pipeline for the configured business date; it prints its own progress log.
run_etl(ETL_DATE_STRING)

# Reached only if run_etl returned without raising.
print("ETL pipeline completed successfully")

Starting ETL process for date: 2026-02-02
  Indexes created/verified


Fetching data from OLTP...
  Fetched 103 users, 30 products, 259 transactions

Validating data quality...
  Users: 100 valid, 3 rejected
  Products: 25 valid, 5 rejected
  Transactions: 245 valid, 14 rejected

  Users: 100 inserted, 0 updated, 0 unchanged

  Products: 25 inserted, 0 updated, 0 unchanged

  Stock history: 25 inserted, 0 skipped

  Transactions: 245 inserted, 0 skipped

ETL Summary:
  dim_user total rows: 100
  dim_product total rows: 25
  dim_date total rows: 1
  fact_transactions total rows: 245
  fact_stock_history total rows: 25
  Current users: 100
  Current products: 25

  Data Quality Summary:
    Errors: 22
ETL completed successfully for 2026-02-02
ETL pipeline completed successfully


---
## 5. Run ETL Pipeline

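The ETL validates all records, rejects invalid ones with detailed logging, and loads only clean data into OLAP. The pipeline itself ran in the previous cell; the cell below reads its most recent entry from `etl_run_log`.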

In [22]:
# Opened here and reused by the cells that follow; it is closed in the final cell.
conn_olap = sqlite3.connect(OLAP_DB)

# Highest run_id first, so the single returned row is the most recent ETL run.
latest_run_sql = '''
    SELECT * FROM etl_run_log 
    ORDER BY run_id DESC LIMIT 1
'''
run_log = pd.read_sql_query(latest_run_sql, conn_olap)

print("ETL Run Summary:")
display(run_log)

ETL Run Summary:


Unnamed: 0,run_id,run_date,source_date,status,started_at,ended_at,duration_ms,rows_dim_user_inserted,rows_dim_product_inserted,rows_fact_transactions_inserted,rows_fact_stock_history_inserted,errors,warnings,notes
0,1,2026-02-04,2026-02-02,success,2026-02-04T17:37:23.341848,2026-02-04T17:37:24.137602,796,100,25,245,25,0,0,


---
## 6. ETL Run Metrics

In [23]:
# Cast to a built-in int. A value pulled out of a DataFrame row is a numpy integer,
# which sqlite3 binds as a BLOB rather than an INTEGER, so `run_id = ?` would
# silently match no rows and the summary would come back empty.
latest_run = int(run_log.iloc[0]['run_id'])

# Count logged data-quality errors for that run, grouped by type and severity,
# most frequent first.
error_summary = pd.read_sql_query('''
    SELECT error_type, severity, COUNT(*) as count
    FROM etl_error_log
    WHERE run_id = ?
    GROUP BY error_type, severity
    ORDER BY count DESC
''', conn_olap, params=[latest_run])

print("Data Quality Issues by Type:")
display(error_summary)

Data Quality Issues by Type:


Unnamed: 0,error_type,severity,count


---
## 7. Data Quality Error Analysis

In [None]:
# Up to 15 logged errors for the latest run, ordered by error type.
# latest_run is cast to a built-in int at the call site: a numpy integer would be
# bound by sqlite3 as a BLOB and `run_id = ?` would match nothing.
sample_errors = pd.read_sql_query('''
    SELECT entity, record_id, error_type, severity, message
    FROM etl_error_log
    WHERE run_id = ?
    ORDER BY error_type
    LIMIT 15
''', conn_olap, params=[int(latest_run)])

print("Sample Rejected Records:")
display(sample_errors)

In [None]:
# NOTE(review): this cell runs the same query as the cell directly above it;
# consider removing one of the two.
print("\nSample Rejected Records:")
# latest_run is cast to a built-in int at the call site: a numpy integer would be
# bound by sqlite3 as a BLOB and `run_id = ?` would match nothing.
sample_errors = pd.read_sql_query('''
    SELECT entity, record_id, error_type, severity, message
    FROM etl_error_log
    WHERE run_id = ?
    ORDER BY error_type
    LIMIT 15
''', conn_olap, params=[int(latest_run)])
display(sample_errors)

---
## 8. Verify OLAP Data is Clean

The OLAP tables should contain only valid, clean data that passed all validation checks.

In [None]:
# Row count for each OLAP dimension and fact table, in display order.
olap_tables = ['dim_user', 'dim_product', 'dim_date', 'fact_transactions', 'fact_stock_history']

row_counts = []
for table_name in olap_tables:
    count_frame = pd.read_sql_query(f'SELECT COUNT(*) as count FROM {table_name}', conn_olap)
    row_counts.append(count_frame.iloc[0]['count'])

table_counts = pd.DataFrame({'Table': olap_tables, 'Row Count': row_counts})

print("OLAP Table Summary:")
display(table_counts)

In [None]:
# First 10 dim_user rows flagged as the current version.
current_users_sql = '''
    SELECT user_sk, user_id, name, email, join_date, current_flag
    FROM dim_user WHERE current_flag = 1 LIMIT 10
'''
users = pd.read_sql_query(current_users_sql, conn_olap)

print("dim_user (current versions):")
display(users)

In [None]:
# First 10 dim_product rows flagged as the current version.
current_products_sql = '''
    SELECT product_sk, product_id, name, category, price, current_flag
    FROM dim_product WHERE current_flag = 1 LIMIT 10
'''
products = pd.read_sql_query(current_products_sql, conn_olap)

print("dim_product (current versions):")
display(products)

In [None]:
# A 10-row sample of the loaded fact table.
fact_sample_sql = '''
    SELECT transaction_id, user_sk, product_sk, date_id, quantity, total, payment_type, status
    FROM fact_transactions LIMIT 10
'''
transactions = pd.read_sql_query(fact_sample_sql, conn_olap)

print("fact_transactions (sample):")
display(transactions)

---
## 9. Data Quality Verification

Verify that the OLAP tables contain no invalid data. Each check counts offending rows and passes only when that count is zero:

In [None]:
# Each check is (label, query); the query returns the number of offending rows.
# NOT LIKE / NOT IN evaluate to NULL (not true) for NULL values, so the
# "invalid" checks test IS NULL explicitly; otherwise NULL rows would slip through.
checks = [
    ("Users with empty names", "SELECT COUNT(*) FROM dim_user WHERE name = '' OR name IS NULL"),
    ("Users with invalid emails", "SELECT COUNT(*) FROM dim_user WHERE email IS NULL OR email NOT LIKE '%@%.%'"),
    ("Products with negative prices", "SELECT COUNT(*) FROM dim_product WHERE price < 0"),
    ("Products with price >= 10000", "SELECT COUNT(*) FROM dim_product WHERE price >= 10000"),
    ("Transactions with qty <= 0", "SELECT COUNT(*) FROM fact_transactions WHERE quantity <= 0"),
    ("Transactions with invalid status", "SELECT COUNT(*) FROM fact_transactions WHERE status IS NULL OR status NOT IN ('success', 'failed')"),
    ("Transactions with invalid payment", "SELECT COUNT(*) FROM fact_transactions WHERE payment_type IS NULL OR payment_type NOT IN ('visa', 'mastercard', 'wire transfer', 'other')"),
    # LEFT JOIN + IS NULL finds fact rows whose surrogate key has no matching dimension row.
    ("Orphan transactions (no user)", "SELECT COUNT(*) FROM fact_transactions ft LEFT JOIN dim_user du ON ft.user_sk = du.user_sk WHERE du.user_sk IS NULL"),
    ("Orphan transactions (no product)", "SELECT COUNT(*) FROM fact_transactions ft LEFT JOIN dim_product dp ON ft.product_sk = dp.product_sk WHERE dp.product_sk IS NULL"),
]

# Run every check and record the offending-row count plus a PASS/FAIL verdict.
verification_results = []
for check_name, query in checks:
    count = conn_olap.execute(query).fetchone()[0]
    verification_results.append({
        'Check': check_name,
        'Count': count,
        'Status': "PASS" if count == 0 else "FAIL",
    })

# The data is considered clean only if no check found any offending row.
all_passed = all(result['Count'] == 0 for result in verification_results)

verification_df = pd.DataFrame(verification_results)

print("Data Quality Verification Results:")
display(verification_df)

print()
if all_passed:
    print("✅ ALL CHECKS PASSED - OLAP data is clean!")
else:
    print("❌ Some checks failed - Review validation logic")

In [None]:
# Release the OLAP connection that the preceding cells shared.
conn_olap.close()

print("Demo completed successfully")