### *Load and preprocess the dataset*

In [2]:

import pandas as pd
import sqlite3
from datetime import datetime
import logging
import os
from glob import glob
import random
import hashlib
from faker import Faker
import os

In [3]:
# Setup logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# *Data Generation: Dimensions*

#### *Generate synthetic Customer Dimension*

In [4]:
random.seed(42)
Faker.seed(42)

*Description:*

*This chunk focuses on loading the raw dataset from a CSV file into a pandas DataFrame, ensuring that special characters in the data are correctly handled by specifying the appropriate encoding. The 'InvoiceDate' column is explicitly converted into a datetime object, which enables more efficient and accurate manipulation of date and time data. To make the dataset appear current for analysis purposes, all invoice dates are shifted forward by 14 years, adjusting the original 2010-2011 timestamps to approximately 2024-2025. This simulated recency of the data can be important for testing or reporting. Finally, the new date range is printed as a sanity check to confirm that the shift was applied correctly.*


In [5]:
# Load dataset CSV into pandas DataFrame.
# Encoding ISO-8859-1 is used to handle special characters.
df = pd.read_csv('../Data/Online_Retail.csv', encoding='ISO-8859-1')

# Convert 'InvoiceDate' column to datetime type for easier date/time operations.
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])

# Shift invoice dates forward by 14 years to simulate current data (2024-2025).
df['InvoiceDate'] = df['InvoiceDate'] + pd.DateOffset(years=14)

# Verify date range after shifting.
print("Date range after shifting:", df['InvoiceDate'].min(), "to", df['InvoiceDate'].max())

Date range after shifting: 2024-12-01 08:26:00 to 2025-12-09 12:50:00


### *Create Time Dimension Table (TimeDim)*
*Description:*

*In this step, a Time Dimension table is constructed, which is fundamental in data warehousing and analytical processing for providing rich temporal context to sales data. The process starts by extracting all unique dates from the transactional data and normalizing them to remove time components, ensuring that each date appears only once. A unique `TimeID` is generated for each date using the YYYYMMDD format, facilitating efficient joins with fact tables. Additional columns are created to break down each date into components such as day, month, quarter, year, and week number — all valuable for time-based grouping and trend analysis. This denormalized structure simplifies querying and reporting over time.*


In [6]:
# 2. Create the Time Dimension (TimeDim) table
# Create empty DataFrame for Time Dimension
time_dim = pd.DataFrame()

# Extract unique dates from the 'InvoiceDate' column (date only, no time)
time_dim['FullDate'] = pd.to_datetime(df['InvoiceDate'].dt.date.unique())

# Generate a unique TimeID for each date in YYYYMMDD integer format
time_dim['TimeID'] = time_dim['FullDate'].dt.strftime('%Y%m%d').astype(int)

# Extract useful date attributes for analysis
time_dim['Day'] = time_dim['FullDate'].dt.day
time_dim['Month'] = time_dim['FullDate'].dt.month
time_dim['Quarter'] = time_dim['FullDate'].dt.quarter
time_dim['Year'] = time_dim['FullDate'].dt.year
time_dim['WeekOfYear'] = time_dim['FullDate'].dt.isocalendar().week

# Reorder columns for clarity
time_dim = time_dim[['TimeID', 'FullDate', 'Day', 'Month', 'Quarter', 'Year', 'WeekOfYear']]



### *Create Customer Dimension Table (CustomerDim)*

*Description:*

*This chunk builds the Customer Dimension table, which profiles unique customers using a combination of actual and synthetic data. The real `CustomerID` and country information are directly extracted to maintain referential integrity. Since personal identifying information like names and cities are not available or desirable to use, synthetic values are generated to enrich the dataset while preserving privacy. Customer names are created by hashing the `CustomerID` to produce consistent yet anonymous identifiers. Cities are generated using the Faker library with locale settings based on the customer’s country, adding realistic geographic diversity. Additionally, plausible gender and age values are randomly assigned within reasonable bounds to simulate demographic attributes. Finally, the earliest invoice date is used as a proxy for the customer’s registration date, providing a temporal reference for customer activity.*


In [7]:
fake = Faker()

def hash_customer_name(cust_id):
    # Generate a synthetic name by hashing the CustomerID
    return hashlib.sha256(str(cust_id).encode()).hexdigest()[:10]

def generate_city_based_on_country(country):
    # Use Faker locale based on country for city name if possible, else default locale
    # Here we simplify: if country is UK use en_GB, else en_US or default
    if country == 'United Kingdom':
        fake_local = Faker('en_GB')
    else:
        fake_local = Faker()
    return fake_local.city()

