# MySQL ETL project

> This project focuses on creating an end-to-end ETL pipeline, starting with dataset acquisition, data modeling, and storage in relational databases and object storage. We develop a dimensional model, automate ETL jobs, set up a data warehouse, and conduct advanced data analysis. The goal is to enable data-driven decision-making through a structured and comprehensive ETL process.

> This dataset amazon.csv is having the data of 1K+ Amazon Product's Ratings and Reviews as per their details listed on the official website of Amazon




## Step 1: Create Data Model
> Creating a data model entails defining tables, relationships, and constraints. Normalize data, design an ERD, document thoroughly, test, secure, and maintain through backups and version control.

> See amazon_sales_DB_model for more details

## Step 2: Data ETL
>  Ensure data quality by cleaning, addressing missing values, and outlier handling. De-normalize for efficiency, validate data types, and perform quality checks for reliable analysis.


In [1]:
# import required libraries
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

In [2]:
# read the file using pandas
df = pd.read_csv('amazon.csv', encoding='utf-8')

# Print the header
#print(df.columns.tolist())
#print(df.head(5))

# add product_ind INT to be primary key for efficient
df = df.drop_duplicates(subset=['product_id'])
df.reset_index(drop=True, inplace=True)
df.loc[:,'product_ind'] = df.index + 1
df = df[['product_ind'] + [col for col in df.columns if col != 'product_ind']]

print(df.columns.tolist())
print(df.shape)

['product_ind', 'product_id', 'product_name', 'category', 'discounted_price', 'actual_price', 'discount_percentage', 'rating', 'rating_count', 'about_product', 'user_id', 'user_name', 'review_id', 'review_title', 'review_content', 'img_link', 'product_link']
(1351, 17)


In [3]:
'''
table 1 : product table, primary key = product_ind

1. add product_ind as int for efficient
2. change to proper data types

'''
product = df[['product_ind','product_id', 'product_name', 'discounted_price', 'actual_price', 'discount_percentage','about_product', 'img_link', 'product_link']]
product = product.drop_duplicates(subset=['product_ind'])
product.reset_index(drop=True, inplace=True)

product['product_id'] = product['product_id'].astype(str)
product['discounted_price'] = product['discounted_price'].str.replace('[^0-9]', '', regex=True)
product['discounted_price'] = product['discounted_price'].astype(int)
product['actual_price'] = product['actual_price'].str.replace('[^0-9]', '', regex=True)
product['actual_price'] = product['actual_price'].str.replace(',', '')
product['actual_price'] = product['actual_price'].astype(int)
product['discount_percentage'] = product['discount_percentage'].str.rstrip('%').astype(float) / 100

print(product)

      product_ind  product_id  \
0               1  B07JW9H4J1   
1               2  B098NS6PVG   
2               3  B096MSW6CT   
3               4  B08HDJ86NZ   
4               5  B08CF3B7N1   
...           ...         ...   
1346         1347  B08L7J3T31   
1347         1348  B01M6453MB   
1348         1349  B009P2LIL4   
1349         1350  B00J5DYCCA   
1350         1351  B01486F4G6   

                                           product_name  discounted_price  \
0     Wayona Nylon Braided USB to Lightning Fast Cha...               399   
1     Ambrane Unbreakable 60W / 3A Fast Charging 1.5...               199   
2     Sounce Fast Phone Charging Cable & Data Sync U...               199   
3     boAt Deuce USB 300 2 in 1 Type-C & Micro USB S...               329   
4     Portronics Konnect L 1.2M Fast Charging 3A 8 P...               154   
...                                                 ...               ...   
1346  Noir Aqua - 5pcs PP Spun Filter + 1 Spanner | ...         

In [4]:
'''
table 2 : rating table, primary key = rating_id, foreign key = product_ind

1. add rating_id to be primary key
2. change to proper data types
'''
rating = df[['rating', 'rating_count', 'product_ind']]
rating.loc[:,'rating_id'] = rating.index + 1
rating = rating[['rating_id'] + [col for col in rating.columns if col != 'rating_id']]
rating = rating.drop_duplicates(subset=['product_ind'])
rating.reset_index(drop=True, inplace=True)

