Importing Libraries

In [1]:
import pandas as pd
import time
from functools import wraps
from sqlalchemy import create_engine, text
import urllib
import json
import re

Logging decorator

In [2]:
def log_step(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Starting: {func.__name__}")
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"Completed: {func.__name__} in {duration:.2f} seconds\n")
        return result
    return wrapper

Loading the params file

In [3]:
@log_step
def load_json_config(json_path: str) -> dict:
    with open(json_path, 'r') as f:
        raw = json.load(f)

    config = raw[0] if isinstance(raw, list) else raw
    return config

Step 1: Extract raw data

In [4]:
@log_step
def extract_data(filepath: str) -> pd.DataFrame:
    df = pd.read_csv(filepath, encoding='ISO-8859-1')
    df.columns = [col.strip().replace(" ", "_") for col in df.columns]

    # Trim all string columns
    for col in df.select_dtypes(include='object').columns:
        df[col] = df[col].astype(str).str.strip()

    return df

Step 2: helping function to extract features for the DimTables

In [5]:
@log_step
def assign_seasonality_from_description(df: pd.DataFrame) -> pd.DataFrame:
    seasonal_keywords = [
        "christmas", "xmas", "wreath", "snow", "advent", "stocking", "santa",
        "holiday", "reindeer", "easter", "halloween", "valentine", "spring",
        "summer", "autumn", "winter"
    ]

    def is_seasonal(desc):
        if not isinstance(desc, str):
            return "N"
        desc = desc.lower()
        return "Y" if any(keyword in desc for keyword in seasonal_keywords) else "N"

    df["seasonal"] = df["Description"].apply(is_seasonal)
    return df


In [6]:
@log_step
def assign_product_category(df: pd.DataFrame, category_map: dict) -> pd.DataFrame:
    category_keywords = {
        cat: [kw.lower() for kw in keywords]
        for cat, keywords in category_map.items()
    }

    def score_category(description):
        if not isinstance(description, str):
            return "Unknown"
        description_lower = description.lower()

        scores = {}
        for category, keywords in category_keywords.items():
            count = sum(1 for kw in keywords if re.search(rf"\b{re.escape(kw)}\b", description_lower))
            if count > 0:
                scores[category] = count

        if scores:
            return max(scores, key=scores.get)
        else:
            return "Unknown"

    df['product_category'] = df['Description'].apply(score_category)
    return df

In [7]:
def get_time_of_day(hour):
        if 5 <= hour < 12:
            return "morning"
        elif 12 <= hour < 17:
            return "afternoon"
        elif 17 <= hour < 22:
            return "evening"
        else:
            return "night"

Step 3: ETL before uploading the data into MSSQL server.