# Extract unique customers with country
customer_dim = df[['CustomerID', 'Country']].drop_duplicates().copy()

# Create synthetic CustomerName by hashing CustomerID
customer_dim['CustomerName'] = customer_dim['CustomerID'].apply(hash_customer_name)

# Generate synthetic City based on Country
customer_dim['City'] = customer_dim['Country'].apply(generate_city_based_on_country)

# Generate reasonable synthetic Gender and Age
gender_choices = ['Male', 'Female', 'Other']
customer_dim['Gender'] = [random.choice(gender_choices) for _ in range(len(customer_dim))]
customer_dim['Age'] = [random.randint(18, 75) for _ in range(len(customer_dim))]

# Set CustomerSince as earliest InvoiceDate in the dataset
customer_dim['CustomerSince'] = df['InvoiceDate'].min()

# Drop Email column (not required)
# No Email column added here

customer_dim.head()

Unnamed: 0,CustomerID,Country,CustomerName,City,Gender,Age,CustomerSince
0,17850.0,United Kingdom,54cde5dbb6,North Henrybury,Other,71,2024-12-01 08:26:00
9,13047.0,United Kingdom,86314fa849,East Timothy,Male,69,2024-12-01 08:26:00
26,12583.0,France,dcff63cd99,New Roberttown,Male,74,2024-12-01 08:26:00
46,13748.0,United Kingdom,590354e49f,East Donaldhaven,Other,49,2024-12-01 08:26:00
65,15100.0,United Kingdom,58ec7997a6,New Joeside,Female,52,2024-12-01 08:26:00


### *Create Store Dimension Table (StoreDim)*


*Description:*

*This step constructs the Store Dimension table, which represents the stores or sales locations for transactions. Since the dataset primarily references countries rather than specific store locations, each unique country is treated as a distinct store. Unique numeric `StoreID`s are assigned for efficient foreign key references. Store names are generated to include the country name for clarity and uniqueness. The sales channel is hardcoded as "Online," reflecting the dataset's nature as online retail transactions. To provide richer location information, synthetic cities are generated for each store based on the country, using locale-specific Faker instances to maintain geographic plausibility. This approach allows analysis at the store level while enhancing location details without requiring real-world addresses.*

In [8]:
# Extract unique countries as stores
store_dim = df[['Country']].drop_duplicates().reset_index(drop=True)

# Assign unique StoreID starting from 1
store_dim['StoreID'] = store_dim.index + 1

# Assign StoreName with country suffix for uniqueness
store_dim['StoreName'] = store_dim['Country'].apply(lambda x: f"Online Store - {x}")

# Assign Channel as 'Online' (dataset is online retail)
store_dim['Channel'] = 'Online'

# Generate synthetic City based on country using Faker locales
def generate_city(country):
    if country == 'United Kingdom':
        fake_local = Faker('en_GB')
    else:
        fake_local = Faker()
    return fake_local.city()

store_dim['City'] = store_dim['Country'].apply(generate_city)

store_dim.head()

Unnamed: 0,Country,StoreID,StoreName,Channel,City
0,United Kingdom,1,Online Store - United Kingdom,Online,Kimberleychester
1,France,2,Online Store - France,Online,Higginston
2,Australia,3,Online Store - Australia,Online,Deborahmouth
3,Netherlands,4,Online Store - Netherlands,Online,Berryhaven
4,Germany,5,Online Store - Germany,Online,South Lisachester


In [9]:
fake = Faker()

# 4. Create Product Dimension Table (product_dim)
# ------------------------------------------------
# Extract unique products from the original dataset: StockCode, Description, UnitPrice.
product_dim = df[['StockCode', 'Description', 'UnitPrice']].drop_duplicates().copy()

# Rename columns to fit dimensional model schema
product_dim = product_dim.rename(columns={
    'StockCode': 'ProductID',
    'Description': 'ProductName',
    'UnitPrice': 'UnitCost'
})

# Define a simple function to categorize products based on keywords in ProductName
def categorize_product(name):
    if pd.isna(name):
        return 'Miscellaneous'
    name = name.lower()
    if any(keyword in name for keyword in ['electronic', 'computer', 'usb', 'laptop', 'cable']):
        return 'Electronics'
    elif any(keyword in name for keyword in ['shirt', 'clothing', 'dress', 't-shirt', 'jeans']):
        return 'Clothing'
    elif any(keyword in name for keyword in ['book', 'novel', 'journal']):
        return 'Books'
    elif any(keyword in name for keyword in ['toy', 'game']):
        return 'Toys & Games'
    else:
        return 'Miscellaneous'

