# ETL Pipeline for Online Retail Dataset

## Overview 

This notebook demonstrates an ETL (Extract, Transform, Load) process using the Online Retail dataset.  
We use **pandas** for data handling and **SQLite** as the database.  

The steps are:
1. Extract data from CSV and sample 1000 rows.  
2. Transform data (clean, add TotalSales, filter last year, create dimensions).  
3. Load data into SQLite database with one fact table and two dimension tables.  
4. Verify contents of the database.  


In [9]:
import pandas as pd
import numpy as np
import sqlite3
from sqlalchemy import create_engine
import logging
from datetime import datetime, timedelta

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)    

## Extract Step

We extract data from the CSV file.  
Since the original dataset has over 500,000 rows, we sample **1000 rows** to make it manageable.  
We also ensure correct encoding (`ISO-8859-1`) to handle special characters.


In [10]:
def extract_data(file_path, sample_size=1000):
    """Extract data from CSV and downsample to manageable size"""
    logger.info("Starting data extraction...")
    try:
        df = pd.read_csv(file_path, encoding='ISO-8859-1', low_memory=False)
        logger.info(f"Extracted {len(df)} rows from {file_path}")
        
        # Downsample to 1000 rows
        if len(df) > sample_size:
            df = df.sample(sample_size, random_state=42)
            logger.info(f"Downsampled to {len(df)} rows")
        
        return df
    except Exception as e:
        logger.error(f"Error extracting data: {e}")
        raise

# Run extract
df_raw = extract_data("../../data/online_retail.csv")
df_raw.head()


INFO:__main__:Starting data extraction...
INFO:__main__:Extracted 541909 rows from ../../data/online_retail.csv
INFO:__main__:Downsampled to 1000 rows


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
209268,555200,71459,HANGING JAM JAR T-LIGHT HOLDER,24,01/06/2011 12:05,0.85,17315.0,United Kingdom
207108,554974,21128,GOLD FISHING GNOME,4,27/05/2011 17:14,6.95,14031.0,United Kingdom
167085,550972,21086,SET/6 RED SPOTTY PAPER CUPS,4,21/04/2011 17:05,0.65,14031.0,United Kingdom
471836,576652,22812,PACK 3 BOXES CHRISTMAS PANETTONE,3,16/11/2011 10:39,1.95,17198.0,United Kingdom
115865,546157,22180,RETROSPOT LAMP,2,10/03/2011 08:40,9.95,13502.0,United Kingdom


## Transform Step

We now clean and transform the dataset:  
- Drop rows with missing CustomerID.  
- Convert InvoiceDate to datetime.  
- Remove outliers (Quantity < 0, UnitPrice <= 0).  
- Create a new column `TotalSales = Quantity * UnitPrice`.  
- Filter transactions for the **last year** (Aug 12, 2024 to Aug 12, 2025).  
- Create two dimension tables:
  - CustomerDim (CustomerID, TotalPurchases, Country).  
  - TimeDim (unique dates with Year, Month, Day).  


In [11]:
def transform_data(df):
    """Clean, transform, and prepare data for loading"""
    logger.info("Starting data transformation...")

    # Handle missing values (drop rows missing CustomerID)
    df = df.dropna(subset=["CustomerID"])

    # Convert InvoiceDate to datetime
    df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")

    # Remove outliers: Quantity < 0 or UnitPrice <= 0
    df = df[(df["Quantity"] >= 0) & (df["UnitPrice"] > 0)]

    # Add TotalSales column
    df["TotalSales"] = df["Quantity"] * df["UnitPrice"]

    # Filter for last year (Aug 12, 2024 to Aug 12, 2025)
    # end_date = datetime(2025, 8, 12)
    start_date = datetime(2010, 12, 1)
    end_date = datetime(2011, 12, 9)
    df = df[(df["InvoiceDate"] >= start_date) & (df["InvoiceDate"] <= end_date)]

    # Customer summary
    customer_summary = df.groupby("CustomerID").agg({
        "TotalSales": "sum",
        "Country": "first"
    }).reset_index().rename(columns={"TotalSales": "TotalPurchases"})

    # Time dimension
    time_dim = pd.DataFrame({
        "InvoiceDate": df["InvoiceDate"].drop_duplicates()
    })
    time_dim["Year"] = time_dim["InvoiceDate"].dt.year
    time_dim["Month"] = time_dim["InvoiceDate"].dt.month
    time_dim["Day"] = time_dim["InvoiceDate"].dt.day

    logger.info(f"Transformed data has {len(df)} fact rows, "
                f"{len(customer_summary)} customers, "
                f"{len(time_dim)} time records")

    return df, customer_summary, time_dim