In [8]:
@log_step
def transform_fact_sales(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    df = df.copy()

    # === Mappings ===
    continent_map = config["continent_map"]
    category_map = config["product_category"]

    country_to_continent = {
        country: continent
        for continent, countries in continent_map.items()
        for country in countries
    }

    # === Drop records with null on Customer ID and Country ===#
    df = df.dropna(subset=['Country', 'Customer_ID'])
    
    # === Fields for DimCustomer ===#
    df['Customer_ID'] = df['Customer_ID'].fillna('Unknown')
    df["customer_id"] = pd.to_numeric(df["Customer_ID"], errors="coerce").astype("Int64")
    df['country'] = df['Country'].fillna('Unknown')
    df['continent'] = df['Country'].map(country_to_continent).fillna('Unknown')
    # ==========================#

    # === Fields for DimDate ===#
    df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce") # -> This is useful to extract the targetted column values
    df["date_id"] = df["InvoiceDate"].dt.strftime("%Y%m%d").astype(int)
    df["year"] = df["InvoiceDate"].dt.year
    df["month"] = df["InvoiceDate"].dt.month
    df["day"] = df["InvoiceDate"].dt.day
    df["weekday"] = df["InvoiceDate"].dt.weekday + 1   # Monday = 1, Sunday = 7
    df["quarter"] = df["InvoiceDate"].dt.quarter
    # ==========================#

    # === Fields for DimDate ===#
    df["time_id"] = df["InvoiceDate"].dt.strftime("%H%M%S").astype(int) 
    df["hours"] = df["InvoiceDate"].dt.hour
    df["minutes"] = df["InvoiceDate"].dt.minute
    df["seconds"] = df["InvoiceDate"].dt.second
    df["time_of_day"] = df["hours"].apply(get_time_of_day)
    # ==========================#

    # === Fields for FactSales ===#
    df["price"] = df["Price"].astype(float)
    df["quantity"] = df["Quantity"].astype(int)
    df["total_amount"] = df["quantity"] * df["price"]

    # === Fields for DimInvoice ===#
    df["invoice_id"] = df["Invoice"].astype(str)    
    df["returned"] = df.apply(
        lambda row: "Y" if row["Quantity"] < 0 and str(row["invoice_id"]).startswith("C") else "N", axis=1)
    df["discount"] = df.apply(
        lambda row: "Y" if row["Quantity"] < 0 and row["StockCode"] == "D" else "N", axis=1)
    # =============================#
    
    df = df[df["Price"] > 0]    

    # === Fields for DimProduct ===#
    df["product_id"] = df["StockCode"].astype(str)    
    df["description"] = df["Description"].astype(str)
    df = assign_product_category(df, category_map)
    df = assign_seasonality_from_description(df)
    # =============================#

    df = df.drop(columns=["Invoice", "Customer_ID", "StockCode", "Quantity", "Country", "Price", "Description"])#, "InvoiceDate"])

    # Final filter
    df_clean = df[
        df["InvoiceDate"].notnull() &
        df["price"].notnull() &
        (df["quantity"] != 0)
    ].copy()

    return df_clean

Step 4: Set Up SQL Server Connection & Load to SQL Server

In [9]:
@log_step
def create_sql_engine_from_json(config: dict):

    conn_params = config["conn_params"]
    username = conn_params["username"]
    password = conn_params["password"]
    server = conn_params["hostname"]
    database = conn_params["database"]

    # Create connection string
    params = urllib.parse.quote_plus(
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={server};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password}"
    )

    engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")
    return engine

@log_step
def load_table(df: pd.DataFrame, engine, table_name: str, columns: list[str] = None, if_exists: str = "append"):
    upload_df = df.copy()

    if columns:
        upload_df = upload_df[columns]

    upload_df.to_sql(
        name=table_name,
        con=engine,
        if_exists=if_exists,
        index=False,
        chunksize=1000
    )

@log_step
def truncate_tables(engine, tables: list[str], verbose: bool = True):
    """
    Truncate multiple tables in SQL Server.
    """
    with engine.begin() as conn:
        for table in tables:
            if verbose:
                print(f"Deleting: {table}")
            conn.execute(text(f"DELETE FROM {table}"))
    print("All tables truncated successfully.")

