## ETL Process

In [3]:
# Import Libraries
import pandas as pd
import sqlite3
from datetime import datetime


### Extraction

In [2]:
# Location of the raw Online Retail CSV export.
# NOTE(review): this is an absolute, machine-specific path; the notebook will
# only run as-is on a machine with the same directory layout.
data_path = (
    r"C:\Users\Admin\OneDrive - United States International University (USIU)\Documents\USIU_A\US2025\DSA2040A\Final Exam\DSA_2040_Practical_Exam_Ambachow_550\Data_Warehousing\data\online_retail.csv"
)

# Extract step: read the whole CSV into a DataFrame for inspection.
df = pd.read_csv(filepath_or_buffer=data_path)

#### Columns/Variables

In [7]:
# Show the column labels of the extracted frame.
print(df.columns)
# DataFrame.info() writes its report straight to stdout and returns None, so it
# is called bare; wrapping it in print() would emit a trailing "None" line.
df.info()

Index(['Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID',
       'Country'],
      dtype='object')
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 6 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   Description  540455 non-null  object 
 1   Quantity     541909 non-null  int64  
 2   InvoiceDate  541909 non-null  object 
 3   UnitPrice    541909 non-null  float64
 4   CustomerID   406829 non-null  float64
 5   Country      541909 non-null  object 
dtypes: float64(2), int64(1), object(3)
memory usage: 24.8+ MB
None


### Wrangle function for Transformation process

In [4]:

def etl_online_retail(
    csv_path: str,
    db_path: str,
    filter_date: str = '2024-08-12'  # Default cutoff date for filtering sales last year
):
    """
    ETL process for Online Retail data.

    Reads the CSV at ``csv_path``, keeps rows with positive Quantity and
    UnitPrice, computes TotalSales, keeps rows whose InvoiceDate is on or after
    ``filter_date``, builds customer / time / sales tables and writes them to
    the SQLite database at ``db_path`` (existing tables with the same names
    are replaced).

    Args:
        csv_path (str): File path to the input CSV data.
        db_path (str): SQLite database file path to load data.
        filter_date (str): Filter sales from this date onward (YYYY-MM-DD).

    Returns:
        None, but saves cleaned/transformed data into SQLite DB as the tables
        CustomerDim, TimeDim and SalesFact.

    Raises:
        KeyError: If the CSV lacks any column the transformation needs.
    """
    print("Starting ETL process...")

    # Extract: the function must load its own input. Relying on a notebook-level
    # `df` cannot work here because `df` is assigned below and is therefore a
    # local name for the whole function body.
    df = pd.read_csv(csv_path)

    # Fail early with a readable message instead of a bare pandas KeyError
    # halfway through the transformation.
    required_columns = [
        'InvoiceNo', 'StockCode', 'Quantity', 'InvoiceDate',
        'UnitPrice', 'CustomerID', 'Country',
    ]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(f"Input CSV is missing required column(s): {missing_columns}")

    # Transform
    print("Transforming data...")
    # Unparseable dates become NaT and are dropped by the date filter below
    # (comparisons against NaT are False).
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')

    # Remove rows with invalid Quantity or UnitPrice
    original_count = df.shape[0]
    # .copy() so the column assignment below works on an independent frame
    # rather than on a slice of the raw data.
    df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)].copy()
    filtered_count = df.shape[0]
    print(f"Removed {original_count - filtered_count} rows with invalid Quantity or UnitPrice")

    # Calculate TotalSales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # Filter for sales from filter_date onward
    cutoff_date = pd.Timestamp(filter_date)
    df_filtered = df[df['InvoiceDate'] >= cutoff_date].copy()
    print(f"Filtered data to {df_filtered.shape[0]} rows from {filter_date} onwards")

    # Calendar attributes are derived once on the filtered rows; both the time
    # dimension and the fact table are projections of these columns.
    df_filtered['Date'] = df_filtered['InvoiceDate'].dt.date
    df_filtered['Year'] = df_filtered['InvoiceDate'].dt.year
    df_filtered['Quarter'] = df_filtered['InvoiceDate'].dt.quarter
    df_filtered['Month'] = df_filtered['InvoiceDate'].dt.month

    # Customer Dimension (rows without a CustomerID are excluded by groupby)
    print("Creating Customer Dimension...")
    customer_dim = df_filtered.groupby('CustomerID').agg(
        TotalPurchases=('TotalSales', 'sum'),
        Country=('Country', 'first')
    ).reset_index()

    # Time Dimension: one row per calendar date, so the fact table's Date
    # column matches exactly one dimension row.
    print("Creating Time Dimension...")
    time_dim = (
        df_filtered[['Date', 'Year', 'Quarter', 'Month']]
        .drop_duplicates()
        .sort_values('Date')
    )

    # Sales Fact Table
    print("Preparing Sales Fact Table...")
    sales_fact_table = df_filtered[[
        'InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice', 'TotalSales',
        'Date', 'Year', 'Quarter', 'Month'
    ]]

    # Load
    print(f"Loading data into database at: {db_path}")
    conn = sqlite3.connect(db_path)
    try:
        customer_dim.to_sql('CustomerDim', conn, if_exists='replace', index=False)
        time_dim.to_sql('TimeDim', conn, if_exists='replace', index=False)
        sales_fact_table.to_sql('SalesFact', conn, if_exists='replace', index=False)
    finally:
        # Always release the database file, even when a write fails.
        conn.close()

    print("ETL process completed successfully!")
    
    

