# Build an ETL Pipeline

step1 - Extract Data from Csv file

step2 - Transform the Data(i.e Clean the Data)- deal with missing and duplicate data

step3 - Create a Database

Step4 - Load the clean data into Database

In [224]:
# Import the libraries
import pandas as pd
import psycopg2
from datetime import datetime
from sqlalchemy import create_engine
import logging

In [225]:
# Logging setup
logging.basicConfig(filename="data_pipeline.log", level=logging.INFO)

In [226]:
# extract the data
Orders = pd.read_csv("F:\Silq\Orders .csv")
Products = pd.read_csv("F:\Silq\Products.csv")
Users = pd.read_csv("F:\\Silq\\Users.csv")

# EDA

# Orders

In [227]:
Orders.head()

Unnamed: 0,OrderID,UserID,ProductID,OrderDate,Amount
0,1,1,1,24-07-2022 12:37,199.96
1,2,8,2,22-04-2022 22:07,49.99
2,3,8,2,17-02-2022 11:15,199.96
3,4,10,3,01-05-2022 09:32,239.96
4,5,5,2,06-04-2022 09:58,99.95


In [228]:
Orders.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   OrderID    20 non-null     int64  
 1   UserID     20 non-null     int64  
 2   ProductID  20 non-null     int64  
 3   OrderDate  20 non-null     object 
 4   Amount     20 non-null     float64
dtypes: float64(1), int64(3), object(1)
memory usage: 932.0+ bytes


In [230]:
# change the datatype of Orders
Orders['OrderDate']=pd.to_datetime(Orders['OrderDate'])

In [231]:
Orders.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   OrderID    20 non-null     int64         
 1   UserID     20 non-null     int64         
 2   ProductID  20 non-null     int64         
 3   OrderDate  20 non-null     datetime64[ns]
 4   Amount     20 non-null     float64       
dtypes: datetime64[ns](1), float64(1), int64(3)
memory usage: 932.0 bytes


In [241]:
Orders.isnull().sum()

OrderID      0
UserID       0
ProductID    0
OrderDate    0
Amount       0
dtype: int64

In [242]:
Orders.duplicated().sum()

0

# Products

In [232]:
Products.head()

Unnamed: 0,ProductID,ProductName,Price
0,1,Product A,19.99
1,2,Product B,29.99
2,3,Product C,39.99
3,4,Product D,49.99
4,5,Product E,59.99


In [234]:
Products.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   ProductID    5 non-null      int64  
 1   ProductName  5 non-null      object 
 2   Price        5 non-null      float64
dtypes: float64(1), int64(1), object(1)
memory usage: 252.0+ bytes


In [243]:
Products.isnull().sum()

ProductID      0
ProductName    0
Price          0
dtype: int64

In [244]:
Products.duplicated().sum()

0

# Users

In [235]:
Users.head()

Unnamed: 0,UserID,SignUpDate,Location
0,1,14-09-2021 19:51,New York
1,2,18-03-2021 20:27,Los Angeles
2,3,16-04-2021 16:41,Chicago
3,4,17-04-2021 08:48,Houston
4,5,06-12-2021 03:28,Phoenix


In [236]:
Users.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   UserID      10 non-null     int64 
 1   SignUpDate  10 non-null     object
 2   Location    10 non-null     object
dtypes: int64(1), object(2)
memory usage: 372.0+ bytes


In [238]:
# change the datatype of Orders
Users['SignUpDate']=pd.to_datetime(Users['SignUpDate'])

In [239]:
Users.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   UserID      10 non-null     int64         
 1   SignUpDate  10 non-null     datetime64[ns]
 2   Location    10 non-null     object        
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 372.0+ bytes


In [245]:
Users.isnull().sum()

UserID        0
SignUpDate    0
Location      0
dtype: int64

In [246]:
Users.duplicated().sum()

0

# Transform the Data- deal with missing and duplicate data

This dataset does't contain any null value or duplicate values, if there is any null values contains then we can go for Deletion or immutation method

In [247]:
Orders.drop_duplicates(keep='first').inplace=True

In [248]:
Orders.dropna(inplace=True)

In [249]:
Products.drop_duplicates(keep='first').inplace=True

In [250]:
Products.dropna(inplace=True)

In [251]:
Users.drop_duplicates(keep='first').inplace=True

In [252]:
Users.dropna(inplace=True)

# Create a Database

Go to PGAdmin(Postgre SQL) and create a Database Table

In [253]:
# Database Credentials
username = 'Enter your postgresql Username'
password = 'Enter your Password'
host = 'localhost'
port = 5432
db_name ='E-Commerce'

In [254]:
# Establish a connection
engine = create_engine(f'postgresql://{username}:{password}@{host}:{port}/{db_name}')

try:
    # Load the data into PostgreSQL database (Table names - orders, products, users)
    Orders.to_sql('orders', engine, if_exists='replace', index=False)
    Products.to_sql('products', engine, if_exists='replace', index=False)
    Users.to_sql('users', engine, if_exists='replace', index=False)
    logging.info('Data pipeline execution completed successfully.')
except Exception as e:
    logging.error(f'Error in data pipeline: {str(e)}')
finally:
    # Close the connection
    engine.dispose()
