## ETL RETAIL DATA
### 1. EXTRACTION

#### Imports and Setting Seeds

In [200]:
# ETL RETAIL DATA
# 1. EXTRACTION
# Imports and Setting Seeds
import pandas as pd       # For data manipulation and analysis
import numpy as np        # For numerical operations
from faker import Faker   # For generating realistic fake data
import random             # For randomization
from datetime import datetime
import sqlite3

# Fix the randomness so the output is the same every time the script runs
faker = Faker()
random.seed(42)

#### Configuration and Product Catalog


In [201]:
# Configuration and Product Catalog
NUM_RECORDS = 1000        # Total rows in dataset
START_YEAR = 2023         # Start of date range
END_YEAR = 2025           # End of date range

products = {
    "Electronics": [
        ("E101", "Bluetooth Speaker"),
        ("E102", "Smartphone Tripod"),
        ("E103", "Noise Cancelling Headphones")
    ],
    "Clothing": [
        ("C201", "Denim Jacket"),
        ("C202", "Sports Socks"),
        ("C203", "Wool Scarf")
    ],
    "Home Decor": [
        ("H301", "Ceramic Mug"),
        ("H302", "Decorative Cushion"),
        ("H303", "Floor Rug")
    ],
    "Toys": [
        ("T401", "Lego Set"),
        ("T402", "RC Car"),
        ("T403", "Stuffed Bear")
    ]
}

#### Flatten Product List
Purpose: Convert the nested products dictionary into a single flat list.

Before: Products are stored by category → each category has multiple (code, name) pairs.

After: A list where each entry is (ProductCode, ProductName, Category).

Why? Makes it easier to randomly select any product without first choosing a category.

In [202]:
product_list = [
    (code, name, cat)
    for cat, items in products.items()
    for code, name in items
]


#### Function to generate a random datetime between two years
A small function is made to give a random date between two years. This keeps dates realistic.

In [203]:
def random_date(start_year, end_year):
    return faker.date_time_between(
        start_date=datetime(start_year, 1, 1),   # Earliest date
        end_date=datetime(end_year, 8, 12)       # Latest date
    )

#### Generate Synthetic Data
We loop to create each record. Products, prices, and other details are chosen randomly. Some values are made wrong or missing on purpose to make the dataset messy.It Keeps data realistic but imperfect for transformation practice, mimicking real world messy data.

In [204]:
data = []
for _ in range(NUM_RECORDS):
    pid, pname, category = random.choice(product_list)  # Random product
    data.append({
        "InvoiceNo": faker.uuid4(),  # Unique invoice ID
        "ProductID": pid,
        "ProductName": pname if random.random() > 0.02 else None,  # 2% missing
        "Category": category,
        "Quantity": random.choice([random.randint(1, 20), None]),  # Missing some
        "UnitPrice": random.choice([round(random.uniform(5, 300), 2), -10.0]),  # Outlier
        "InvoiceDate": random_date(START_YEAR, END_YEAR),  # Random date
        "CustomerID": random.choice([random.randint(1, 100), None]),  # Missing IDs
        "Country": random.choice([faker.country(), ""])  # Empty country
    })

#### Create DataFrame

In [205]:
# Convert list of dictionaries into a pandas DataFrame
df = pd.DataFrame(data)
# Changed: Added logging for row count
print(f"Extracted {len(df)} rows to DataFrame")
# Save to CSV
df.to_csv("retail_data.csv", index=False)
print("Synthetic retail data generated and saved to 'retail_data.csv'.")


Extracted 1000 rows to DataFrame
Synthetic retail data generated and saved to 'retail_data.csv'.


### TRANSFORMATION 

#### Handle missing values

In [206]:
# Fill missing product names and empty countries
df['ProductName'] = df['ProductName'].fillna('Unknown Product')
df['Country'] = df['Country'].replace('', 'Unknown')
# Remove rows without CustomerID
df = df.dropna(subset=['CustomerID'])
# Changed: Added logging
print(f"After handling missing CustomerID: {len(df)} rows")

After handling missing CustomerID: 493 rows