# Apply the category function to create a Category column
product_dim['Category'] = product_dim['ProductName'].apply(categorize_product)

# Generate a synthetic Brand name using Faker company names for each product
product_dim['Brand'] = [fake.company() for _ in range(len(product_dim))]

# Display sample of product_dim to verify
product_dim.head()


Unnamed: 0,ProductID,ProductName,UnitCost,Category,Brand
0,85123A,WHITE HANGING HEART T-LIGHT HOLDER,2.55,Miscellaneous,"Carpenter, Burton and Oneal"
1,71053,WHITE METAL LANTERN,3.39,Miscellaneous,Francis-Mann
2,84406B,CREAM CUPID HEARTS COAT HANGER,2.75,Miscellaneous,Lara-Baker
3,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,3.39,Miscellaneous,Diaz-Schaefer
4,84029E,RED WOOLLY HOTTIE WHITE HEART.,3.39,Miscellaneous,Flores LLC


In [10]:
# Convert 'InvoiceDate' to datetime if not already
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])

# Create a new column 'InvoiceDateOnly' normalized to midnight (date only, no time)
df['InvoiceDateOnly'] = df['InvoiceDate'].dt.normalize()

# Ensure 'FullDate' in time_dim is datetime type for correct merging
time_dim['FullDate'] = pd.to_datetime(time_dim['FullDate'])


### *Prepare FactSales Table (Fact Table)*

*Description:*

*This final chunk assembles the FactSales table, which records individual sales transactions linked to the various dimension tables through foreign keys. First, any encoding issues in column names are corrected to ensure consistency. Duplicate columns created through multiple merges are removed to avoid confusion and errors. The fact table is enriched by merging the Time Dimension to include a `TimeID` foreign key, facilitating time-based joins and analysis. Invoice dates are normalized to exclude time information, aligning with the date-only nature of the Time Dimension. The product identifier is standardized by assigning `ProductID` as the original stock code. The store foreign key (`StoreID`) is merged in based on country, linking sales to store locations. A key metric, `TotalSales`, is calculated by multiplying the quantity sold by the unit price, providing the total revenue per transaction line. Finally, only the relevant columns necessary for the fact table schema are selected to form the `fact_sales` DataFrame, ready for analytical queries or database loading.*


In [11]:
df = df.rename(columns={'ï»¿InvoiceNo': 'InvoiceNo'})
# Use the 'TimeID' and 'StoreID' columns without suffixes if present
if 'TimeID' not in df.columns:
    if 'TimeID_y' in df.columns:
        df['TimeID'] = df['TimeID_y']
    elif 'TimeID_x' in df.columns:
        df['TimeID'] = df['TimeID_x']

if 'StoreID' not in df.columns:
    if 'StoreID_y' in df.columns:
        df['StoreID'] = df['StoreID_y']
    elif 'StoreID_x' in df.columns:
        df['StoreID'] = df['StoreID_x']


In [12]:
df = df.loc[:,~df.columns.duplicated()]

In [13]:
# Merge df with time_dim to get TimeID by matching on normalized date columns
df = df.merge(
    time_dim[['TimeID', 'FullDate']],
    left_on='InvoiceDateOnly',
    right_on='FullDate',
    how='left'
)

# Convert 'InvoiceDate' column to just date (drop time component) for fact table compatibility
df['InvoiceDate'] = df['InvoiceDate'].dt.date

# Assign 'ProductID' as the same value as 'StockCode' for clarity and schema matching
df['ProductID'] = df['StockCode']

# Merge df with store_dim on 'Country' to get StoreID foreign key
df = df.merge(
    store_dim[['StoreID', 'Country']],
    on='Country',
    how='left'
)

# Calculate total sales amount per transaction line
df['TotalSales'] = df['Quantity'] * df['UnitPrice']

#  Add Discount column if missing (assumed 0)
if 'Discount' not in df.columns:
    df['Discount'] = 0

# Then select the columns including Discount
fact_sales = df[['InvoiceNo', 'InvoiceDate', 'TimeID', 'ProductID', 'CustomerID', 'StoreID',
                 'Quantity', 'UnitPrice', 'Discount', 'TotalSales']].copy()

