# ETL Pipeline Documentation


## Overview

This document describes the Extract, Transform, Load (ETL) process for merging and transforming sales data from EU and US regions. The goal is to create a unified dataset with consistent formatting and structure.


### Importing the necessary libraries

In [128]:
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import os
from dotenv import load_dotenv

## Extract

### Load the Datasets paths and the db database connection 

In [129]:
# Load the .env file
load_dotenv()
DATABASE_URL = os.getenv('DATABASE_URL')
eu_sales_file_path = os.getenv('eu_sales_file_path')
us_sales_file_path = os.getenv('us_sales_file_path')

### Load the Datasets
The first step is to load the datasets from CSV files into pandas DataFrames.

In [130]:
# Load the datasets
eu_sales = pd.read_csv(eu_sales_file_path, encoding='iso-8859-1')
us_sales = pd.read_csv(us_sales_file_path, encoding='iso-8859-1')

Display the first few rows of each dataset


In [131]:
eu_sales.head()

Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,City,State/Province,Country/Region,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,1,ES-2021-1311038,2/7/2021,2/11/2021,Standard Class,AS-10045,Aaron Smayling,Corporate,Leeds,England,United Kingdom,North,OFF-ST-10000988,Office Supplies,Storage,"Fellowes Folders, Blue",79.2,3,0.0,39.6
1,2,ES-2021-1311038,2/7/2021,2/11/2021,Standard Class,AS-10045,Aaron Smayling,Corporate,Leeds,England,United Kingdom,North,TEC-AC-10004144,Technology,Accessories,"SanDisk Numeric Keypad, Bluetooth",388.92,7,0.0,0.0
2,3,ES-2021-1311038,2/7/2021,2/11/2021,Standard Class,AS-10045,Aaron Smayling,Corporate,Leeds,England,United Kingdom,North,OFF-LA-10001915,Office Supplies,Labels,"Avery Legal Exhibit Labels, 5000 Label Set",35.19,3,0.0,16.11
3,4,ES-2021-1311038,2/7/2021,2/11/2021,Standard Class,AS-10045,Aaron Smayling,Corporate,Leeds,England,United Kingdom,North,OFF-ST-10004550,Office Supplies,Storage,"Fellowes Folders, Wire Frame",50.94,2,0.0,13.2
4,5,ES-2021-1311038,2/7/2021,2/11/2021,Standard Class,AS-10045,Aaron Smayling,Corporate,Leeds,England,United Kingdom,North,TEC-AC-10004068,Technology,Accessories,"Memorex Memory Card, USB",307.44,3,0.0,73.71


In [132]:
us_sales.head()

Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country/Region,City,...,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,1,CA-2021-152156,11/8/2021,11/11/2021,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,...,42420.0,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136
1,2,CA-2021-152156,11/8/2021,11/11/2021,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,...,42420.0,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",731.94,3,0.0,219.582
2,3,CA-2021-138688,6/12/2021,6/16/2021,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,...,90036.0,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,14.62,2,0.0,6.8714
3,4,US-2020-108966,10/11/2020,10/18/2020,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,...,33311.0,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775,5,0.45,-383.031
4,5,US-2020-108966,10/11/2020,10/18/2020,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,...,33311.0,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368,2,0.2,2.5164


In [133]:
us_sales.columns

Index(['Row ID', 'Order ID', 'Order Date', 'Ship Date', 'Ship Mode',
       'Customer ID', 'Customer Name', 'Segment', 'Country/Region', 'City',
       'State', 'Postal Code', 'Region', 'Product ID', 'Category',
       'Sub-Category', 'Product Name', 'Sales', 'Quantity', 'Discount',
       'Profit'],
      dtype='object')

## Transform

### Split Columns

The order ID, customer ID, and product ID columns in both datasets have extra information. While this data could be useful in a Snowflake schema, we're going to use a star schema for our Power BI dashboard. We'll extract just the IDs from these columns and drop the extra parts, as they aren't relevant to what we're doing.

In [134]:
# Split 'Order ID' column into multiple columns and keep only the 'OrderID' part
column_to_split = 'Order ID'  
delimiter = '-'  
new_column_names = ['co_code', 'year_part', 'OrderID']  

us_sales[new_column_names] = us_sales[column_to_split].str.split(delimiter, expand=True)
eu_sales[new_column_names] = eu_sales[column_to_split].str.split(delimiter, expand=True)

In [135]:
# Split 'Customer ID' column into multiple columns and keep only the 'CustomerID' part
column_to_split = 'Customer ID' 
delimiter = '-'  
new_column_names = ['cu_int', 'CustomerID']  

us_sales[new_column_names] = us_sales[column_to_split].str.split(delimiter, expand=True)
eu_sales[new_column_names] = eu_sales[column_to_split].str.split(delimiter, expand=True)