#### Handle Outliers

In [207]:
# Remove rows with negative quantity or zero/negative prices
df = df[(df['Quantity'] >= 0) & (df['UnitPrice'] > 0)]
# Changed: Added logging
print(f"After removing outliers: {len(df)} rows")

After removing outliers: 116 rows


#### Calculate Total Sales

In [208]:
# Compute revenue per transaction
df['TotalSales'] = df['Quantity'] * df['UnitPrice']


#### Create Customer Salary

In [209]:
# Summarize total purchases per customer
customer_summary = df.groupby('CustomerID').agg({
    'TotalSales': 'sum',
    'Country': 'first'  # Use 'first' to assign one country per customer
}).reset_index()
# Changed: Added logging
print(f"Customer summary created: {len(customer_summary)} rows")

Customer summary created: 64 rows


#### Filter Last Year Sales

In [210]:
# Keep only sales from Aug 12, 2024 onwards
cutoff_date = datetime(2024, 8, 12)
df_last_year = df[df['InvoiceDate'] >= cutoff_date]
# Changed: Added logging
print(f"Filtered to last year: {len(df_last_year)} rows")


Filtered to last year: 47 rows


#### Check Results

In [211]:
df_last_year.info()           # Info about filtered dataset
customer_summary.head()       # First few rows of customer summary


