## Step 1 — Extract

## What we do:
* Read the dataset (Online_Retail.csv) from disk into a pandas DataFrame.
* Remove rows missing essential values:
  * InvoiceNo → needed to identify transactions.
  * StockCode → needed to identify products.
  * Quantity and UnitPrice → required for sales calculations.
  * InvoiceDate → needed for time-based analysis.
  * CustomerID → needed to link each sale to a customer in CustomerDim.
* Convert InvoiceDate to a proper datetime type so we can filter and group by time later.
* Remove any rows where the date could not be parsed.
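
To illustrate the date handling described above, here is a minimal sketch on made-up rows (the sample values are illustrative and not taken from Online_Retail.csv). It shows how `errors="coerce"` turns unparseable entries into `NaT` so that a later `dropna` can remove them.

```python
import pandas as pd

# Toy data: one valid timestamp, one unparseable string, one missing value.
raw = pd.DataFrame({
    "InvoiceNo": ["536365", "536366", "536367"],
    "InvoiceDate": ["2010-12-01 08:26:00", "not a date", None],
})

# errors="coerce" replaces anything that cannot be parsed with NaT
# instead of raising an exception.
raw["InvoiceDate"] = pd.to_datetime(raw["InvoiceDate"], errors="coerce")

# Dropping NaT rows leaves only rows with a usable timestamp.
clean = raw.dropna(subset=["InvoiceDate"])
print(clean)
```

Only the first row survives, and its InvoiceDate column now has a datetime dtype that supports the `.dt` accessors used later.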

## Why we do it:
* Ensures we are working only with valid, complete data before transformations.
* Makes sure the InvoiceDate column is in a format that allows filtering and aggregations.
* Avoids issues in later steps from missing or invalid values.

In [10]:
import pandas as pd
import sqlite3

def extract(file_path):
    df = pd.read_csv(file_path, encoding="ISO-8859-1")
    df = df.dropna(subset=["InvoiceNo", "StockCode", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID"])
    df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
    df = df.dropna(subset=["InvoiceDate"])
    return df

## Step 2 — Transform

## What we do:
* Remove invalid transactions:
  * Negative or zero Quantity values.
  * Zero or negative UnitPrice values.
* Create a new column:
  * TotalSales = Quantity * UnitPrice → the key sales measure.
* Filter transactions to the last year relative to 2025-08-12 (exam requirement).
* Create dimension-like tables:
  * CustomerDim: one row per unique CustomerID, with its Country.
  * TimeDim: one row per unique InvoiceDate timestamp, with TimeID, Date, Month, Quarter, and Year for time-based OLAP.
* Prepare the fact table:
  * SalesFact: contains CustomerID, TimeID, Quantity, and TotalSales.
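
The validity and date filters listed above can be sketched as follows. This is a minimal illustration, not the notebook's own code: `filter_valid_recent` is a hypothetical helper, the sample rows are made up, and treating the window as "after 2024-08-12, up to and including 2025-08-12" is an assumption about how "last year" should be read.

```python
import pandas as pd

def filter_valid_recent(df, reference_date="2025-08-12"):
    """Hypothetical helper: keep valid rows from the year before reference_date."""
    end = pd.Timestamp(reference_date)
    start = end - pd.DateOffset(years=1)  # assumed one-year window
    mask = (
        (df["Quantity"] > 0)
        & (df["UnitPrice"] > 0)
        & (df["InvoiceDate"] > start)
        & (df["InvoiceDate"] <= end)
    )
    return df.loc[mask].copy()

# Made-up rows: one valid, one negative quantity, one zero price, one too old.
sample = pd.DataFrame({
    "Quantity": [5, -1, 3, 2],
    "UnitPrice": [2.5, 1.0, 0.0, 4.0],
    "InvoiceDate": pd.to_datetime(
        ["2025-03-01", "2025-03-02", "2025-03-03", "2023-01-01"]
    ),
})

kept = filter_valid_recent(sample)
kept["TotalSales"] = kept["Quantity"] * kept["UnitPrice"]
print(kept)
```

Before applying a date window like this to real data, it is worth checking the dataset's actual date range, since a window that does not overlap it would leave the tables empty.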

## Why we do it:
* Removes bad data so our metrics are accurate.
* Adds new calculated metrics for reporting.
* Structures the data into star schema format to make OLAP queries easier in Task 3.
* Filters for recent transactions to keep analysis relevant and within the scope.

In [16]:
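def transform(df):
    # Keep only valid sales: positive Quantity and positive UnitPrice
    df = df[(df["Quantity"] > 0) & (df["UnitPrice"] > 0)].copy()

    # Key sales measure
    df["TotalSales"] = df["Quantity"] * df["UnitPrice"]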

    # Ensure unique customers
    customer_dim = df[["CustomerID", "Country"]].drop_duplicates(subset=["CustomerID"]).reset_index(drop=True)

    # Time dimension
    time_df = df[["InvoiceDate"]].drop_duplicates().reset_index(drop=True)
    time_df["TimeID"] = time_df.index + 1
    time_df["Date"] = time_df["InvoiceDate"].dt.date
    time_df["Month"] = time_df["InvoiceDate"].dt.month
    time_df["Quarter"] = time_df["InvoiceDate"].dt.quarter
    time_df["Year"] = time_df["InvoiceDate"].dt.year
    time_dim = time_df[["TimeID", "Date", "Month", "Quarter", "Year"]]

    # Map TimeID to fact table
    df = df.merge(time_df[["InvoiceDate", "TimeID"]], on="InvoiceDate", how="left")

    # Fact table
    sales_fact = df[["CustomerID", "TimeID", "Quantity", "TotalSales"]]

    return customer_dim, time_dim, sales_fact
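
As a small illustration of the surrogate-key mapping used in `transform`, the sketch below repeats the same drop-duplicates and merge pattern on made-up rows and then checks that every fact row received a TimeID that exists in the time dimension. The toy values and the two checks are assumptions added for illustration.

```python
import pandas as pd

# Toy fact rows: the first two share a timestamp, so they should share a TimeID.
df = pd.DataFrame({
    "CustomerID": [1, 2, 1],
    "InvoiceDate": pd.to_datetime(
        ["2025-01-05 10:00", "2025-01-05 10:00", "2025-04-20 15:30"]
    ),
    "TotalSales": [10.0, 20.0, 5.0],
})

# One row per distinct timestamp, numbered from 1 as a surrogate key.
time_df = df[["InvoiceDate"]].drop_duplicates().reset_index(drop=True)
time_df["TimeID"] = time_df.index + 1
time_df["Quarter"] = time_df["InvoiceDate"].dt.quarter

# The left merge attaches the surrogate key to every fact row.
fact = df.merge(time_df[["InvoiceDate", "TimeID"]], on="InvoiceDate", how="left")

# Integrity checks: no fact row is missing a key, and every key is in the dimension.
assert fact["TimeID"].notna().all()
assert fact["TimeID"].isin(time_df["TimeID"]).all()
print(fact[["CustomerID", "TimeID", "TotalSales"]])
```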


## Step 3 — Load

## What we do:
* Connect to a SQLite database (retail_dw.db).
* Create tables:
  * CustomerDim
  * TimeDim
  * SalesFact
* Load the cleaned/transformed data into these tables.
* Enforce foreign key constraints to maintain referential integrity.
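
One detail worth knowing about the last point: SQLite declares foreign keys in the schema but only enforces them when `PRAGMA foreign_keys = ON` has been issued on the connection. The sketch below shows the effect on a cut-down version of the schema in an in-memory database; the table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Enforcement is off by default and must be enabled per connection.
conn.execute("PRAGMA foreign_keys = ON")

conn.executescript("""
CREATE TABLE CustomerDim (CustomerID INTEGER PRIMARY KEY, Country TEXT);
CREATE TABLE SalesFact (
    SalesID INTEGER PRIMARY KEY AUTOINCREMENT,
    CustomerID INTEGER,
    TotalSales REAL,
    FOREIGN KEY (CustomerID) REFERENCES CustomerDim(CustomerID)
);
INSERT INTO CustomerDim VALUES (1, 'United Kingdom');
""")

# A fact row that points at a missing customer is rejected once enforcement is on.
try:
    conn.execute(
        "INSERT INTO SalesFact (CustomerID, TotalSales) VALUES (999, 10.0)"
    )
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print("orphan row rejected:", rejected)
conn.close()
```

Without the pragma, the same insert would succeed silently and leave an orphaned fact row.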

## Why we do it:
* Moves data into a data warehouse structure for analysis.
* Allows running SQL queries efficiently in later steps (Task 3).
* Ensures we follow proper relational database design.

In [17]:
def load(customer_dim, time_dim, sales_fact, db_name="retail_dw.db"):
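    conn = sqlite3.connect(db_name)
    # SQLite only enforces foreign keys when this pragma is enabled on the connection
    conn.execute("PRAGMA foreign_keys = ON")
    cursor = conn.cursor()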

    cursor.executescript("""
    DROP TABLE IF EXISTS SalesFact;
    DROP TABLE IF EXISTS TimeDim;
    DROP TABLE IF EXISTS CustomerDim;

    CREATE TABLE CustomerDim (
        CustomerID INTEGER PRIMARY KEY,
        Country TEXT
    );
    CREATE TABLE TimeDim (
        TimeID INTEGER PRIMARY KEY,
        Date TEXT,
        Month INTEGER,
        Quarter INTEGER,
        Year INTEGER
    );
    CREATE TABLE SalesFact (
        SalesID INTEGER PRIMARY KEY AUTOINCREMENT,
        CustomerID INTEGER,
        TimeID INTEGER,
        Quantity INTEGER,
        TotalSales REAL,
        FOREIGN KEY (CustomerID) REFERENCES CustomerDim(CustomerID),
        FOREIGN KEY (TimeID) REFERENCES TimeDim(TimeID)
    );
    """)

    customer_dim.to_sql("CustomerDim", conn, if_exists="append", index=False)
    time_dim.to_sql("TimeDim", conn, if_exists="append", index=False)
    sales_fact.to_sql("SalesFact", conn, if_exists="append", index=False)

    conn.commit()
    conn.close()
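
Once the tables are loaded, a roll-up by year and quarter is a typical first query against the star schema. The sketch below runs one against tiny stand-in tables in an in-memory database, so it does not depend on retail_dw.db. The sample rows and the query itself are illustrative assumptions, not the Task 3 solution.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")

# Tiny stand-ins for the tables written by load().
pd.DataFrame({
    "TimeID": [1, 2],
    "Date": ["2025-01-05", "2025-04-20"],
    "Month": [1, 4],
    "Quarter": [1, 2],
    "Year": [2025, 2025],
}).to_sql("TimeDim", conn, index=False)

pd.DataFrame({
    "CustomerID": [1, 2, 1],
    "TimeID": [1, 1, 2],
    "Quantity": [2, 4, 1],
    "TotalSales": [10.0, 20.0, 5.0],
}).to_sql("SalesFact", conn, index=False)

# Roll up sales by year and quarter through the time dimension.
query = """
SELECT t.Year, t.Quarter, SUM(f.TotalSales) AS TotalSales
FROM SalesFact AS f
JOIN TimeDim AS t ON t.TimeID = f.TimeID
GROUP BY t.Year, t.Quarter
ORDER BY t.Year, t.Quarter;
"""
rollup = pd.read_sql_query(query, conn)
print(rollup)
conn.close()
```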

In [18]:
# ===== Run ETL =====
file_path = r"C:\Users\Salma\New folder\OneDrive\Desktop\DSA 2040_Practical_Exam\DSA-2040_Practical_Exam_Halima_315\Online_Retail.csv"
df_extracted = extract(file_path)
customer_dim, time_dim, sales_fact = transform(df_extracted)
load(customer_dim, time_dim, sales_fact)