In [136]:
# Split 'Product ID' column into multiple columns and keep only the 'ProductID' part
column_to_split = 'Product ID'  
delimiter = '-'  
new_column_names = ['category_id', 'subcategory_id', 'ProductID']  

us_sales[new_column_names] = us_sales[column_to_split].str.split(delimiter, expand=True)
eu_sales[new_column_names] = eu_sales[column_to_split].str.split(delimiter, expand=True)

### Drop Unnecessary Columns


Now to remove the unnecessary columns and the Postal Code column, as it's not needed for our analysis.

In [137]:
us_sales.drop(columns=['category_id', 'subcategory_id','cu_int','co_code','year_part','Postal Code','Order ID','Customer ID','Product ID'], inplace=True)

In [138]:
eu_sales.drop(columns=['category_id', 'subcategory_id','cu_int','co_code','year_part','Order ID','Customer ID','Product ID'], inplace=True)

### Rename Columns

Since the data is clean from the source, we don't need to clean it up. We'll rename the column names for consistency and ease of use.

In [139]:
# Rename columns to match the database schema
us_sales.rename(columns={
    'Row ID': 'transaction_id',
    'Order Date': 'order_date',
    'Ship Date': 'ship_date',
    'Ship Mode': 'ship_mode',
    'Customer Name': 'customer_name',
    'Segment': 'segment',
    'Country/Region': 'country',
    'City': 'city',
    'State': 'state',
    'Region': 'region',
    'Category': 'category',
    'Sub-Category': 'subcategory',
    'Product Name': 'product_name',
    'Sales': 'sales',
    'Quantity': 'quantity',
    'Discount': 'discount',
    'Profit': 'profit',
    'OrderID': 'order_id',
    'CustomerID': 'customer_id',
    'ProductID': 'product_id'
}, inplace=True)


In [140]:
# Rename columns to match the database schema for the eu_sales DataFrame
eu_sales.rename(columns={
    'Row ID': 'transaction_id',
    'Order Date': 'order_date',
    'Ship Date': 'ship_date',
    'Ship Mode': 'ship_mode',
    'Customer Name': 'customer_name',
    'Segment': 'segment',
    'City': 'city',
    'State/Province': 'state',
    'Country/Region': 'country',
    'Region': 'region',
    'Category': 'category',
    'Sub-Category': 'subcategory',
    'Product Name': 'product_name',
    'Sales': 'sales',
    'Quantity': 'quantity',
    'Discount': 'discount',
    'Profit': 'profit',
    'OrderID': 'order_id',
    'CustomerID': 'customer_id',
    'ProductID': 'product_id'
}, inplace=True)


### Prefix Identifiers

Prefix TransactionID, CustomerID and Region with country-specific identifiers.

In [141]:
# Prefix customer IDs with country-specific identifiers
us_sales['transaction_id'] = 'US_' + us_sales['transaction_id'].astype(str)
eu_sales['transaction_id'] = 'EU_' + eu_sales['transaction_id'].astype(str)

In [142]:
# Prefix customer IDs with country-specific identifiers
us_sales['customer_id'] = 'US_' + us_sales['customer_id'].astype(str)
eu_sales['customer_id'] = 'EU_' + eu_sales['customer_id'].astype(str)

In [143]:
# Prefix customer IDs with country-specific identifiers
us_sales['region'] = 'US_' + us_sales['region'].astype(str)
eu_sales['region'] = 'EU_' + eu_sales['region'].astype(str)

### Combine Datasets

In [144]:
eu_us_sales = pd.concat([eu_sales, us_sales], axis=0)

In [145]:
eu_us_sales['order_id'].nunique() == us_sales['order_id'].nunique() + eu_sales['order_id'].nunique()

True

### Create Dimension Tables

Now that we've combined the US and EU data, let's create the dimension tables for our dataset

1. Customer Dimension
Columns: Customer ID, Customer Name, City,State,Country,Segment
2. Product Dimension
Columns: Product ID, Product Name,Category,Subcategory
3. order date Dimension
Columns: Date,Year,Quarter,Month,MonthName,Day,DayOfWeek,DayName
4. ship date Dimension
Columns: Date,Year,Quarter,Month,MonthName,Day,DayOfWeek,DayName


In [146]:
# Create dimension tables
customer_dim = eu_us_sales[['customer_id', 'customer_name', 'segment', 'country', 'city', 'state', 'region']]
product_dim = eu_us_sales[['product_id', 'category', 'subcategory', 'product_name']]

# Drop duplicates based on the 'customer_id' column in customer_dim
customer_dim = customer_dim.drop_duplicates(subset='customer_id')

# Drop duplicates based on the 'product_id' column in product_dim
product_dim = product_dim.drop_duplicates(subset='product_id')