In [14]:
# --- Step 1: Define main folder path ---
folder_path = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1 Task_2_ETL_Process_Implementation"

# --- Step 2: Define Synthetic_data subfolder ---
synthetic_folder = os.path.join(folder_path, 'Synthetic_data')
os.makedirs(synthetic_folder, exist_ok=True)

# --- Step 3: Save CSVs in Synthetic_data folder ---
time_dim.to_csv(os.path.join(synthetic_folder, 'TimeDim.csv'), index=False)
customer_dim.to_csv(os.path.join(synthetic_folder, 'CustomerDim.csv'), index=False)
store_dim.to_csv(os.path.join(synthetic_folder, 'StoreDim.csv'), index=False)
product_dim.to_csv(os.path.join(synthetic_folder, 'ProductDim.csv'), index=False)
fact_sales.to_csv(os.path.join(synthetic_folder, 'FactSales.csv'), index=False)

print(f"All CSV files saved in: {synthetic_folder}")
print("Files in folder now:", os.listdir(synthetic_folder))

All CSV files saved in: C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1 Task_2_ETL_Process_Implementation\Synthetic_data
Files in folder now: ['CustomerDim.csv', 'FactSales.csv', 'ProductDim.csv', 'StoreDim.csv', 'TimeDim.csv']


## *Extract , Transform & Load:Retail_Data*

### *Step 1: Extract*

In [15]:
# Folder where CSVs are stored
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
folder_path = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1 Task_2_ETL_Process_Implementation"
data_folder = os.path.join(folder_path, "Synthetic_data") if os.path.isdir(os.path.join(folder_path, "Synthetic_data")) else folder_path


In [16]:
# List of CSV files
tables = ["FactSales", "CustomerDim", "StoreDim", "ProductDim", "TimeDim"]

# Folder containing the synthetic CSV files
folder_path = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data"
def extract(file_path):
    """Extract data from a CSV file into a DataFrame."""
    logging.info(f"Starting data extraction for {file_path}")
    try:
        df = pd.read_csv(file_path, encoding='ISO-8859-1')
        # Only FactSales has InvoiceDate, convert if present
        if 'InvoiceDate' in df.columns:
            df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
            df = df.dropna(subset=['InvoiceDate'])
        logging.info(f"Extracted {len(df)} rows from {file_path}")
        return df
    except Exception as e:
        logging.error(f"Error during extraction: {e}")
        return pd.DataFrame()  # Return empty DataFrame on failure

# Dictionary to store all extracted DataFrames
dataframes = {}

for table in tables:
    file_path = os.path.join(folder_path, f"{table}.csv")
    df = extract(file_path)
    dataframes[table] = df

# Quick check
for name, df in dataframes.items():
    print(f"{name}: {len(df)} rows")


2025-08-13 12:37:24,531 - INFO - Starting data extraction for C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data\FactSales.csv
2025-08-13 12:37:25,485 - INFO - Extracted 541909 rows from C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data\FactSales.csv
2025-08-13 12:37:25,516 - INFO - Starting data extraction for C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data\CustomerDim.csv
2025-08-13 12:37:25,537 - INFO - Extracted 4389 rows from C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data\CustomerDim.csv
2025-08-13 12:37:25,539 - INFO - Starting data extraction for C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\synthetic_data\S

FactSales: 541909 rows
CustomerDim: 4389 rows
StoreDim: 38 rows
ProductDim: 18053 rows
TimeDim: 305 rows


# *T-Transform*
### *Transformation Strategy*

