___
## 1.2 ETL Process Implementation

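**Dataset:** Synthetic data designed to mimic the structure of the target dataset at a reduced scale, using the same columns. The generator below also adds `Category` and `CustomerName`, which the dimension tables use.  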

| Column       | Description                                      |
|--------------|--------------------------------------------------|
| InvoiceNo    | Unique invoice identifier                        |
| StockCode    | Product code                                     |
| Description  | Product description                              |
| Quantity     | Number of items purchased                        |
| InvoiceDate  | Date of purchase                                 |
| UnitPrice    | Price per item                                   |
| CustomerID   | Unique customer identifier                        |
| Country      | Customer's country                               |

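**Dataset Features:**  
- Row count: ~500–1000, kept small for practicality (the code below generates 1,000 rows)  
- Quantities: integers from 1 to 50; unit prices: 1–100, rounded to two decimals  
- Dates span 2 years (2023-08-12 to 2025-08-11)  
- Up to 100 unique customers  
- 5–10 countries (8 in the code below)  
- Includes missing values in `Description` (2% of rows) and `Country` (1%), categorical columns, and outliers: negative `Quantity` (1%) and zero `UnitPrice` (1%)  
- `random` and NumPy are seeded with 42 for reproducibility  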
___

In [11]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
from faker import Faker
import sqlite3
import logging

# ===== Logging Setup =====
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# ===== Reproducibility =====
np.random.seed(42)
random.seed(42)
Faker.seed(42)  # Faker keeps its own random generator, so seed it too

# ===== Config =====
NUM_ROWS = 1000
NUM_CUSTOMERS = 100
START_DATE = datetime(2023, 8, 12)
END_DATE = datetime(2025, 8, 11)
COUNTRIES = ["United Kingdom", "Germany", "France", "Spain", "Netherlands", "Italy", "Norway", "Portugal"]
PRODUCTS = [
    ("E001", "Wireless Mouse", "Electronics"),
    ("E002", "Bluetooth Headphones", "Electronics"),
    ("E003", "Smartphone Charger", "Electronics"),
    ("C001", "Men's T-Shirt", "Clothing"),
    ("C002", "Women's Jeans", "Clothing"),
    ("C003", "Baseball Cap", "Clothing"),
    ("H001", "Ceramic Mug", "Home Goods"),
    ("H002", "Wall Clock", "Home Goods"),
    ("H003", "LED Desk Lamp", "Home Goods"),
    ("T001", "Stuffed Bear", "Toys"),
    ("T002", "Building Blocks Set", "Toys"),
    ("T003", "RC Car", "Toys"),
]

# ===== Helper Functions =====
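def random_date(start_date, end_date):
    # Pick a random day in [start_date, end_date], then a random second within that day.
    delta = end_date - start_date
    return start_date + timedelta(
        days=random.randint(0, delta.days),
        seconds=random.randint(0, 86399)  # randint is inclusive; 86400 would spill into the next day
    )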

def generate_base_data(num_rows):
    data = []
    for _ in range(num_rows):
        invoice_no = f"INV{random.randint(10000, 99999)}"
        stock_code, description, category = random.choice(PRODUCTS)
        quantity = random.randint(1, 50)
        invoice_date = random_date(START_DATE, END_DATE)
        unit_price = round(random.uniform(1, 100), 2)
        customer_id = f"CUST{random.randint(1, NUM_CUSTOMERS)}"
        country = random.choice(COUNTRIES)
        data.append([invoice_no, stock_code, description, category, quantity, invoice_date, unit_price, customer_id, country])
    columns = ["InvoiceNo", "StockCode", "Description", "Category", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
    return pd.DataFrame(data, columns=columns)

def inject_missing_values(df, desc_frac=0.02, country_frac=0.01):
    df.loc[df.sample(frac=desc_frac).index, "Description"] = np.nan
    df.loc[df.sample(frac=country_frac).index, "Country"] = np.nan
    return df

def inject_outliers(df, neg_qty_frac=0.01, zero_price_frac=0.01):
    df.loc[df.sample(frac=neg_qty_frac).index, "Quantity"] *= -1
    df.loc[df.sample(frac=zero_price_frac).index, "UnitPrice"] = 0
    return df

def save_dataset(df, filename):
    df.to_csv(filename, index=False)

def generate_customer_names(num_customers):
    fake = Faker()
    return {f"CUST{i}": fake.name() for i in range(1, num_customers + 1)}

def assign_customer_names(df, customer_names):
    df['CustomerName'] = df['CustomerID'].map(customer_names)
    return df

def clean_and_convert(df):
    df['Description'] = df['Description'].fillna('Unknown Product')
    df['Country'] = df['Country'].fillna('Unknown Country')
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
    return df

___
### 1.2.1 Extract
- Python (pandas & Faker) was used to generate the synthetic dataset as a DataFrame.  
- Missing values in `Description` and `Country` were filled with the placeholders `Unknown Product` and `Unknown Country`.  
- Data types were corrected, e.g., `InvoiceDate` was converted to datetime.
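
The cleaning steps listed above can be checked in isolation on a tiny frame. This is a minimal sketch that mirrors `clean_and_convert`; the sample rows are hypothetical and are not taken from the generated dataset.

```python
import numpy as np
import pandas as pd

# Hypothetical sample with the same kinds of gaps the synthetic data contains.
raw = pd.DataFrame({
    "Description": ["Ceramic Mug", np.nan],
    "Country": [np.nan, "France"],
    "InvoiceDate": ["2025-01-15 10:30:00", "not a date"],
})

cleaned = raw.copy()
cleaned["Description"] = cleaned["Description"].fillna("Unknown Product")
cleaned["Country"] = cleaned["Country"].fillna("Unknown Country")
# errors="coerce" turns unparseable values into NaT instead of raising.
cleaned["InvoiceDate"] = pd.to_datetime(cleaned["InvoiceDate"], errors="coerce")

print(cleaned)
print(cleaned.dtypes)
```

Because `errors="coerce"` silently produces `NaT`, it may be worth counting `cleaned["InvoiceDate"].isna()` after conversion so that bad dates do not disappear unnoticed in the later date filter.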

### 1.2.2 Transform

**Transformations Applied:**  
- Added a new column: `TotalSales = Quantity * UnitPrice`  
- Handled outliers by removing rows where `Quantity <= 0` or `UnitPrice <= 0`  
- Kept only sales from the 12 months up to a fixed reference date (current date assumed to be 2025-08-12)
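
The three transformations can be sketched as one small function. `transform_sales` and its parameters are hypothetical names used for illustration; the notebook applies the same steps inline, and the sample rows here are made up.

```python
import pandas as pd

def transform_sales(df, current_date, months=12):
    """Hypothetical helper: add TotalSales, drop invalid rows, keep a trailing window."""
    out = df.copy()
    out["TotalSales"] = out["Quantity"] * out["UnitPrice"]
    # Remove the injected outliers: non-positive quantities and prices.
    out = out[(out["Quantity"] > 0) & (out["UnitPrice"] > 0)]
    # Keep rows whose date falls inside [current_date - months, current_date].
    window_start = current_date - pd.DateOffset(months=months)
    in_window = out["InvoiceDate"].between(window_start, current_date)
    return out[in_window].reset_index(drop=True)

sample = pd.DataFrame({
    "Quantity": [2, -1, 5, 3],
    "UnitPrice": [10.0, 4.0, 0.0, 2.5],
    "InvoiceDate": pd.to_datetime(
        ["2025-03-01", "2025-03-02", "2025-03-03", "2023-12-31"]
    ),
})
result = transform_sales(sample, pd.Timestamp("2025-08-12"))
print(result)
```

Only the first sample row survives: the second has a negative quantity, the third a zero price, and the fourth falls outside the 12-month window. Passing the reference date as a parameter, rather than hard-coding it, keeps the filter testable.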

### 1.2.3 Load

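**Loading Process:**  
- Used `sqlite3` in Python to create the database `retail.db`, with tables defined in an external schema file (`Schema2.sql`)  
- Loaded data into:

  * 1 Fact Table: `SalesFact`  
  * 3 Dimension Tables: `ProductDim`, `CustomerDim`, `TimeDim` (the code below inserts rows into `ProductDim` and `CustomerDim` only; it contains no insert for `TimeDim`)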
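
Since `Schema2.sql` is not reproduced in this section, the following is only a sketch of what a compatible star schema might look like. Every column definition is an assumption, chosen to match the columns the load code writes; the `TimeDim` columns and the way it is populated are likewise hypothetical.

```python
import sqlite3
import pandas as pd

# Hypothetical schema: Schema2.sql is not shown, so every column below is an assumption.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS CustomerDim (
    CustomerKey  INTEGER PRIMARY KEY,
    CustomerID   TEXT,
    CustomerName TEXT,
    Country      TEXT
);
CREATE TABLE IF NOT EXISTS ProductDim (
    ProductKey  INTEGER PRIMARY KEY,
    StockCode   TEXT,
    Description TEXT,
    Category    TEXT
);
CREATE TABLE IF NOT EXISTS TimeDim (
    TimeKey INTEGER PRIMARY KEY,
    Date    TEXT,
    Year    INTEGER,
    Quarter INTEGER,
    Month   INTEGER
);
CREATE TABLE IF NOT EXISTS SalesFact (
    SalesKey    INTEGER PRIMARY KEY,
    InvoiceNo   TEXT,
    InvoiceDate TEXT,
    Quantity    INTEGER,
    UnitPrice   REAL,
    CustomerKey INTEGER REFERENCES CustomerDim(CustomerKey),
    ProductKey  INTEGER REFERENCES ProductDim(ProductKey)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA_SQL)

# One TimeDim row per calendar day that appears in the invoice timestamps.
invoice_dates = pd.to_datetime(
    pd.Series(["2025-03-01 09:00:00", "2025-03-01 17:30:00", "2025-07-15 12:00:00"])
)
days = invoice_dates.dt.normalize().drop_duplicates()
time_dim = pd.DataFrame({
    "Date": days.dt.strftime("%Y-%m-%d"),
    "Year": days.dt.year,
    "Quarter": days.dt.quarter,
    "Month": days.dt.month,
})
time_dim.to_sql("TimeDim", conn, if_exists="append", index=False)

tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
n_days = conn.execute("SELECT COUNT(*) FROM TimeDim").fetchone()[0]
conn.close()
print(tables, n_days)
```

Declaring each surrogate key as `INTEGER PRIMARY KEY` lets SQLite assign it automatically when `to_sql` appends rows without that column, which is what allows the load step to read the keys back afterwards.
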
___

In [12]:
# ===== ETL Process: Export Only .db, Transformed Data, Synthetic Data =====
def run_etl_export_only():
    try:
        logging.info("Starting ETL process...")

        # === Extract (Synthetic Data) ===
        df_synthetic = generate_base_data(NUM_ROWS)
        df_synthetic = inject_missing_values(df_synthetic)
        df_synthetic = inject_outliers(df_synthetic)
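        # Name every possible ID (CUST1..CUST{NUM_CUSTOMERS}); sizing the map by nunique()
        # would leave high-numbered IDs unnamed whenever some IDs were never drawn.
        customer_names = generate_customer_names(NUM_CUSTOMERS)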
        df_synthetic = assign_customer_names(df_synthetic, customer_names)

        # Handle missing values & convert data types
        df_synthetic = clean_and_convert(df_synthetic)

        # Save synthetic data
        df_synthetic.to_csv("synthetic_retail_dataset.csv", index=False)
        logging.info("Synthetic dataset exported as 'synthetic_retail_dataset.csv'")
        logging.info(f"Rows after extraction: {len(df_synthetic)}")

        # === Transform (Cleaned and Filtered Data) ===
        df_transformed = clean_and_convert(df_synthetic.copy())
        df_transformed['TotalSales'] = df_transformed['Quantity'] * df_transformed['UnitPrice']
        df_transformed = df_transformed[df_transformed['Quantity'] > 0]
        df_transformed = df_transformed[df_transformed['UnitPrice'] > 0]

        # Filter last 12 months
        current_date = pd.Timestamp("2025-08-12")
        one_year_ago = current_date - pd.DateOffset(years=1)
        df_transformed = df_transformed[(df_transformed['InvoiceDate'] >= one_year_ago) & 
                                        (df_transformed['InvoiceDate'] <= current_date)]

        # Save transformed data
        df_transformed.to_csv("transformed_retail_dataset.csv", index=False)
        logging.info("Transformed dataset exported as 'transformed_retail_dataset.csv'")
        logging.info(f"Rows after transformation: {len(df_transformed)}")   
        
        # === Load ===
        logging.info("Loading data into SQLite database using external schema...")

        conn = sqlite3.connect("retail.db")
        cur = conn.cursor()

        # Read schema from external SQL file
        with open("Schema2.sql", "r") as f:
            schema_sql = f.read()

        cur.executescript(schema_sql)

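        # Insert into dimension tables: keep one row per business key so the
        # fact-table merges below cannot multiply rows.
        # Country varies per row in the synthetic data; the first one seen is kept.
        customer_dim = (df_transformed[['CustomerID', 'CustomerName', 'Country']]
                        .drop_duplicates(subset='CustomerID'))
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)

        # Sort placeholders last so a real description wins over 'Unknown Product'
        product_dim = (df_transformed[['StockCode', 'Description', 'Category']]
                       .sort_values('Description', key=lambda s: s.eq('Unknown Product'))
                       .drop_duplicates(subset='StockCode'))
        product_dim.to_sql('ProductDim', conn, if_exists='append', index=False)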

        # Merge keys for fact table
        cust_keys = pd.read_sql("SELECT CustomerKey, CustomerID FROM CustomerDim", conn)
        prod_keys = pd.read_sql("SELECT ProductKey, StockCode FROM ProductDim", conn)
        fact_df = df_transformed.merge(cust_keys, on='CustomerID').merge(prod_keys, on='StockCode')
        fact_df = fact_df[['InvoiceNo', 'InvoiceDate', 'Quantity', 'UnitPrice', 'CustomerKey', 'ProductKey']]
        fact_df.to_sql('SalesFact', conn, if_exists='append', index=False)

        conn.commit()
        conn.close()
        logging.info("Data loaded successfully into SQLite database.")


        logging.info("ETL process completed: only synthetic, transformed, and .db exported.")

    except Exception as e:
        # logging.exception records the traceback as well as the message
        logging.exception(f"ETL process failed: {e}")

# ===== Run ETL =====
if __name__ == "__main__":
    run_etl_export_only()

2025-08-14 23:46:38,674 - INFO - Starting ETL process...
2025-08-14 23:46:38,730 - INFO - Synthetic dataset exported as 'synthetic_retail_dataset.csv'
2025-08-14 23:46:38,731 - INFO - Rows after extraction: 1000
2025-08-14 23:46:38,748 - INFO - Transformed dataset exported as 'transformed_retail_dataset.csv'
2025-08-14 23:46:38,750 - INFO - Rows after transformation: 493
2025-08-14 23:46:38,751 - INFO - Loading data into SQLite database using external schema...
2025-08-14 23:46:38,994 - INFO - Data loaded successfully into SQLite database.
2025-08-14 23:46:38,996 - INFO - ETL process completed: only synthetic, transformed, and .db exported.


___
### 1.2.4 Full ETL Function

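**Overview:**  
- `run_etl_export_only()` runs the extract, transform, and load steps in a single call  
- It takes no arguments and generates its own synthetic input, so applying it to another dataset means replacing the extract step  
- Logs the number of rows after extraction and after transformation  
- Catches any exception and logs it instead of raising  
- Exports:

  * Synthetic dataset: `synthetic_retail_dataset.csv`  
  * Transformed dataset: `transformed_retail_dataset.csv`  
  * SQLite database: `retail.db`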
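
One way to make the pipeline reusable across datasets is to pass the three stages in as functions. The sketch below is not the notebook's implementation: `run_etl`, the stage callables, the `Sales` table, and the sample rows are all hypothetical, and the load step uses an in-memory database rather than `retail.db`.

```python
import logging
import sqlite3
import pandas as pd

logger = logging.getLogger("etl_sketch")

def run_etl(extract, transform, load):
    """Run three caller-supplied stages and return the row count seen at each one."""
    counts = {}
    try:
        raw = extract()
        counts["extracted"] = len(raw)
        logger.info("Rows after extraction: %d", counts["extracted"])

        transformed = transform(raw)
        counts["transformed"] = len(transformed)
        logger.info("Rows after transformation: %d", counts["transformed"])

        counts["loaded"] = load(transformed)
        logger.info("Rows loaded: %d", counts["loaded"])
    except Exception:
        # Log the traceback, then re-raise so the caller still sees the failure.
        logger.exception("ETL process failed")
        raise
    return counts

# Usage with a tiny hypothetical dataset and an in-memory database.
conn = sqlite3.connect(":memory:")

def extract():
    return pd.DataFrame({"Quantity": [2, -1, 4], "UnitPrice": [5.0, 3.0, 0.0]})

def transform(df):
    out = df[(df["Quantity"] > 0) & (df["UnitPrice"] > 0)].copy()
    out["TotalSales"] = out["Quantity"] * out["UnitPrice"]
    return out

def load(df):
    df.to_sql("Sales", conn, if_exists="replace", index=False)
    return conn.execute("SELECT COUNT(*) FROM Sales").fetchone()[0]

counts = run_etl(extract, transform, load)
conn.close()
print(counts)
```

Returning the per-stage counts, in addition to logging them, lets a caller or a test assert on how many rows each stage kept. Re-raising after logging is a design choice; swallowing the exception, as the notebook's function does, keeps a notebook run going but hides failures from calling code.
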
___