In [168]:
# For order_date_dim
eu_us_sales['order_date'] = pd.to_datetime(eu_us_sales['order_date'])
start_date = eu_us_sales['order_date'].min().date()
end_date = eu_us_sales['order_date'].max().date()

# Add one month to the start and end dates
start_date = start_date + pd.DateOffset(months=2)
end_date = end_date + pd.DateOffset(months=2)

order_date_dim = pd.DataFrame({
    'order_date': pd.date_range(start=start_date, end=end_date, freq='D')
})
order_date_dim['year'] = order_date_dim['order_date'].dt.year
order_date_dim['quarter'] = order_date_dim['order_date'].dt.quarter
order_date_dim['month'] = order_date_dim['order_date'].dt.month
order_date_dim['month_name'] = order_date_dim['order_date'].dt.strftime('%B')
order_date_dim['day'] = order_date_dim['order_date'].dt.day
order_date_dim['day_name'] = order_date_dim['order_date'].dt.strftime('%A')

In [169]:
# For ship_date_dim
eu_us_sales['ship_date'] = pd.to_datetime(eu_us_sales['ship_date'])
start_date = eu_us_sales['ship_date'].min().date()
end_date = eu_us_sales['ship_date'].max().date()

# Adding one month to the start and end dates
start_date = start_date + pd.DateOffset(months=2)
end_date = end_date + pd.DateOffset(months=2)

ship_date_dim = pd.DataFrame({
    'ship_date': pd.date_range(start=start_date, end=end_date, freq='D')
})
ship_date_dim['year'] = ship_date_dim['ship_date'].dt.year
ship_date_dim['quarter'] = ship_date_dim['ship_date'].dt.quarter
ship_date_dim['month'] = ship_date_dim['ship_date'].dt.month
ship_date_dim['month_name'] = ship_date_dim['ship_date'].dt.strftime('%B')
ship_date_dim['day'] = ship_date_dim['ship_date'].dt.day
ship_date_dim['day_name'] = ship_date_dim['ship_date'].dt.strftime('%A')

### Create Fact Table

Now that we've created the dimension tables, let's build the fact table. It will include the transaction ID, order ID, order date, ship date, customer ID, product ID, sales, quantity, discount, profit, and shipping method.

In [149]:
sales_fact = eu_us_sales[['transaction_id','order_id','order_date','ship_date','ship_mode','customer_id','product_id','sales', 'quantity', 'discount', 'profit']]

In [150]:
sales_fact.head()

Unnamed: 0,transaction_id,order_id,order_date,ship_date,ship_mode,customer_id,product_id,sales,quantity,discount,profit
0,EU_1,1311038,2021-02-07,2021-02-11,Standard Class,EU_10045,10000988,79.2,3,0.0,39.6
1,EU_2,1311038,2021-02-07,2021-02-11,Standard Class,EU_10045,10004144,388.92,7,0.0,0.0
2,EU_3,1311038,2021-02-07,2021-02-11,Standard Class,EU_10045,10001915,35.19,3,0.0,16.11
3,EU_4,1311038,2021-02-07,2021-02-11,Standard Class,EU_10045,10004550,50.94,2,0.0,13.2
4,EU_5,1311038,2021-02-07,2021-02-11,Standard Class,EU_10045,10004068,307.44,3,0.0,73.71


### We'll check the data types of all the columns and make any necessary adjustments.

### Display the Dimension and Fact Tables

In [151]:
customer_dim.head()

Unnamed: 0,customer_id,customer_name,segment,country,city,state,region
0,EU_10045,Aaron Smayling,Corporate,United Kingdom,Leeds,England,EU_North
7,EU_13840,Ellis Ballard,Corporate,United Kingdom,West Bromwich,England,EU_North
10,EU_17155,Logan Haushalter,Consumer,France,Le Bouscat,Aquitaine-Limousin-Poitou-Charentes,EU_Central
15,EU_16540,Kelly Collister,Consumer,France,Strasbourg,Alsace-Champagne-Ardenne-Lorraine,EU_Central
18,EU_15340,Jasper Cacioppo,Consumer,Germany,Aachen,North Rhine-Westphalia,EU_Central


In [152]:
product_dim.head()

Unnamed: 0,product_id,category,subcategory,product_name
0,10000988,Office Supplies,Storage,"Fellowes Folders, Blue"
1,10004144,Technology,Accessories,"SanDisk Numeric Keypad, Bluetooth"
2,10001915,Office Supplies,Labels,"Avery Legal Exhibit Labels, 5000 Label Set"
3,10004550,Office Supplies,Storage,"Fellowes Folders, Wire Frame"
4,10004068,Technology,Accessories,"Memorex Memory Card, USB"


In [153]:
order_date_dim.head()