# Run transform
df_sales, df_customers, df_time = transform_data(df_raw)


INFO:__main__:Starting data transformation...
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
INFO:__main__:Transformed data has 288 fact rows, 233 customers, 274 time records


## Load Step

We now load the transformed data into an SQLite database (`retail_dw.db`).  
We create:  
- SalesFact table (transactions with TotalSales).  
- CustomerDim table (customers and total purchases).  
- TimeDim table (date breakdown).  


In [12]:
def load_data(sales, customers, time, db_name="retail_dw.db"):
    """Load transformed data into SQLite database"""
    logger.info("Starting data load...")
    engine = create_engine(f"sqlite:///{db_name}")

    try:
        sales.to_sql("SalesFact", engine, if_exists="replace", index=False)
        customers.to_sql("CustomerDim", engine, if_exists="replace", index=False)
        time.to_sql("TimeDim", engine, if_exists="replace", index=False)
        logger.info("Data loaded successfully into SQLite database.")
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

# Run load
load_data(df_sales, df_customers, df_time)


INFO:__main__:Starting data load...
INFO:__main__:Data loaded successfully into SQLite database.


## Full ETL Pipeline

We wrap all steps into a single ETL function for automation.  
This allows the entire pipeline to run with one function call.  


In [13]:
def run_etl(file_path):
    logger.info("Running full ETL pipeline...")
    df = extract_data(file_path)
    sales, customers, time = transform_data(df)
    load_data(sales, customers, time)
    logger.info("ETL pipeline complete!")

# Run full pipeline
run_etl("../../data/online_retail.csv")


INFO:__main__:Running full ETL pipeline...
INFO:__main__:Starting data extraction...
INFO:__main__:Extracted 541909 rows from ../../data/online_retail.csv
INFO:__main__:Downsampled to 1000 rows
INFO:__main__:Starting data transformation...
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
INFO:__main__:Transformed data has 288 fact rows, 233 customers, 274 time records
INFO:__main__:Starting data load...
INFO:__main__:Data loaded successfully into SQLite database.
INFO:__main__:ETL pipeline complete!


## Verify Database Contents

Finally, we check that the tables were created and populated correctly in `retail_dw.db`.


In [14]:
with sqlite3.connect("retail_dw.db") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    print("Tables in database:", cursor.fetchall())

    # Example: check first few rows of SalesFact
    df_check = pd.read_sql("SELECT * FROM SalesFact LIMIT 5;", conn)
    display(df_check)


Tables in database: [('SalesFact',), ('CustomerDim',), ('TimeDim',)]


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,TotalSales
0,555200,71459,HANGING JAM JAR T-LIGHT HOLDER,24,2011-01-06 12:05:00.000000,0.85,17315.0,United Kingdom,20.4
1,546157,22180,RETROSPOT LAMP,2,2011-10-03 08:40:00.000000,9.95,13502.0,United Kingdom,19.9
2,545721,82484,WOOD BLACK BOARD ANT WHITE FINISH,1,2011-07-03 10:52:00.000000,7.9,15039.0,United Kingdom,7.9
3,551954,22799,SWEETHEART WIRE FRUIT BOWL,1,2011-05-05 12:13:00.000000,8.5,14515.0,United Kingdom,8.5
4,546237,84508A,CAMOUFLAGE DESIGN TEDDY,6,2011-10-03 12:50:00.000000,2.55,16625.0,United Kingdom,15.3


In [15]:
import sqlite3
import pandas as pd

conn = sqlite3.connect("retail_dw.db")
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print("Tables in database:", cursor.fetchall())
conn.close()

Tables in database: [('SalesFact',), ('CustomerDim',), ('TimeDim',)]