In [None]:
import pandas as pd
import sqlite3
from datetime import datetime

def etl_online_retail(csv_path, db_path, filter_date='2024-08-12'):
    """
    ETL process for Online Retail data into a star schema SQLite database.

    Steps:
      1. Extract: read the CSV (ISO-8859-1 encoded).
      2. Transform: parse InvoiceDate, drop rows missing InvoiceDate /
         CustomerID / StockCode, keep rows with positive Quantity and
         UnitPrice, compute TotalSales and keep rows dated on or after
         ``filter_date``.
      3. Load: recreate CustomerDim, ProductDim, TimeDim and SalesFact in the
         SQLite file and append the transformed rows.

    Args:
        csv_path (str): Path to the raw CSV dataset.
        db_path (str): Path to SQLite database file.
        filter_date (str): Filter sales from this date onward (YYYY-MM-DD).

    Returns:
        None (creates and loads tables into SQLite DB)

    Raises:
        KeyError: If the CSV lacks any column the star schema needs.
    """
    print("Starting ETL process...")

    # Extract
    print("Loading raw data...")
    # Invoice numbers and stock codes are identifiers stored in TEXT columns;
    # reading them as strings avoids mixed int/str values from type inference.
    df = pd.read_csv(
        csv_path,
        encoding='ISO-8859-1',
        dtype={'InvoiceNo': str, 'StockCode': str},
    )

    # Fail early with a readable message instead of a bare pandas KeyError
    # halfway through the transformation.
    required_columns = [
        'InvoiceNo', 'StockCode', 'Description', 'Quantity',
        'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country',
    ]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(f"Input CSV is missing required column(s): {missing_columns}")

    # Basic cleaning and type conversions (unparseable dates become NaT)
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
    df = df.dropna(subset=['InvoiceDate', 'CustomerID', 'StockCode'])  # Remove rows missing critical keys

    # Remove invalid sales data
    df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)]

    # Filter sales from filter_date onward; .copy() so the derived columns
    # below are added to an independent frame rather than to a slice.
    cutoff_date = pd.to_datetime(filter_date)
    df = df[df['InvoiceDate'] >= cutoff_date].copy()

    # Derived measures / keys
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']
    df['TimeID'] = df['InvoiceDate'].dt.strftime('%Y%m%d').astype(int)

    # Create TimeDim table
    print("Creating Time Dimension...")
    # TimeID has day granularity (YYYYMMDD), so the dimension must hold exactly
    # one row per calendar day. De-duplicating on the full timestamp would
    # yield repeated TimeIDs and violate the TimeDim primary key on insert.
    unique_days = df['InvoiceDate'].dt.normalize().drop_duplicates().sort_values()
    time_dim = pd.DataFrame({
        'Date': unique_days.dt.date,
        'Year': unique_days.dt.year,
        'Quarter': unique_days.dt.quarter,
        'Month': unique_days.dt.month,
        'Day': unique_days.dt.day,
        'TimeID': unique_days.dt.strftime('%Y%m%d').astype(int),
    })

    # Create CustomerDim table
    print("Creating Customer Dimension...")
    customer_dim = df.groupby('CustomerID').agg(
        Country=('Country', 'first'),
        TotalPurchases=('TotalSales', 'sum'),
    ).reset_index()

    # Create ProductDim table
    print("Creating Product Dimension...")
    product_dim = df.groupby('StockCode').agg(
        Description=('Description', 'first'),
    ).reset_index()
    # Optionally add Category, for now set as NULL or 'Unknown'
    product_dim['Category'] = 'Unknown'

    # Create SalesFact table
    print("Creating Sales Fact Table...")
    sales_fact = df[['InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice', 'TotalSales', 'TimeID']]

    # Load to SQLite
    print(f"Loading data into SQLite DB at {db_path} ...")
    conn = sqlite3.connect(db_path)
    try:
        # `with conn` scopes a transaction (commit/rollback) only; it does not
        # close the connection, hence the surrounding try/finally.
        with conn:
            # Drop tables if exist (for repeatability)
            conn.execute("DROP TABLE IF EXISTS SalesFact")
            conn.execute("DROP TABLE IF EXISTS CustomerDim")
            conn.execute("DROP TABLE IF EXISTS ProductDim")
            conn.execute("DROP TABLE IF EXISTS TimeDim")

            # Create dimension tables
            conn.execute("""
            CREATE TABLE CustomerDim (
                CustomerID INTEGER PRIMARY KEY,
                Country TEXT,
                TotalPurchases REAL
            )
            """)
            conn.execute("""
            CREATE TABLE ProductDim (
                StockCode TEXT PRIMARY KEY,
                Description TEXT,
                Category TEXT
            )
            """)
            conn.execute("""
            CREATE TABLE TimeDim (
                TimeID INTEGER PRIMARY KEY,
                Date DATE,
                Year INTEGER,
                Quarter INTEGER,
                Month INTEGER,
                Day INTEGER
            )
            """)
            # Create fact table
            conn.execute("""
            CREATE TABLE SalesFact (
                SaleID INTEGER PRIMARY KEY AUTOINCREMENT,
                InvoiceNo TEXT,
                CustomerID INTEGER,
                StockCode TEXT,
                TimeID INTEGER,
                Quantity INTEGER,
                UnitPrice REAL,
                TotalSales REAL,
                FOREIGN KEY(CustomerID) REFERENCES CustomerDim(CustomerID),
                FOREIGN KEY(StockCode) REFERENCES ProductDim(StockCode),
                FOREIGN KEY(TimeID) REFERENCES TimeDim(TimeID)
            )
            """)

        # Insert data into tables (dimensions first, then the fact table)
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
        product_dim.to_sql('ProductDim', conn, if_exists='append', index=False)
        time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
        sales_fact.to_sql('SalesFact', conn, if_exists='append', index=False)
    finally:
        # Always release the database file, even when a statement fails.
        conn.close()

    print("ETL process completed successfully!")