*Following the full extraction, we proceeded with a`* ***Full Transformation*** *approach. Each dataset was fully inspected and cleaned independently to ensure data quality and consistency before merging.*


In [17]:
def transform(df):
    logging.info("Starting transformation")
    # Filter out invalid sales: Quantity < 0 or UnitPrice <= 0
    df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)]

    # Calculate TotalSales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # Filter for sales in the last year (relative to 2025-08-12)
    cutoff_date = pd.Timestamp('2024-08-12')
    df = df[df['InvoiceDate'] >= cutoff_date]

    logging.info(f"Transformed data has {len(df)} rows after filtering")
    return df

In [18]:
# Transform the extracted FactSales DataFrame
fact_sales_transformed = transform(dataframes["FactSales"])

# Check result
print(fact_sales_transformed.head())
print(f"Total rows after transformation: {len(fact_sales_transformed)}")


2025-08-13 12:37:25,629 - INFO - Starting transformation
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['TotalSales'] = df['Quantity'] * df['UnitPrice']
2025-08-13 12:37:25,849 - INFO - Transformed data has 530104 rows after filtering


  InvoiceNo InvoiceDate    TimeID ProductID  CustomerID  StoreID  Quantity  \
0    536365  2024-12-01  20241201    85123A     17850.0        1         6   
1    536365  2024-12-01  20241201     71053     17850.0        1         6   
2    536365  2024-12-01  20241201    84406B     17850.0        1         8   
3    536365  2024-12-01  20241201    84029G     17850.0        1         6   
4    536365  2024-12-01  20241201    84029E     17850.0        1         6   

   UnitPrice  Discount  TotalSales  
0       2.55         0       15.30  
1       3.39         0       20.34  
2       2.75         0       22.00  
3       3.39         0       20.34  
4       3.39         0       20.34  
Total rows after transformation: 530104


# *Load to SQLite*

In [19]:
import sqlite3

def load_to_db(dataframes, db_path='retail_dw.db'):
    """
    Load DataFrames into SQLite database.

    Parameters:
        dataframes (dict): Dictionary of DataFrames to load {table_name: df}
        db_path (str): Full path to SQLite database
    """
    try:
        conn = sqlite3.connect(db_path)
        logging.info(f"Connected to database: {db_path}")

        for table_name, df in dataframes.items():
            if df.empty:
                logging.warning(f"{table_name} is empty, skipping.")
                continue

            df.to_sql(table_name, conn, if_exists='replace', index=False)
            logging.info(f"Loaded {len(df)} rows into {table_name}")

        conn.commit()
        logging.info("All tables loaded and committed successfully.")

    except Exception as e:
        logging.error(f"Error loading data into database: {e}")

    finally:
        conn.close()
        logging.info("Database connection closed.")

    return db_path

# Example usage
# Make sure to include transformed FactSales
dataframes_to_load = dataframes.copy()
dataframes_to_load['FactSales'] = fact_sales_transformed  # Replace with transformed

db_path = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\retail_dw.db"

load_to_db(dataframes_to_load, db_path=db_path)


2025-08-13 12:37:25,910 - INFO - Connected to database: C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_2_ETL_Process_Implementation\retail_dw.db


2025-08-13 12:37:29,310 - INFO - Loaded 530104 rows into FactSales
2025-08-13 12:37:29,353 - INFO - Loaded 4389 rows into CustomerDim
2025-08-13 12:37:29,383 - INFO - Loaded 38 rows into StoreDim
2025-08-13 12:37:29,475 - INFO - Loaded 18053 rows into ProductDim
2025-08-13 12:37:29,498 - INFO - Loaded 305 rows into TimeDim
2025-08-13 12:37:29,500 - INFO - All tables loaded and committed successfully.
2025-08-13 12:37:29,504 - INFO - Database connection closed.


'C:\\Users\\Snit Kahsay\\Desktop\\DSA-2040_Practical_Exam_SnitTeshome552\\Section_1\\Task_2_ETL_Process_Implementation\\retail_dw.db'

#### *Loaing using Parquet*

In [20]:
# Source folder where Task 2 Parquet files are saved
task2_folder = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1 Task_2_ETL_Process_Implementation"

# Single Loaded_data folder (from Task 1) to store all Parquet files
loaded_data_folder = r"C:\Users\Snit Kahsay\Desktop\DSA-2040_Practical_Exam_SnitTeshome552\Section_1\Task_1_Data_Warehouse_Design\Loaded_data"

# Ensure the folder exists
os.makedirs(loaded_data_folder, exist_ok=True)

# Copy Task 2 Parquet files into the single Loaded_data folder
for file in os.listdir(task2_folder):
    if file.endswith('.parquet'):
        src_path = os.path.join(task2_folder, file)
        dst_path = os.path.join(loaded_data_folder, file)
        # Avoid overwriting existing files
        if not os.path.exists(dst_path):
            shutil.copy(src_path, dst_path)
        else:
            print(f"{file} already exists in Loaded_data folder, skipping copy.")

# Load all Parquet files from the single Loaded_data folder
dataframes = {}
for file in os.listdir(loaded_data_folder):
    if file.endswith('.parquet'):
        parquet_path = os.path.join(loaded_data_folder, file)
        df_name = file.replace('.parquet', '')
        try:
            df = pd.read_parquet(parquet_path)
            dataframes[df_name] = df
            print(f"Loaded {file} successfully.")
        except Exception as e:
            print(f"Error loading {file}: {e}")