rating['rating'] = rating['rating'].str.replace('|', '')
rating['rating'] = rating['rating'].apply(lambda x: float(x) if x else 0.0)
rating['rating'] = rating['rating'].astype(float)
rating['rating_count'] = rating['rating_count'].replace([np.inf, -np.inf, np.nan], 0)
rating['rating_count'] = rating['rating_count'].astype(str)
rating['rating_count'] = rating['rating_count'].str.replace(',', '')
rating['rating_count'] = rating['rating_count'].astype(int)

print(rating)

      rating_id  rating  rating_count  product_ind
0             1     4.2         24269            1
1             2     4.0         43994            2
2             3     3.9          7928            3
3             4     4.2         94363            4
4             5     4.2         16905            5
...         ...     ...           ...          ...
1346       1347     4.0          1090         1347
1347       1348     4.1          4118         1348
1348       1349     3.6           468         1349
1349       1350     4.0          8031         1350
1350       1351     4.3          6987         1351

[1351 rows x 4 columns]


In [5]:
'''
table 3 : user table, primary key = user_id, foreign key = product_ind

clean:
1. extract the user_id list and user_name list and create a single value for each row,
   every user_id match with user_name with corresponding product_id
''' 
user = df[['user_id', 'user_name', 'product_ind']]

# split the user_id and user_name by ','
user['user_id'] = user['user_id'].str.split(',')
user['user_name'] = user['user_name'].str.split(',')

# because len(user_id) != len(user_name) so take the max and drop the None value 
max_elements = max(user.apply(lambda row: max(len(row['user_id']), len(row['user_name'])), axis=1))

user['user_id'] = user['user_id'].apply(lambda x: x + [None] * (max_elements - len(x)))
user['user_name'] = user['user_name'].apply(lambda x: x + [None] * (max_elements - len(x)))

user = user.explode(list(['user_id', 'user_name'])).dropna()

user = user.drop_duplicates(subset=['user_id'])
user.reset_index(drop=True, inplace=True)

print(user)

                           user_id         user_name  product_ind
0     AG3D6O4STAQKAY2UVGEUV46KN35Q             Manav            1
1     AHMY5CWJMMK5BJRBBSNLYT3ONILA      Adarsh gupta            1
2     AHCTC6ULH4XB6YHDY6PCH2R772LQ           Sundeep            1
3     AGYHHIERNXKA6P5T7CZLXKVPT7IQ    S.Sayeed Ahmed            1
4     AG4OGOFWXJZTQ2HKYIOCOY3KXF2Q    jaspreet singh            1
...                            ...               ...          ...
9039  AFUDGN5MEXLKUULNTM7Y2G5P7TYA       Vikas Kahol         1351
9040  AHXCDNSXAESERITAFELQABFVNLCA           PARDEEP         1351
9041  AGRZD6CHLCUNOLMMIMIHUCG7PIFA  Anindya Pramanik         1351
9042  AEALVGXXIP46OZVXKRUXSDWZJMEA   Harshada Pimple         1351
9043  AGEFL3AY7YXEFZA4ZJU3LP7K7OJQ            Saw a.         1351

[9044 rows x 3 columns]


In [6]:
'''
table 4: review table, primary key = review_id, foreign key = product_ind

1. extract the review_id list, review_title list, and review_content list and create a single value for each row,
   every review_id match with review_title, and review_content with corresponding user_id and product_id
'''
review = df[['review_id', 'review_title', 'review_content', 'product_ind']]

review['review_id'] = review['review_id'].str.split(',')
review['review_title'] = review['review_title'].str.split(',')
review['review_content'] = review['review_content'].str.split(',')

max_elements = max(review.apply(lambda row: max(len(row['review_id']), len(row['review_title']), len(row['review_content'])), axis=1))