# Usage example:
# etl_online_retail(
#     csv_path="path/to/Online Retail.csv",
#     db_path="path/to/retail_dw.db",
#     filter_date="2024-08-12"
# )


### Load

In [6]:
# Load
# NOTE(review): this cell reads db_path, customer_dim, time_dim and
# sales_fact_table from the notebook namespace. In the visible notebook,
# db_path is first assigned in a later cell and the three frames are only
# created inside etl_online_retail, so on a fresh kernel this cell stops with
# a NameError — confirm whether it should be removed in favour of calling
# etl_online_retail(csv_path, db_path).
print(f"Loading data into database at: {db_path}")
conn = sqlite3.connect(db_path)
try:
    # if_exists='replace' drops and recreates each table on every run.
    customer_dim.to_sql('CustomerDim', conn, if_exists='replace', index=False)
    time_dim.to_sql('TimeDim', conn, if_exists='replace', index=False)
    sales_fact_table.to_sql('SalesFact', conn, if_exists='replace', index=False)
finally:
    # Release the database file even when one of the writes fails.
    conn.close()
print("ETL process completed successfully!")



NameError: name 'db_path' is not defined

### Example usage

In [None]:


# Input CSV and output SQLite database used for this run.
# NOTE(review): both are absolute, machine-specific paths.
csv_path = (
    r"C:\Users\Admin\OneDrive - United States International University (USIU)\Documents\USIU_A\US2025\DSA2040A\Final Exam\DSA_2040_Practical_Exam_Ambachow_550\Data_Warehousing\data\online_retail.csv"
)
db_path = (
    r"C:\Users\Admin\OneDrive - United States International University (USIU)\Documents\USIU_A\US2025\DSA2040A\Final Exam\DSA_2040_Practical_Exam_Ambachow_550\Data_Warehousing\data\retail_dw.db"
)

# Run the pipeline with the default filter_date.
etl_online_retail(csv_path=csv_path, db_path=db_path)