In [10]:
if __name__ == "__main__":

    # LOAD CONFIG AND INITIALIZE ENGINE
    # ---------------------------------------
    config_file = load_json_config("./params.json")
    engine = create_sql_engine_from_json(config=config_file)
    
    # EXTRACT RAW DATA
    # ---------------------------------------
    df_raw = extract_data("./Invoices_Year_2009-2010.csv")
    
    # TRANSFORM FACT SALES
    # ---------------------------------------
    df_fact_sales = transform_fact_sales(df=df_raw, config=config_file)

    # TRUNCATE TABLES BEFORE LOAD PROCESS
    # ---------------------------------------
    tables_to_truncate = [
        "FactSales_Staging",
        "FactSales",
        "DimCustomer",
        "DimProduct",
        "DimTime",
        "DimDate",
        "DimInvoice"
    ]

    truncate_tables(engine, tables_to_truncate)
        
    
    # BUILD AND LOAD DIMENSION TABLES
    # ---------------------------------------    
    # Customer Dimension
    df_dim_customer = df_fact_sales[['customer_id', 'country', 'continent']].drop_duplicates(subset=['customer_id'])
    load_table(df_dim_customer, engine, "DimCustomer", columns=['customer_id', 'country', 'continent'])
    
    # Product Dimension
    df_dim_product = df_fact_sales[['product_id', 'description', 'product_category', 'seasonal']].drop_duplicates(subset=['product_id'])
    load_table(df_dim_product, engine, "DimProduct", columns=['product_id', 'description', 'product_category', 'seasonal'])
    
    # Time Dimension
    df_dim_time = df_fact_sales[['time_id', 'hours', 'minutes', 'seconds', 'time_of_day']].drop_duplicates(subset=['time_id'])
    load_table(df_dim_time, engine, "DimTime", columns=['time_id', 'hours', 'minutes', 'seconds', 'time_of_day'])
    
    # Date Dimension
    df_dim_date = df_fact_sales[['date_id', 'year', 'month', 'day', 'weekday', 'quarter']].drop_duplicates(subset=['date_id'])
    load_table(df_dim_date, engine, "DimDate", columns=['date_id', 'year', 'month', 'day', 'weekday', 'quarter'])
    
    # Invoice Dimension
    df_dim_invoice = df_fact_sales[['invoice_id', 'returned', 'discount']].drop_duplicates(subset=['invoice_id'])
    load_table(df_dim_invoice, engine, "DimInvoice", columns=['invoice_id', 'returned', 'discount'])
    
    # STAGING FACT TABLE (NATURAL KEYS)
    # ---------------------------------------
    df_staging = df_fact_sales[[
        'invoice_id', 'product_id', 'customer_id',
        'date_id', 'time_id', 'quantity', 'price', 'total_amount'
    ]].drop_duplicates(subset=['invoice_id', 'product_id', 'date_id', 'time_id'])
    
    load_table(
        df_staging,
        engine,
        table_name="FactSales_Staging",
        columns=[
            'invoice_id', 'product_id', 'customer_id',
            'date_id', 'time_id', 'quantity', 'price', 'total_amount'
        ]
    )
    
    # LOAD FACT TABLE WITH FOREIGN KEYS
    # ---------------------------------------
    insert_fact_sales_query = text("""
    INSERT INTO FactSales (
        invoice_fk, 
        product_fk, 
        customer_fk, 
        date_fk, 
        time_fk, 
        quantity, 
        price, 
        total_amount
    )
    SELECT 
        i.invoice_pk, 
        p.product_pk, 
        c.customer_pk, 
        d.date_pk, 
        t.time_pk,
        s.quantity, 
        s.price, 
        s.total_amount
    FROM FactSales_Staging s
    JOIN DimInvoice i  ON s.invoice_id = i.invoice_id
    JOIN DimProduct p  ON s.product_id = p.product_id
    JOIN DimCustomer c ON s.customer_id = c.customer_id
    JOIN DimDate d     ON s.date_id = d.date_id
    JOIN DimTime t     ON s.time_id = t.time_id;
    """)
    
    with engine.begin() as conn:
        conn.execute(insert_fact_sales_query)

Starting: load_json_config
Completed: load_json_config in 0.00 seconds

Starting: create_sql_engine_from_json
Completed: create_sql_engine_from_json in 0.07 seconds

Starting: extract_data


  df = pd.read_csv(filepath, encoding='ISO-8859-1')


Completed: extract_data in 1.48 seconds

Starting: transform_fact_sales
Starting: assign_product_category
Completed: assign_product_category in 150.70 seconds

Starting: assign_seasonality_from_description
Completed: assign_seasonality_from_description in 1.38 seconds

Completed: transform_fact_sales in 169.91 seconds

Starting: truncate_tables
Deleting: FactSales_Staging
Deleting: FactSales
Deleting: DimCustomer
Deleting: DimProduct
Deleting: DimTime
Deleting: DimDate
Deleting: DimInvoice
All tables truncated successfully.
Completed: truncate_tables in 0.28 seconds

Starting: load_table
Completed: load_table in 0.74 seconds

Starting: load_table
Completed: load_table in 0.63 seconds

Starting: load_table
Completed: load_table in 0.26 seconds

Starting: load_table
Completed: load_table in 0.11 seconds

Starting: load_table
Completed: load_table in 0.93 seconds

Starting: load_table
Completed: load_table in 231.13 seconds