review['review_id'] = review['review_id'].apply(lambda x: x + [None] * (max_elements - len(x)))
review['review_title'] = review['review_title'].apply(lambda x: x + [None] * (max_elements - len(x)))
review['review_content'] = review['review_content'].apply(lambda x: x + [None] * (max_elements - len(x)))

review = review.explode(list(['review_id', 'review_title', 'review_content'])).dropna()
review = review.drop_duplicates(subset=['review_id'])
review.reset_index(drop=True, inplace=True)

print(review)

           review_id                                   review_title  \
0     R3HXWT0LRP0NMF                                      Satisfied   
1     R2AJM3LFTLZHFO                        Charging is really fast   
2       R6AQJGUP6P86                                Value for money   
3     R1KD19VHEDV0OR                                 Product review   
4     R3C02RMYQMK6FC                                   Good quality   
...              ...                                            ...   
9258  R2ZC03S4QXOW4Y                             Excellent product✌   
9259  R186H8YW34BQD5               A good product for household use   
9260  R10NC3D321N59G  मुझे बिल्कुल भी मजा नहीं आया और वापस कर दिया।   
9261   REKF75G4SOAOX                                   Best product   
9262  R2G0ZT4JQX322I                                           Good   

                                         review_content  product_ind  
0        Looks durable Charging is fine tooNo complains            1  
1    

In [7]:
'''
table 5: category table, primary key = category_id

1. extract the category list and create a single value for each row, every category with corresponding product_id
2. Add category_id to be primary key
'''
category = df[['category', 'product_ind']]

category['category'] = category['category'].str.split('|')
category = category.explode(['category'])


category = category.drop_duplicates(subset=['category'])
category.reset_index(drop=True, inplace=True)

category.loc[:,'category_id'] = category.index + 1
category = category[['category_id'] + [col for col in category.columns if col != 'category_id']]
category.drop(columns=['product_ind'], inplace=True)

print(category)

     category_id                 category
0              1    Computers&Accessories
1              2  Accessories&Peripherals
2              3       Cables&Accessories
3              4                   Cables
4              5                USBCables
..           ...                      ...
312          313              StandMixers
313          314             PedestalFans
314          315        VacuumAccessories
315          316               VacuumBags
316          317             HandheldBags

[317 rows x 2 columns]


In [8]:
'''
table 6: productCategory junction table, primary key = category_id and product_ind, foreign key = category_id and product_ind

1. create df_a for every possible category and product_id then merge with category table to get corresponding category_id
'''
df_a = df[['category', 'product_ind']]

df_a['category'] = df_a['category'].str.split('|')
df_a = df_a.explode(['category'])

productCategory = df_a.merge(category, on='category', how='inner')
productCategory.drop(columns=['category'], inplace=True)

print(productCategory)

      product_ind  category_id
0               1            1
1               2            1
2               3            1
3               4            1
4               5            1
...           ...          ...
5781         1319          313
5782         1326          314
5783         1331          315
5784         1331          316
5785         1331          317

[5786 rows x 2 columns]


## Step 3: Relational database 
> Use a Python connector to store pre-ETL tables, create tables, insert data, set primary and foreign keys, and manage errors for data integrity in your relational database.


In [9]:
# import required libraries
import mysql.connector as connector
from mysql.connector import Error
from sqlalchemy import create_engine


In [10]:
# establish a MySQL connection
connection = connector.connect(user="root",password="password")
cursor = connection.cursor()

In [11]:
# create database amazon_sales_db
create_db = 'CREATE DATABASE IF NOT EXISTS amazon_sales_db'
cursor.execute(create_db)
cursor.execute('USE amazon_sales_db')
db_url = 'mysql+pymysql://root:password@127.0.0.1/amazon_sales_db'
engine = create_engine(db_url)

