# ETL Pipeline for E-commerce Data

## Overview

This ETL (Extract, Transform, Load) process is for an **e-commerce dataset** comprising several key entities: Users, Products, Categories, Orders, Payments, Reviews, and OrderItems. The data was sourced from CSV files, cleaned, transformed, and loaded into a **MySQL relational database** to support further data analysis and reporting.

The **ETL pipeline** consists of 3 main stages:

1. **Extract:** Retrieve raw data from multiple CSV sources.
2. **Transform:** Clean, reformat, handle missing values, establish relationships via foreign keys, and ensure data consistency across tables.
3. **Load:** Insert the processed data into a MySQL database following the appropriate schema design.

📊 **Notebook Structure**

This notebook is organized into the following sections:

1. Extract
2. Transform
3. Load
4. **Challenges and Solutions:** Key issues encountered during the ETL process and how they were resolved.
5. **Takeaways:** Reflections and potential next steps for analysis or improvement.



# Extract

The datasets are imported from several CSV files.

In [2]:
!pip install mysql-connector-python


In [177]:
import pandas as pd
import mysql.connector
import csv
import os
import re
from io import StringIO
from datetime import datetime

In [None]:
# Connect to MySQL db
conn = mysql.connector.connect(
    host='localhost',
    user='root',
    password='***',
    database='ecommerce_db'
)
cursor = conn.cursor()

In [178]:
# Extract data from CSV files

# users data
users_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/users_data.csv',
    encoding='utf-8-sig',
    sep = ","
)

# users_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/users_data.csv', header=None)
# users_df = users_df[0].str.split(',', expand=True)
# users_df.columns = ['UserName', 'Email', 'Password', 'Address', 'Extra']

# product data
products_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/products_list.csv')

# category data
categories_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/categories_data.csv', quotechar='"')

# order data
orders_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/orders_data.csv')

# orderitem data
orderItems_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/orderitems_data.csv')

# payment data
payments_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/payments_data.csv')

# reviews_df = pd.read_csv(
#     '~/OneDrive/Documents/ecommerce_etl/reviews_data.csv',
#     sep=',',
#     quotechar='"',
#     encoding='utf-8'
# )

Check how the users data is parsed, because the comma in the address data appears to be treated as a delimiter.

In [179]:
file_path = os.path.expanduser('~/OneDrive/Documents/ecommerce_etl/users_data.csv')

with open(file_path, encoding='utf-8') as f:
    reader = csv.reader(f, delimiter=',', quotechar='"')
    for row in reader:
        print(repr(row))


['UserName', 'Email', 'Password', 'Address']
['Andi Pratama,andi.pratama@email.com,hashedpass001,"Jl. Merdeka No.10, Jakarta Pusat"']
['Siti Aminah,siti.aminah@email.com,hashedpass002,"Jl. Raya Bogor No.45, Depok"']
['Rizky Hidayat,rizky.hidayat@email.com,hashedpass003,"Jl. Ahmad Yani No.21, Bandung"']
['Dewi Kartika,dewi.kartika@email.com,hashedpass004,"Jl. Malioboro No.7, Yogyakarta"']
['Budi Santoso,budi.santoso@email.com,hashedpass005,"Jl. Diponegoro No.99, Surabaya"']


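As the output above shows, the header is parsed into 4 columns, but each data record comes back as a single string because the whole line is wrapped in quotes. Splitting that string on every comma would produce 5 fields instead of 4, because the comma inside the address is also treated as a delimiter.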

In [180]:
file_path = os.path.expanduser('~/OneDrive/Documents/ecommerce_etl/users_data.csv')

def parse_line(line):
    reader = csv.reader(StringIO(line), delimiter=',', quotechar='"')
    return next(reader)

df_raw = pd.read_csv(file_path, header=None, encoding='utf-8-sig', engine='python')
parsed_rows = df_raw[0].apply(parse_line)
users_df = pd.DataFrame(parsed_rows.tolist(), columns=['UserName', 'Email', 'Password', 'Address'])

users_df = users_df[users_df['UserName'] != 'UserName'].reset_index(drop=True)
print(users_df)

        UserName                    Email       Password  \
0   Andi Pratama   andi.pratama@email.com  hashedpass001   
1    Siti Aminah    siti.aminah@email.com  hashedpass002   
2  Rizky Hidayat  rizky.hidayat@email.com  hashedpass003   
3   Dewi Kartika   dewi.kartika@email.com  hashedpass004   
4   Budi Santoso   budi.santoso@email.com  hashedpass005   

                            Address  
0  Jl. Merdeka No.10, Jakarta Pusat  
1       Jl. Raya Bogor No.45, Depok  
2     Jl. Ahmad Yani No.21, Bandung  
3    Jl. Malioboro No.7, Yogyakarta  
4    Jl. Diponegoro No.99, Surabaya  


The address data in the Users dataset is now parsed correctly.
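The two-pass parsing used above can be reproduced on an in-memory sample. This is a minimal sketch, assuming the raw file wraps each data line in an outer pair of quotes (which is what the printed rows suggest); the `raw` string and the `parse_twice` helper are hypothetical and only illustrate the idea.

```python
import csv
from io import StringIO

# Hypothetical file content: the header is plain CSV, but the data line is
# wrapped in outer quotes, with the inner quotes doubled ("").
raw = (
    'UserName,Email,Password,Address\n'
    '"Andi Pratama,andi.pratama@email.com,hashedpass001,'
    '""Jl. Merdeka No.10, Jakarta Pusat"""\n'
)

def parse_twice(text):
    """Parse CSV text; re-parse any row that came back as a single field."""
    rows = []
    for fields in csv.reader(StringIO(text)):
        if len(fields) == 1:
            # The whole line was one quoted field: parse its content again.
            fields = next(csv.reader(StringIO(fields[0])))
        rows.append(fields)
    return rows

rows = parse_twice(raw)
for row in rows:
    print(row)
```

The second pass sees the inner quotes around the address, so the comma inside it is kept as part of the field rather than treated as a delimiter.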

The same problem occurs in the **Reviews and Categories DataFrames** because the ReviewText and Description columns contain commas. As a result, all data records appear in the first column, while the remaining columns contain **NaN values**.

**Below is the solution** I used to handle this issue.

In [181]:
file_path = os.path.expanduser('~/OneDrive/Documents/ecommerce_etl/reviews_data.csv')

with open(file_path, encoding='utf-8') as f:
    content = f.read()

# Regex to split the file content into one record per row
records = re.findall(r'(\d+,\d+,\d+,".*?")(?=\s*\d+,\d+,\d+,"|$)', content, re.DOTALL)

header = ['ProductID', 'UserID', 'Rating', 'ReviewText']
parsed_rows = []

for record in records:
    reader = csv.reader(StringIO(record), delimiter=',', quotechar='"')
    parsed = next(reader)
    parsed = parsed[:4]
    parsed_rows.append(parsed)

reviews_df = pd.DataFrame(parsed_rows, columns=header)
print(reviews_df)


  ProductID UserID Rating                                         ReviewText
0         1      2      5  Laptop ini sangat cepat dan cocok untuk kerja ...
1         2      1      4                           Headphone nyaman dipakai
2         3      5      5  Smartwatch-nya membantu banget buat lacak olah...
3         4      3      4                                        Jaket keren
4         5      4      3                                       Kaos lumayan


In [182]:
raw_df = pd.read_csv('~/OneDrive/Documents/ecommerce_etl/categories_data.csv', header=None)

# Split column by first comma
categories_df = raw_df[0].str.split(',', n=1, expand=True)

# Rename column
categories_df.columns = ['CategoryName', 'Description']

# Remove " from description data
categories_df['Description'] = categories_df['Description'].str.strip('"')

# Drop header
categories_df = categories_df[categories_df['CategoryName'] != 'CategoryName'].reset_index(drop=True)

categories_df.head()


|   | CategoryName | Description |
|---|---|---|
| 0 | Electronics | Produk elektronik seperti HP, laptop, smartwat... |
| 1 | Fashion | Pakaian, sepatu, tas, dan aksesoris untuk pria... |
| 2 | Home Appliances | Peralatan rumah tangga seperti blender, vacuum... |
| 3 | Books | Buku pendidikan, pengembangan diri, teknologi,... |


# Transform

Key transformation steps performed:

1. **Data Cleaning**
    - Removed NaN values and unnecessary headers (e.g., duplicate headers in CSV files).
    - Dropped rows with incomplete references, such as missing price in OrderItems.

2. **Data Type Conversion**
    - Converted fields like Price, StockQuantity, Rating, and foreign keys (UserID, ProductID) to appropriate numeric types.

3. **Primary Key Generation**
    - Assigned unique identifiers (Primary Keys) to each entity like UserID, ProductID, CategoryID, etc.

4. **Mapping & Normalization**
    - Established correct foreign key relationships between tables:
        - Mapped ProductID to OrderItems and Reviews.
        - Mapped UserID to Orders and Reviews.
        - Mapped OrderID to Payments and OrderItems.

5. **Data Enrichment**
    - Extracted City from the Address field in the Users dataset.
    - Added DateRegistered for Users and DateAdded for Products using the current date.

6. **Calculating Aggregates**
    - Calculated TotalAmount for each order by summing up the total price of its OrderItems.
    - Assigned Amount in the Payments dataset based on the corresponding Order's TotalAmount.

7. **Standardizing Date and Rating Formats**
    - Standardized date fields to consistent formats (YYYY-MM-DD).
    - Ensured ratings are integers within a valid range (1 to 5).
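The date and rating standardization listed above could look roughly like the sketch below. It is an illustration on made-up sample data, not the notebook's actual code: the `sample` frame and the `standardize` helper are hypothetical, and the column names `OrderDate` and `Rating` are borrowed from the datasets described here.

```python
import pandas as pd

# Hypothetical sample with one invalid rating and one unparseable date.
sample = pd.DataFrame({
    "OrderDate": ["2025-07-01 10:15:00", "2025-07-02 08:00:00",
                  "not a date", "2025-07-04 23:59:59"],
    "Rating": ["5", "7", "abc", "3"],
})

def standardize(df):
    """Return rows with YYYY-MM-DD dates and integer ratings in 1..5."""
    out = df.copy()
    # Unparseable dates become NaT, which strftime turns into NaN.
    out["OrderDate"] = pd.to_datetime(
        out["OrderDate"], errors="coerce"
    ).dt.strftime("%Y-%m-%d")
    # Non-numeric ratings become NaN and fail the range check below.
    out["Rating"] = pd.to_numeric(out["Rating"], errors="coerce")
    mask = out["OrderDate"].notna() & out["Rating"].between(1, 5)
    out = out[mask].copy()
    out["Rating"] = out["Rating"].astype(int)
    return out.reset_index(drop=True)

cleaned = standardize(sample)
print(cleaned)
```

Using `errors="coerce"` keeps bad values from raising, so they can be filtered out in one place instead of failing the whole cell.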

In [183]:
# list dataframe to be analyzed
dataframes = {
    "Users": users_df,
    "Products": products_df,
    "Categories": categories_df,
    "Orders": orders_df,
    "OrderItems": orderItems_df,
    "Payments": payments_df,
    "Reviews": reviews_df
}

# Loop EDA per dataset
for name, df in dataframes.items():
    print(f"\n==================== {name} Dataset ====================")
    print("Shape:", df.shape)
    print("\n-- Sample Data:")
    print(df.head())

    print("\n-- Missing Values:")
    print(df.isnull().sum())

    print("\n-- Data Types:")
    print(df.dtypes)

    print("\n-- Descriptive Statistics (Numeric):")
    print(df.describe())


Shape: (5, 4)

-- Sample Data:
        UserName                    Email       Password  \
0   Andi Pratama   andi.pratama@email.com  hashedpass001   
1    Siti Aminah    siti.aminah@email.com  hashedpass002   
2  Rizky Hidayat  rizky.hidayat@email.com  hashedpass003   
3   Dewi Kartika   dewi.kartika@email.com  hashedpass004   
4   Budi Santoso   budi.santoso@email.com  hashedpass005   

                            Address  
0  Jl. Merdeka No.10, Jakarta Pusat  
1       Jl. Raya Bogor No.45, Depok  
2     Jl. Ahmad Yani No.21, Bandung  
3    Jl. Malioboro No.7, Yogyakarta  
4    Jl. Diponegoro No.99, Surabaya  

-- Missing Values:
UserName    0
Email       0
Password    0
Address     0
dtype: int64

-- Data Types:
UserName    object
Email       object
Password    object
Address     object
dtype: object

-- Descriptive Statistics (Numeric):
            UserName                   Email       Password  \
count              5                       5              5   
unique             5

In [184]:
# Function to generate current timestamp string
def current_timestamp():
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# ================== Generate Primary Keys ==================
users_df['UserID'] = range(1, len(users_df) + 1)
products_df['ProductID'] = range(1, len(products_df) + 1)
categories_df['CategoryID'] = range(1, len(categories_df) + 1)
orders_df['OrderID'] = range(1, len(orders_df) + 1)
orderItems_df['OrderItemID'] = range(1, len(orderItems_df) + 1)
payments_df['PaymentID'] = range(1, len(payments_df) + 1)
reviews_df['ReviewID'] = range(1, len(reviews_df) + 1)

# ================== USERS ==================
# Extract City from the 'Address' column
users_df['City'] = users_df['Address'].str.extract(r',\s*(\w+(?:\s\w+)*)$')

# Add 'DateRegistered' as today (date only, no time)
users_df['DateRegistered'] = pd.to_datetime('today').normalize()


# ================== PRODUCTS ==================
# Add today's date as 'DateAdded'
products_df['DateAdded'] = pd.to_datetime('today').normalize()


# ================== ORDERS ==================
# Merge OrderItems with Products to get 'Price' per ProductID
orderitems_with_price = orderItems_df.merge(products_df[['ProductID', 'Price']], on='ProductID', how='left')

# Remove rows where price is missing (to avoid NaN in calculations)
orderitems_with_price = orderitems_with_price.dropna(subset=['Price'])

# Calculate 'TotalPrice' per item (Quantity * Price)
orderitems_with_price['TotalPrice'] = orderitems_with_price['Quantity'] * orderitems_with_price['Price']

# Aggregate total order amount per OrderID
orders_total = orderitems_with_price.groupby('OrderID')['TotalPrice'].sum().reset_index().rename(columns={'TotalPrice': 'TotalAmount'})

# Merge calculated TotalAmount into orders
orders_df = orders_df.merge(orders_total, on='OrderID', how='left')
orders_df = orders_df.dropna()


# ================== ORDER ITEMS ==================
# Merge to get 'Price' from products for each OrderItem
orderItems_df = orderItems_df.merge(products_df[['ProductID', 'Price']], on='ProductID', how='left')

orderItems_df = orderItems_df.dropna()

# ================== REVIEWS ==================
# Ensure data types for foreign keys and rating
reviews_df['ProductID'] = reviews_df['ProductID'].astype(int)
reviews_df['UserID'] = reviews_df['UserID'].astype(int)
reviews_df['Rating'] = reviews_df['Rating'].astype(int)
reviews_df = reviews_df[(reviews_df['Rating'] >= 1) & (reviews_df['Rating'] <= 5)]


# ================== PAYMENTS ==================
# Merge to get the TotalAmount (renamed as Amount) from Orders
payments_df = payments_df.merge(orders_df[['OrderID', 'TotalAmount']], on='OrderID', how='left')
payments_df.rename(columns={'TotalAmount': 'Amount'}, inplace=True)
payments_df = payments_df.dropna()
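To make the TotalAmount calculation above concrete, here is a small worked example on made-up data. It is a sketch, not the notebook's data: the `products` and `order_items` frames are hypothetical, and the `indicator=True` merge is an optional addition that shows which order items had no matching product before they are dropped.

```python
import pandas as pd

# Hypothetical sample data.
products = pd.DataFrame({"ProductID": [1, 2], "Price": [100.0, 25.0]})
order_items = pd.DataFrame({
    "OrderID":   [1, 1, 2, 2],
    "ProductID": [1, 2, 2, 3],   # ProductID 3 does not exist in products
    "Quantity":  [1, 2, 4, 1],
})

# Left merge keeps every order item; _merge tells us which ones found a price.
merged = order_items.merge(products, on="ProductID", how="left", indicator=True)
unmatched = merged[merged["_merge"] == "left_only"]
priced = merged[merged["_merge"] == "both"].copy()

# Line total per item, then one TotalAmount per order.
priced["TotalPrice"] = priced["Quantity"] * priced["Price"]
totals = (
    priced.groupby("OrderID", as_index=False)["TotalPrice"]
    .sum()
    .rename(columns={"TotalPrice": "TotalAmount"})
)

print(totals)                                # order 1: 1*100 + 2*25, order 2: 4*25
print(unmatched[["OrderID", "ProductID"]])   # the item that was left out
```

Inspecting `unmatched` before dropping rows makes it visible when an order total is lower than expected because one of its items had no price.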

In [185]:
# Drop the generated primary key columns in each dataframe;
# MySQL assigns the real keys via AUTO_INCREMENT during loading
users_df = users_df.drop(columns=['UserID'])
products_df = products_df.drop(columns=['ProductID'])
categories_df = categories_df.drop(columns=['CategoryID'])
orders_df = orders_df.drop(columns=['OrderID'])
orderItems_df = orderItems_df.drop(columns=['OrderItemID'])
payments_df = payments_df.drop(columns=['PaymentID'])
reviews_df = reviews_df.drop(columns=['ReviewID'])

### Final Check after Transformation

In [186]:
# list dataframe to be analyzed
dataframes = {
    "Users": users_df,
    "Products": products_df,
    "Categories": categories_df,
    "Orders": orders_df,
    "OrderItems": orderItems_df,
    "Payments": payments_df,
    "Reviews": reviews_df
}

# Loop EDA per dataset
for name, df in dataframes.items():
    print(f"\n==================== {name} Dataset ====================")
    print("Shape:", df.shape)
    print("\n-- Sample Data:")
    print(df.head())

    print("\n-- Missing Values:")
    print(df.isnull().sum())


Shape: (5, 6)

-- Sample Data:
        UserName                    Email       Password  \
0   Andi Pratama   andi.pratama@email.com  hashedpass001   
1    Siti Aminah    siti.aminah@email.com  hashedpass002   
2  Rizky Hidayat  rizky.hidayat@email.com  hashedpass003   
3   Dewi Kartika   dewi.kartika@email.com  hashedpass004   
4   Budi Santoso   budi.santoso@email.com  hashedpass005   

                            Address           City DateRegistered  
0  Jl. Merdeka No.10, Jakarta Pusat  Jakarta Pusat     2025-07-22  
1       Jl. Raya Bogor No.45, Depok          Depok     2025-07-22  
2     Jl. Ahmad Yani No.21, Bandung        Bandung     2025-07-22  
3    Jl. Malioboro No.7, Yogyakarta     Yogyakarta     2025-07-22  
4    Jl. Diponegoro No.99, Surabaya       Surabaya     2025-07-22  

-- Missing Values:
UserName          0
Email             0
Password          0
Address           0
City              0
DateRegistered    0
dtype: int64

Shape: (10, 6)

-- Sample Data:
           Pr

# Load

Load the transformed data into the MySQL database.

In [187]:
tables_to_clear = ['OrderItems', 'Payments', 'Reviews', 'Orders', 'Products', 'Users', 'Categories']

cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
for table in tables_to_clear:
    cursor.execute(f"DELETE FROM {table}")
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
conn.commit()
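The cell above clears the tables but leaves each AUTO_INCREMENT counter where it was. A minimal sketch of building the clear-and-reset statements in one place is shown below, using the `ALTER TABLE ... AUTO_INCREMENT = 1` statement mentioned in the Challenges section. `ALLOWED_TABLES` and `build_reset_statements` are hypothetical helpers, not part of the original notebook; the function only builds SQL strings, so it runs without a database.

```python
# Table names taken from this notebook; used as a whitelist because table
# names cannot be passed as query parameters.
ALLOWED_TABLES = (
    "OrderItems", "Payments", "Reviews", "Orders",
    "Products", "Users", "Categories",
)

def build_reset_statements(tables):
    """Return SQL statements that empty each table and reset its counter."""
    statements = ["SET FOREIGN_KEY_CHECKS = 0"]
    for table in tables:
        if table not in ALLOWED_TABLES:
            raise ValueError(f"Unexpected table name: {table!r}")
        statements.append(f"DELETE FROM {table}")
        statements.append(f"ALTER TABLE {table} AUTO_INCREMENT = 1")
    statements.append("SET FOREIGN_KEY_CHECKS = 1")
    return statements

statements = build_reset_statements(ALLOWED_TABLES)
for sql in statements:
    print(sql)

# With an open connection, the statements would be run like this:
# for sql in statements:
#     cursor.execute(sql)
# conn.commit()
```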

In [188]:
### ========== Load Categories ==========
for index, row in categories_df.iterrows():
    cursor.execute("""
        INSERT INTO Categories (CategoryName, Description)
        VALUES (%s, %s)
    """, (row['CategoryName'], row['Description']))
conn.commit()

### ========== Load Products ==========
# Map the 1-based ProductID generated in the Transform step (index + 1)
# to the AUTO_INCREMENT ID that MySQL assigns on insert.
product_id_mapping = {}
for index, row in products_df.iterrows():
    cursor.execute("""
        INSERT INTO Products (ProductName, Description, Price, StockQuantity, Category)
        VALUES (%s, %s, %s, %s, %s)
    """, (row['ProductName'], row['Description'], row['Price'], row['StockQuantity'], row['Category']))
    product_id_mapping[index + 1] = cursor.lastrowid
conn.commit()

### ========== Load Users ==========
# Same mapping for users: generated UserID (index + 1) -> database UserID
user_id_mapping = {}
for index, row in users_df.iterrows():
    cursor.execute("""
        INSERT INTO Users (UserName, Email, Address, Password)
        VALUES (%s, %s, %s, %s)
    """, (row['UserName'], row['Email'], row['Address'], row['Password']))
    user_id_mapping[index + 1] = cursor.lastrowid
conn.commit()

### ========== Load Orders ==========
# Translate the source UserID to the database UserID before inserting,
# and record generated OrderID (index + 1) -> database OrderID
order_id_mapping = {}
for index, row in orders_df.iterrows():
    new_user_id = user_id_mapping.get(row['UserID'])
    cursor.execute("""
        INSERT INTO Orders (UserID, OrderDate, TotalAmount)
        VALUES (%s, %s, %s)
    """, (new_user_id, row['OrderDate'], row['TotalAmount']))
    order_id_mapping[index + 1] = cursor.lastrowid
conn.commit()

### ========== Load OrderItems ==========
for index, row in orderItems_df.iterrows():
    new_order_id = order_id_mapping.get(row['OrderID'])
    new_product_id = product_id_mapping.get(row['ProductID'])
    cursor.execute("""
        INSERT INTO OrderItems (OrderID, ProductID, Quantity, Price)
        VALUES (%s, %s, %s, %s)
    """, (new_order_id, new_product_id, row['Quantity'], row['Price']))
conn.commit()

### ========== Load Payments ==========
for index, row in payments_df.iterrows():
    new_order_id = order_id_mapping.get(row['OrderID'])
    cursor.execute("""
        INSERT INTO Payments (OrderID, PaymentMethod, PaymentDate, Amount)
        VALUES (%s, %s, %s, %s)
    """, (new_order_id, row['PaymentMethod'], row['PaymentDate'], row['Amount']))
conn.commit()

### ========== Load Reviews ==========
for index, row in reviews_df.iterrows():
    new_product_id = product_id_mapping.get(row['ProductID'])
    new_user_id = user_id_mapping.get(row['UserID'])
    if pd.notnull(new_product_id) and pd.notnull(new_user_id):
        cursor.execute("""
            INSERT INTO Reviews (ProductID, UserID, Rating, ReviewText)
            VALUES (%s, %s, %s, %s)
        """, (new_product_id, new_user_id, row['Rating'], row['ReviewText']))
conn.commit()
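The ID mapping used in the load cell can be tried out without a MySQL server. The sketch below uses Python's built-in `sqlite3` as a stand-in (an assumption made only so the example is self-contained; the notebook itself uses `mysql.connector`), with two hypothetical tables and made-up rows. It shows why source IDs have to be translated: the database may assign different IDs than the ones in the source data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE Users (UserID INTEGER PRIMARY KEY AUTOINCREMENT, UserName TEXT)"
)
cur.execute(
    "CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY AUTOINCREMENT, "
    "UserID INTEGER REFERENCES Users(UserID), TotalAmount REAL)"
)

# Simulate a table that was used before: the counter has already advanced,
# so the next inserted user will not get ID 1.
cur.execute("INSERT INTO Users (UserName) VALUES ('placeholder')")
cur.execute("DELETE FROM Users")

source_users = [(1, "Andi Pratama"), (2, "Siti Aminah")]   # (source UserID, name)
source_orders = [(2, 150.0), (1, 100.0)]                   # (source UserID, total)

# Insert parents first and remember source ID -> database ID.
user_id_mapping = {}
for source_id, name in source_users:
    cur.execute("INSERT INTO Users (UserName) VALUES (?)", (name,))
    user_id_mapping[source_id] = cur.lastrowid

# Insert children using the translated foreign key.
for source_user_id, total in source_orders:
    cur.execute(
        "INSERT INTO Orders (UserID, TotalAmount) VALUES (?, ?)",
        (user_id_mapping[source_user_id], total),
    )
conn.commit()

rows = cur.execute(
    "SELECT u.UserName, o.TotalAmount FROM Orders o "
    "JOIN Users u ON u.UserID = o.UserID ORDER BY o.OrderID"
).fetchall()
print(user_id_mapping)
print(rows)
conn.close()
```

Keying the mapping by the source ID (rather than by row position) is the design choice that keeps each order attached to the right user even when the database IDs are offset.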


# Challenges and Solutions

During the ETL process, several challenges emerged:

1. **Handling Embedded Commas in CSV Files**

   Addresses, review texts, and category descriptions contained commas, which caused incorrect column splits when reading the CSV files.

   ✅ Solution: Tried the quotechar parameter in pd.read_csv() first, then fell back to manual parsing where that was not enough: csv.reader per line for Users, a regex for Reviews, and splitting on the first comma for Categories.

2. **Foreign Key Constraints on Loading**

   Attempting to insert data into child tables (like Products or Orders) before parent tables (like Categories or Users) caused foreign key constraint errors.

   ✅ Solution: Ordered the loading sequence to respect dependencies: Categories first, then Products, Users, Orders, OrderItems, Payments, and Reviews.

3. **Resetting Auto-Increment IDs**

   After deleting data, MySQL did not automatically reset AUTO_INCREMENT counters, so new IDs continued from the old counter instead of restarting at 1.

   ✅ Solution: Applied `ALTER TABLE <table_name> AUTO_INCREMENT = 1` to reset ID counters after clearing data.

4. **Rating and Numeric Data Issues**

   Some rating values were improperly typed or out of the expected 1-5 range.

   ✅ Solution: Cast ratings to integers and filtered out invalid ratings.
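The loading order described in the foreign key challenge does not have to be worked out by hand. As a sketch, it can be derived from the table dependencies with the standard library's `graphlib` (Python 3.9+). The `dependencies` dictionary below is an assumption inferred from the foreign keys described in this notebook, not read from the actual schema.

```python
from graphlib import TopologicalSorter

# Each table maps to the parent tables it references (assumed relationships).
dependencies = {
    "Categories": set(),
    "Users": set(),
    "Products": {"Categories"},
    "Orders": {"Users"},
    "OrderItems": {"Orders", "Products"},
    "Payments": {"Orders"},
    "Reviews": {"Products", "Users"},
}

# static_order() yields every table after all of its parents.
load_order = list(TopologicalSorter(dependencies).static_order())
print(load_order)
```

Several orders are valid (for example, Users may come before or after Categories), so the exact sequence can differ from the one used in the load cell while still respecting every dependency.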

# Takeaways

1. **ETL Pipelines Require Careful Planning of Dependencies**

The strict order in which data must be loaded due to foreign key relationships is crucial. Mapping old IDs to new ones helps maintain referential integrity.

2. **Data Quality Upfront Minimizes Downstream Issues**

The initial data had formatting inconsistencies that, if not handled early, would have propagated errors throughout the pipeline.

3. **Automation and Reusability**

Encapsulating transformation steps in reusable functions would streamline future iterations on similar datasets.
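As one possible direction for the reusability point above, two of the transformation steps from this notebook could be wrapped in small functions and chained with `DataFrame.pipe`. This is only a sketch: `add_surrogate_key` and `extract_city` are hypothetical names, and the sample addresses are copied from the Users output shown earlier.

```python
import pandas as pd

def add_surrogate_key(df, key_name):
    """Return a copy of df with a 1-based integer key column."""
    out = df.copy()
    out[key_name] = range(1, len(out) + 1)
    return out

def extract_city(df, address_col="Address"):
    """Return a copy of df with a City column taken from the text after the last comma."""
    out = df.copy()
    out["City"] = out[address_col].str.extract(
        r",\s*(\w+(?:\s\w+)*)$", expand=False
    )
    return out

users = pd.DataFrame({
    "Address": [
        "Jl. Merdeka No.10, Jakarta Pusat",
        "Jl. Raya Bogor No.45, Depok",
    ]
})

# Each step returns a new frame, so the steps can be chained and reused.
users = users.pipe(add_surrogate_key, "UserID").pipe(extract_city)
print(users)
```

Because each function copies its input instead of modifying it in place, rerunning a cell does not change the result, which is helpful in a notebook.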