<class 'pandas.core.frame.DataFrame'>
Index: 47 entries, 0 to 955
Data columns (total 10 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   InvoiceNo    47 non-null     object        
 1   ProductID    47 non-null     object        
 2   ProductName  47 non-null     object        
 3   Category     47 non-null     object        
 4   Quantity     47 non-null     float64       
 5   UnitPrice    47 non-null     float64       
 6   InvoiceDate  47 non-null     datetime64[ns]
 7   CustomerID   47 non-null     float64       
 8   Country      47 non-null     object        
 9   TotalSales   47 non-null     float64       
dtypes: datetime64[ns](1), float64(4), object(5)
memory usage: 4.0+ KB


Unnamed: 0,CustomerID,TotalSales,Country
0,3.0,570.28,Liechtenstein
1,4.0,3887.24,Vanuatu
2,6.0,1579.6,Gabon
3,8.0,1892.64,Bangladesh
4,9.0,253.32,United States Minor Outlying Islands


### Loading

#### Connect to SQLite

In [212]:
import sqlite3

conn = sqlite3.connect("retail_dw.db")
cursor = conn.cursor()


#### Create Dimension and Fact Table

In [213]:
cursor.execute("""
CREATE TABLE IF NOT EXISTS CustomerDim (
    CustomerPK INTEGER PRIMARY KEY,
    CustomerCode TEXT,
    Country TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS TimeDim (
    TimePK INTEGER PRIMARY KEY,
    InvoiceDate TEXT,
    Year INTEGER,
    Month INTEGER,
    Day INTEGER
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS ProductDim (
    ProductPK INTEGER PRIMARY KEY,
    ProductID TEXT,
    ProductName TEXT,
    Category TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS SalesFact (
    InvoiceNo TEXT PRIMARY KEY,
    TimePK INTEGER,
    ProductPK INTEGER,
    CustomerPK INTEGER,
    Quantity INTEGER,
    UnitPrice REAL,
    TotalSales REAL,
    FOREIGN KEY(CustomerPK) REFERENCES CustomerDim(CustomerPK),
    FOREIGN KEY(ProductPK) REFERENCES ProductDim(ProductPK),
    FOREIGN KEY(TimePK) REFERENCES TimeDim(TimePK)
)
""")
conn.commit()
print("Tables created successfully!")

Tables created successfully!


#### Prepare Dimension Data

In [214]:
df_customer = customer_summary[['CustomerID', 'Country']].rename(columns={'CustomerID': 'CustomerCode'})
df_customer['CustomerPK'] = df_customer.index + 1  # Primary key
print("\nCustomerDim Preview:")
print(df_customer.head())
df_customer.to_sql('CustomerDim', conn, if_exists='replace', index=False)
print(f"CustomerDim loaded: {len(df_customer)} rows")

# Time Dimension
df_time = df_last_year[['InvoiceDate']].drop_duplicates().reset_index(drop=True)
df_time['TimePK'] = df_time.index + 1
df_time['Year'] = df_time['InvoiceDate'].dt.year
df_time['Month'] = df_time['InvoiceDate'].dt.month
df_time['Day'] = df_time['InvoiceDate'].dt.day
print("\nTimeDim Preview:")
print(df_time.head())
df_time.to_sql('TimeDim', conn, if_exists='replace', index=False)
print(f"TimeDim loaded: {len(df_time)} rows")

# Product Dimension
df_product = df_last_year[['ProductID', 'ProductName', 'Category']].drop_duplicates().reset_index(drop=True)
df_product['ProductPK'] = df_product.index + 1
print("\nProductDim Preview:")
print(df_product.head())
df_product.to_sql('ProductDim', conn, if_exists='replace', index=False)
print(f"ProductDim loaded: {len(df_product)} rows")





CustomerDim Preview:
   CustomerCode                               Country  CustomerPK
0           3.0                         Liechtenstein           1
1           4.0                               Vanuatu           2
2           6.0                                 Gabon           3
3           8.0                            Bangladesh           4
4           9.0  United States Minor Outlying Islands           5
CustomerDim loaded: 64 rows

TimeDim Preview:
          InvoiceDate  TimePK  Year  Month  Day
0 2025-02-28 02:11:07       1  2025      2   28
1 2025-06-01 20:26:35       2  2025      6    1
2 2025-05-16 07:33:03       3  2025      5   16
3 2025-01-28 20:00:02       4  2025      1   28
4 2024-10-19 11:32:08       5  2024     10   19
TimeDim loaded: 47 rows

ProductDim Preview:
  ProductID        ProductName     Category  ProductPK
0      T402             RC Car         Toys          1
1      T401           Lego Set         Toys          2
2      E101  Bluetooth Speaker  Electr

#### Prepare and Load Fact Table

In [215]:
# Prepare and Load Fact Table
# Merge dimension keys into fact table
df_fact = df_last_year.merge(df_customer, left_on='CustomerID', right_on='CustomerCode')
df_fact = df_fact.merge(df_time, on='InvoiceDate')
df_fact = df_fact.merge(df_product, on=['ProductID', 'ProductName', 'Category'])
# Calculate TotalSales
df_fact['TotalSales'] = df_fact['Quantity'] * df_fact['UnitPrice']
# Changed: Cast Quantity to int for SalesFact schema
df_fact['Quantity'] = df_fact['Quantity'].astype(int)
# Select and rename columns for SalesFact
df_fact_final = df_fact[['InvoiceNo', 'TimePK', 'ProductPK', 'CustomerPK', 'Quantity', 'UnitPrice', 'TotalSales']]
print("\nSalesFact Preview:")
print(df_fact_final.head())
df_fact_final.to_sql('SalesFact', conn, if_exists='replace', index=False)
# Added logging
print(f"SalesFact loaded: {len(df_fact_final)} rows")

# Commit and close connection
conn.commit()
conn.close()
print("\nETL process completed successfully")




SalesFact Preview:
                              InvoiceNo  TimePK  ProductPK  CustomerPK  \
0  4173d89e-4c62-47c3-aff5-f6377f307a54       1          1          55   
1  c9d444c2-72e6-41f1-8be0-c0b6e60bdfe4       2          2          61   
2  e78db86a-5dff-494e-8a2e-a5bd3b00cad4       3          3          55   
3  29d25e11-50d4-4b2e-b3a1-dcce0190ec2c       4          4          33   
4  c0ba7fbf-fecf-4f29-9b40-4c7ab73c8c20       5          5          58   

   Quantity  UnitPrice  TotalSales  
0         9      70.85      637.65  
1         8     155.31     1242.48  
2         3      25.21       75.63  
3         5     251.21     1256.05  
4         5     118.00      590.00  
SalesFact loaded: 47 rows

ETL process completed successfully