In [None]:
# create table using the pandas 
# insert product table
try:
    # product.to_sql('product', con = engine, if_exists='replace', index=False)

    alter_product_table = """ ALTER TABLE product MODIFY product_ind INT NOT NULL, MODIFY product_id VARCHAR(255) NOT NULL, 
                                MODIFY product_name TEXT NOT NULL, MODIFY discounted_price INT NOT NULL, 
                                MODIFY actual_price INT NOT NULL, MODIFY discount_percentage DOUBLE NOT NULL, 
                                MODIFY about_product TEXT NOT NULL, MODIFY img_link VARCHAR(255) NOT NULL, 
                                MODIFY product_link VARCHAR(255) NOT NULL, ADD primary key (product_ind)"""

    cursor.execute(alter_product_table)

    
    connection.commit()
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

In [None]:
# insert rating table
try:
    rating.to_sql('rating', con = engine, if_exists='replace', index=False)

    alter_rating_table = """ ALTER TABLE rating MODIFY rating_id INT NOT NULL, MODIFY rating VARCHAR(255) NOT NULL,
                              MODIFY rating_count INT NOT NULL, MODIFY product_ind INT NOT NULL, ADD primary key (rating_id),
                              ADD foreign key (product_ind) references product(product_ind)"""
    cursor.execute(alter_rating_table)
    
    connection.commit()
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

In [None]:
# insert user table
try:
    user.to_sql('user', con = engine, if_exists='replace', index=False)

    alter_user_table = """ ALTER TABLE user MODIFY user_id VARCHAR(255) NOT NULL, MODIFY user_name VARCHAR(255) NOT NULL,
                              MODIFY product_ind INT NOT NULL, ADD primary key (user_id),
                              ADD foreign key (product_ind) references product(product_ind)"""
    cursor.execute(alter_user_table)

    connection.commit()
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

In [None]:
# insert review table
try:
    review.to_sql('review', con = engine, if_exists='replace', index=False)

    alter_review_table = """ ALTER TABLE review MODIFY review_id VARCHAR(255) NOT NULL, MODIFY review_title VARCHAR(255) NOT NULL,
                              MODIFY review_content TEXT NOT NULL, MODIFY product_ind INT NOT NULL, ADD primary key (review_id),
                              ADD foreign key (product_ind) references product(product_ind)"""
    cursor.execute(alter_review_table)

    connection.commit()
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

In [None]:
# insert category table
try:
    category.to_sql('category', con = engine, if_exists='replace', index=False)

    alter_category_table = """ ALTER TABLE category MODIFY category_id  INT NOT NULL, MODIFY category VARCHAR(255) NOT NULL,
                                ADD primary key (category_id)"""
    cursor.execute(alter_category_table)

    connection.commit()
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

In [None]:
# insert productCategory table
try:
    productCategory.to_sql('productCategory', con = engine, if_exists='replace', index=False)

    alter_productCategory_table = """ ALTER TABLE productCategory MODIFY product_ind INT NOT NULL, MODIFY category_id INT NOT NULL,
                                ADD primary key (product_ind, category_id), ADD foreign key (product_ind) references product(product_ind),
                                ADD foreign key (category_id) references category(category_id)"""
    cursor.execute(alter_productCategory_table)
    
    connection.commit()

except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

connection.close()

## Step 4: Object Storage: 
> Depending on the project's data volume, choose either local file storage or a GitHub repository for data storage.

## Step 5: Creating a Star Schema
> Implement a star schema to optimize queries, improve performance, and enhance flexibility.

> See amazon_sales_Star_schema for more details

## Step 6: Reading and Analyzing Data
> Retrieve data from MySQL and analyze the data to answer project-specific questions or achieve your objectives.

In [12]:
# establish a MySQL connection
connection = connector.connect(user="root",password="password")
cursor = connection.cursor()

In [13]:
# create database amazon_sales_db
create_db = 'CREATE DATABASE IF NOT EXISTS amazon_sales_db'
cursor.execute(create_db)
cursor.execute('USE amazon_sales_db')
db_url = 'mysql+pymysql://root:password@127.0.0.1/amazon_sales_db'
engine = create_engine(db_url)

#### 1. What are the top 10 products with the highest average ratings?