Unnamed: 0,order_date,year,quarter,month,month_name,day,day_of_week,day_name
0,2018-02-01,2018,1,2,February,1,3,Thursday
1,2018-02-02,2018,1,2,February,2,4,Friday
2,2018-02-03,2018,1,2,February,3,5,Saturday
3,2018-02-04,2018,1,2,February,4,6,Sunday
4,2018-02-05,2018,1,2,February,5,0,Monday


In [154]:
ship_date_dim.head()

Unnamed: 0,ship_date,year,quarter,month,month_name,day,day_of_week,day_name
0,2018-02-05,2018,1,2,February,5,0,Monday
1,2018-02-06,2018,1,2,February,6,1,Tuesday
2,2018-02-07,2018,1,2,February,7,2,Wednesday
3,2018-02-08,2018,1,2,February,8,3,Thursday
4,2018-02-09,2018,1,2,February,9,4,Friday


While uploading the data to the database, we encountered errors. The following code addresses these issues.

In [188]:
# Step 1: Check for missing order_date values
missing_order_dates = sales_fact[~sales_fact['order_date'].isin(order_date_dim['order_date'])]['order_date'].unique()

# Step 2: Add missing dates to order_date_dim
if len(missing_order_dates) > 0:
    missing_dates_df = pd.DataFrame({'order_date': pd.to_datetime(missing_order_dates)})
    missing_dates_df['year'] = missing_dates_df['order_date'].dt.year
    missing_dates_df['quarter'] = missing_dates_df['order_date'].dt.quarter
    missing_dates_df['month'] = missing_dates_df['order_date'].dt.month
    missing_dates_df['month_name'] = missing_dates_df['order_date'].dt.strftime('%B')
    missing_dates_df['day'] = missing_dates_df['order_date'].dt.day
    missing_dates_df['day_name'] = missing_dates_df['order_date'].dt.strftime('%A')
    
    # Append missing dates to order_date_dim
    order_date_dim = pd.concat([order_date_dim, missing_dates_df], ignore_index=True)

# Ensure no duplicates in order_date_dim
order_date_dim = order_date_dim.drop_duplicates(subset='order_date')

In [189]:
# Step 1: Check for missing ship_date values
missing_ship_dates = sales_fact[~sales_fact['ship_date'].isin(ship_date_dim['ship_date'])]['ship_date'].unique()

# Step 2: Add missing dates to ship_date_dim
if len(missing_ship_dates) > 0:
    missing_dates_df = pd.DataFrame({'ship_date': pd.to_datetime(missing_ship_dates)})
    missing_dates_df['year'] = missing_dates_df['ship_date'].dt.year
    missing_dates_df['quarter'] = missing_dates_df['ship_date'].dt.quarter
    missing_dates_df['month'] = missing_dates_df['ship_date'].dt.month
    missing_dates_df['month_name'] = missing_dates_df['ship_date'].dt.strftime('%B')
    missing_dates_df['day'] = missing_dates_df['ship_date'].dt.day
    missing_dates_df['day_name'] = missing_dates_df['ship_date'].dt.strftime('%A')
    
    # Append missing dates to ship_date_dim
    ship_date_dim = pd.concat([ship_date_dim, missing_dates_df], ignore_index=True)

# Ensure no duplicates in ship_date_dim
ship_date_dim = ship_date_dim.drop_duplicates(subset='ship_date')

## LOAD

We'll load the dataset into a PostgreSQL database that we've already set up. We've also designed the schema.

In [155]:
#creating the db engine instance
engine = create_engine(DATABASE_URL)

In [182]:
try:
    order_date_dim.to_sql('order_date_dim', engine, if_exists='append', index=False)
    print(f"Upload complete. Rows attempted: {len(order_date_dim)}")
except Exception as e:
    print(f"An error occurred: {e}")

Upload complete. Rows attempted: 1866


In [183]:
try:
    ship_date_dim.to_sql('ship_date_dim', engine, if_exists='append', index=False)
    print(f"Upload complete. Rows attempted: {len(ship_date_dim)}")
except Exception as e:
    print(f"An error occurred: {e}")

Upload complete. Rows attempted: 1872


In [164]:
try:
    product_dim.to_sql('product_dim', engine, if_exists='append', index=False)
    print(f"Upload complete. Rows attempted: {len(product_dim)}")
except Exception as e:
    print(f"An error occurred: {e}")

Upload complete. Rows attempted: 2682


In [165]:
try:
    customer_dim.to_sql('customer_dim', engine, if_exists='append', index=False)
    print(f"Upload complete. Rows attempted: {len(customer_dim)}")
except Exception as e:
    print(f"An error occurred: {e}")

Upload complete. Rows attempted: 1588


In [184]:
try:
    sales_fact.to_sql('sales_fact', engine, if_exists='append', index=False)
    print(f"Upload complete. Rows attempted: {len(sales_fact)}")
except Exception as e:
    print(f"An error occurred: {e}")

Upload complete. Rows attempted: 19994