In [14]:
try:
    ans1 = ''' SELECT p.product_id, p.product_name, AVG(r.rating) AS avg_rating FROM product p JOIN rating r ON
            p.product_ind = r.product_ind GROUP BY p.product_id,p.product_name ORDER BY avg_rating DESC LIMIT 10; '''


    df_an1 = pd.read_sql(ans1, connection)
    print(df_an1)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

   product_id                                       product_name  avg_rating
0  B0BP7XLX48  Syncwire LTG to USB Cable for Fast Charging Co...         5.0
1  B09ZHCJDP1  Amazon Basics Wireless Mouse | 2.4 GHz Connect...         5.0
2  B0BQRJ3C47  REDTECH USB-C to Lightning Cable 3.3FT, [Apple...         5.0
3  B0BQ3K23Y1  Oratech Coffee Frother electric, milk frother ...         4.8
4  B0B53DS4TF  Instant Pot Air Fryer, Vortex 2QT, Touch Contr...         4.8
5  B0BR4F878Q  Swiffer Instant Electric Water Heater Faucet T...         4.8
6  B0BM4KTNL1  FIGMENT Handheld Milk Frother Rechargeable, 3-...         4.7
7  B0BM9H2NY9  Multifunctional 2 in 1 Electric Egg Boiling St...         4.7
8  B09WN3SRC7  Sony Bravia 164 cm (65 inches) 4K Ultra HD Sma...         4.7
9  B0BP89YBC1  Campfire Spring Chef Prolix Instant Portable W...         4.7


#### 2. What is the average number of porduct revieded by users?

In [15]:
try:
    ans2 = ''' SELECT AVG(review_count) AS average_reviews_per_user FROM ( SELECT u.user_id, COUNT(DISTINCT r.review_id) AS
    review_count FROM user u LEFT JOIN review r ON u.product_ind = r.product_ind GROUP BY u.user_id) AS UserReviewCounts;'''

    cursor.execute(ans2)
    result = cursor.fetchall()
    column = cursor.column_names
    print(column)
    print(result)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

('average_reviews_per_user',)
[(Decimal('7.9340'),)]


#### 3. Top 10 users who have reviewed products from multiple categories?

In [16]:
try:
    ans3 = ''' SELECT u.user_id, u.user_name, COUNT(DISTINCT c.category) AS reviewed_categories_count FROM user u LEFT JOIN review r 
                ON u.product_ind = r.product_ind LEFT JOIN productCategory pc ON r.product_ind = pc.product_ind LEFT JOIN category c
                ON pc.category_id = c.category_id GROUP BY u.user_id, u.user_name HAVING reviewed_categories_count > 1 ORDER BY reviewed_categories_count 
                DESC LIMIT 10'''


    df_an3 = pd.read_sql(ans3, connection)
    print(df_an3)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

                        user_id           user_name  reviewed_categories_count
0  AH7Z4E2IL2MTD636EXSVY3BP6DYQ              Misoma                          7
1  AHAAYO56NUYMNL7O7P65YAAKV6TA           Sughnesha                          7
2  AHAPQUB4R4LW5DVZEUNCXW3PB5BQ         Nupur Gupta                          7
3  AHFQXP2EQZOT7NTNHK5TMB6QA5QQ          Harsh Shah                          7
4  AHH4X42X46T6KKBR7G3L7TCGV3OA              Neeraj                          7
5  AHJCN3WBIWAFPESBIGKPYRNWEXFA        Mini Cherian                          7
6  AHKCXIGVYQTE4LJIPBIFSQNYZSCQ  Harishankar Thakur                          7
7  AHNNUOE6JO6DYVSBCELNHU63SLTQ      Harini Jeyapal                          7
8  AHONFHGWU5UFOW2K622LL7B26M3Q       Nilofar Adeni                          7
9  AHSIVUNTJMI5S5AJGFDE5EDQ355Q               Pavan                          7


#### 4. Which category has the most products , and which has the least?

In [17]:
try:
    ans4 = ''' WITH cpc AS (SELECT pc.category_id, c.category, COUNT(DISTINCT pc.product_ind) AS product_count FROM productCategory pc 
                LEFT JOIN category c ON pc.category_id = c.category_id GROUP BY pc.category_id, c.category) SELECT category, product_count 
                FROM cpc WHERE product_count = (SELECT MAX(product_count) FROM cpc) OR product_count = ( SELECT MIN(product_count) FROM cpc)LIMIT 10;'''

    df_an4 = pd.read_sql(ans4, connection)
    print(df_an4)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

                 category  product_count
0             Electronics            490
1      SpeakerAccessories              1
2               DVICables              1
3           SpeakerCables              1
4   MediaStreamingDevices              1
5        StreamingClients              1
6  AVReceivers&Amplifiers              1
7           TowerSpeakers              1
8               3DGlasses              1
9                 Tripods              1


#### 5. Which products belong to multiple categories, and how does that affect their sales and ratings?

In [18]:
try:
    ans5 = ''' SELECT p.product_id, p.product_name, COUNT(DISTINCT pc.category_id) AS category_count, SUM(p.actual_price) AS total_sales, 
                AVG(r.rating) AS avg_rating FROM product p LEFT JOIN productCategory pc ON p.product_ind = pc.product_ind LEFT JOIN rating r
                ON p.product_ind = r.product_ind GROUP BY p.product_id, p.product_name HAVING category_count > 1 ORDER BY total_sales DESC LIMIT 10;
            '''

    df_an5 = pd.read_sql(ans5, connection)
    print(df_an5)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

   product_id                                       product_name  \
0  B08D9NDZ1Y  HP Deskjet 2331 Colour Printer, Scanner and Co...   
1  B09WN3SRC7  Sony Bravia 164 cm (65 inches) 4K Ultra HD Sma...   
2  B0B19VJXQZ  ECOVACS DEEBOT N8 2-in-1 Robotic Vacuum Cleane...   
3  B0BC8BQ432  VU 164 cm (65 inches) The GloLED Series 4K Sma...   
4  B08Y1SJVV5  pTron Solero MB301 3A Micro USB Data & Chargin...   
5  B0B3XXSB1K  LG 139 cm (55 inches) 4K Ultra HD Smart LED TV...   
6  B09NS5TKPN  LG 1.5 Ton 5 Star AI DUAL Inverter Split AC (C...   
7  B08VB57558  Samsung Galaxy S20 FE 5G (Cloud Navy, 8GB RAM,...   
8  B0B15GSPQW  Samsung 138 cm (55 inches) Crystal 4K Neo Seri...   
9  B095JPKPH3  OnePlus 163.8 cm (65 inches) U Series 4K LED S...   

   category_count  total_sales  avg_rating  
0               3    1299888.0         3.5  
1               4     559600.0         4.7  
2               6     359400.0         4.4  
3               4     340000.0         4.3  
4               5     3333

#### 6. Which products belong to multiple categories, and how does that affect their sales and ratings?

In [19]:
try:
    ans6 = ''' SELECT COALESCE(c.category, 'Total'), SUM(p.actual_price) AS category_revenue FROM category c LEFT JOIN productCategory pc 
                ON c.category_id = pc.category_id LEFT JOIN product p ON pc.product_ind = p.product_ind GROUP BY c.category WITH ROLLUP 
                ORDER BY category_revenue DESC LIMIT 10;
            '''

    df_an6 = pd.read_sql(ans6, connection)
    print(df_an6)
except Error as e:
    print("Error code: ", e.errno)
    print("Error message: ", e.msg)

connection.close()

  COALESCE(c.category, 'Total')  category_revenue
0                         Total        32484652.0
1                   Electronics         5104861.0
2          HomeTheater,TV&Video         2720394.0
3                   Televisions         2550458.0
4              SmartTelevisions         2458480.0
5                  Home&Kitchen         1864609.0
6           Mobiles&Accessories         1544685.0
7      Smartphones&BasicMobiles         1419754.0
8                   Smartphones         1400351.0
9         Computers&Accessories         1191617.0
