## Building a simple ETL pipeline (CSV into a PostgreSQL database)

<!-- Step 1: Extract the data from a CSV file and load it into a pandas DataFrame
Step 2: Transform the data (remove duplicates, handle missing data, run calculations, etc.)
Step 3: Create a database
Step 4: Load the transformed data into the database -->
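
The extract, transform, and load steps can be sketched end to end as three small functions. This is a minimal sketch under stated assumptions, not the notebook's own code: the function names `extract`, `transform`, and `load` are hypothetical, the CSV rows are made up, and an in-memory SQLite database stands in for PostgreSQL so the example runs without a server.

```python
import io
import sqlite3

import pandas as pd

# Tiny in-memory sample standing in for the real CSV file (made-up rows).
SAMPLE_CSV = """id,first_name,email
1,Ann,ann@example.com
2,Bob,
"""


def extract(source) -> pd.DataFrame:
    """Step 1: read the CSV into a DataFrame."""
    return pd.read_csv(source)


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Step 2: fill missing emails with a placeholder."""
    df = df.copy()
    df["email"] = df["email"].fillna("Unknown")
    return df


def load(df: pd.DataFrame, conn, table: str) -> None:
    """Step 4: write the DataFrame to a table, replacing any existing one."""
    df.to_sql(table, conn, if_exists="replace", index=False)


# SQLite in memory is an assumption made so the sketch runs without a server.
conn = sqlite3.connect(":memory:")
clean = transform(extract(io.StringIO(SAMPLE_CSV)))
load(clean, conn, "emp_table")
row_count = conn.execute("SELECT COUNT(*) FROM emp_table").fetchone()[0]
print(row_count)
```

Keeping each step in its own function makes it easier to rerun or test one stage without repeating the others.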

In [4]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.





In [5]:
pip install psycopg2

Collecting psycopg2
  Using cached psycopg2-2.9.10-cp310-cp310-win_amd64.whl (1.2 MB)
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10


In [6]:
pip install sqlalchemy

Collecting sqlalchemy
  Downloading sqlalchemy-2.0.42-cp310-cp310-win_amd64.whl (2.1 MB)


In [7]:
# Import libraries
import pandas as pd
import psycopg2  # PostgreSQL driver that SQLAlchemy uses to talk to the database
from sqlalchemy import create_engine  # creates an engine that manages and reuses database connections

In [11]:
df = pd.read_csv(r"C:\Users\achar\Desktop\Utilities\DataEngineering\ETL_Project\First ETL Project\MOCK_DATA.csv")

In [12]:
df.head()

|   | id | first_name | last_name | email | gender | ip_address |
|---|----|------------|-----------|-------|--------|------------|
| 0 | 1 | Ede | Keneleyside | ekeneleyside0@ustream.tv | Female | 160.207.123.12 |
| 1 | 2 | Shelia | Swanson | sswanson1@engadget.com | Bigender | 131.29.109.71 |
| 2 | 3 | Porty | Jochanany | pjochanany2@shutterfly.com | Male | 16.236.109.154 |
| 3 | 4 | Ellis | Ovill | eovill3@shareasale.com | Male | 199.86.130.56 |
| 4 | 5 | Gerrie | Malecky | gmalecky4@dell.com | Female | 82.184.43.12 |


In [13]:
df.tail()

|      | id | first_name | last_name | email | gender | ip_address |
|------|----|------------|-----------|-------|--------|------------|
| 4995 | 4996 | Kym | Antham | kanthamrn@unicef.org | Female | 138.50.208.39 |
| 4996 | 4997 | Mitch | Glasser | mglasserro@comsenz.com | Male | 4.101.187.211 |
| 4997 | 4998 | Niko | Fairrie | nfairrierp@java.com | Male | 241.78.241.116 |
| 4998 | 4999 | Ally | Moyle | amoylerq@4shared.com | Female | 96.69.154.170 |
| 4999 | 5000 | Bartholomeo | Letham | blethamrr@oaic.gov.au | Male | 213.157.56.9 |


In [14]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   id          5000 non-null   int64 
 1   first_name  5000 non-null   object
 2   last_name   5000 non-null   object
 3   email       4423 non-null   object
 4   gender      5000 non-null   object
 5   ip_address  4670 non-null   object
dtypes: int64(1), object(5)
memory usage: 234.5+ KB


In [15]:
df.isna().sum()

id              0
first_name      0
last_name       0
email         577
gender          0
ip_address    330
dtype: int64

In [16]:
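# Show the first rows where the email column is missing

df[df['email'].isna()].head()

|      | id | first_name | last_name | email | gender | ip_address |
|------|----|------------|-----------|-------|--------|------------|
| 1004 | 1005 | Cooper | Winspire | NaN | Male | 133.5.122.84 |
| 1008 | 1009 | Rodolfo | Baird | NaN | Male | 121.125.252.79 |
| 1010 | 1011 | Caren | Joncic | NaN | Female | 37.166.247.72 |
| 1019 | 1020 | Florenza | Lawton | NaN | Female | 83.21.54.135 |
| 1028 | 1029 | Davina | Poluzzi | NaN | Female | 238.108.161.201 |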


In [17]:
# Fill in the missing data in the email column.
# Assigning the result back avoids the chained inplace call, which stops working in pandas 3.0.

df['email'] = df['email'].fillna('Unknown')


In [18]:
# Fill in the missing data in the ip_address column.
# Note: 0 is an integer placeholder in a column that otherwise holds strings.

df['ip_address'] = df['ip_address'].fillna(0)


In [19]:
df.isna().sum()

id            0
first_name    0
last_name     0
email         0
gender        0
ip_address    0
dtype: int64
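
The two fill steps above can also be written as a single call by passing a dict of column names to fill values to `DataFrame.fillna`. The sketch below uses made-up rows, and the string placeholder `"0.0.0.0"` is an assumption chosen to keep the column's values all strings; the notebook itself fills `ip_address` with the integer `0`.

```python
import pandas as pd

# Made-up rows with the same kinds of gaps as the notebook's data.
sample = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "email": ["a@example.com", None, "c@example.com"],
        "ip_address": ["10.0.0.1", "10.0.0.2", None],
    }
)

# One fill value per column; columns not listed are left untouched.
# "0.0.0.0" is a hypothetical placeholder, not the value used in the notebook.
filled = sample.fillna({"email": "Unknown", "ip_address": "0.0.0.0"})

missing_after = int(filled.isna().sum().sum())
print(missing_after)
```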

In [20]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   id          5000 non-null   int64 
 1   first_name  5000 non-null   object
 2   last_name   5000 non-null   object
 3   email       5000 non-null   object
 4   gender      5000 non-null   object
 5   ip_address  5000 non-null   object
dtypes: int64(1), object(5)
memory usage: 234.5+ KB
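
The transform step also lists duplicate removal, which the cells above do not perform. A minimal sketch of how it might look with `DataFrame.duplicated` and `DataFrame.drop_duplicates`, assuming `id` is the column that should be unique; the rows are made up for illustration.

```python
import pandas as pd

# Made-up rows; the second and third rows are identical.
sample = pd.DataFrame(
    {
        "id": [1, 2, 2, 3],
        "first_name": ["Ann", "Bob", "Bob", "Cy"],
        "email": [
            "ann@example.com",
            "bob@example.com",
            "bob@example.com",
            "cy@example.com",
        ],
    }
)

# Count rows that repeat an earlier row in every column.
duplicate_rows = int(sample.duplicated().sum())

# Keep the first occurrence of each id and renumber the index.
deduped = sample.drop_duplicates(subset=["id"], keep="first").reset_index(drop=True)
print(duplicate_rows, len(deduped))
```

Checking the duplicate count before dropping rows shows how much data the step removes.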


In [21]:
pip install python-dotenv



In [23]:
from dotenv import load_dotenv
import os

load_dotenv()

db_username = os.getenv("DB_USERNAME")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")
csv_path = os.getenv("CSV_PATH")
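
`os.getenv` returns `None` when a variable is not set, so a missing entry in the `.env` file only surfaces later as a confusing connection error. A minimal sketch of an early check, assuming the five database variables above are all required; the helper name `read_db_settings` and the example values are hypothetical.

```python
import os

REQUIRED_VARS = ["DB_USERNAME", "DB_PASSWORD", "DB_HOST", "DB_PORT", "DB_NAME"]


def read_db_settings(env=os.environ) -> dict:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


# A plain dict stands in for the real environment here (made-up values).
example_env = {
    "DB_USERNAME": "etl_user",
    "DB_PASSWORD": "secret",
    "DB_HOST": "localhost",
    "DB_PORT": "5432",
    "DB_NAME": "etl_db",
}
settings = read_db_settings(example_env)
print(settings["DB_HOST"])
```

In the notebook, calling `read_db_settings()` with no argument after `load_dotenv()` would check the real environment.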


In [24]:
# Create a SQLAlchemy engine for the PostgreSQL database (connections are opened on demand)

connection = create_engine(f'postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}')
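
A password containing characters such as `@`, `/`, or `:` breaks a URL built by plain string formatting, because those characters are also URL delimiters. One way to guard against this is to percent-encode the credentials first. The sketch below uses only the standard library; the helper name `build_postgres_url` and the credentials are made up for illustration.

```python
from urllib.parse import quote


def build_postgres_url(username, password, host, port, database) -> str:
    """Build a PostgreSQL URL with the username and password percent-encoded."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"postgresql://{user}:{secret}@{host}:{port}/{database}"


# Made-up credentials; the password contains characters that need escaping.
url = build_postgres_url("etl_user", "p@ss/w:rd", "localhost", 5432, "etl_db")
print(url)
```

The resulting string can be passed to `create_engine` in place of the f-string above.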

In [25]:
# Load the dataset into the PostgreSQL database, replacing the table if it already exists

df.to_sql('emp_table', connection, if_exists='replace', index=False)

# Release the engine's pooled connections
connection.dispose()
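
A quick way to confirm the load is to read the row count back and compare it with the DataFrame. This is a minimal sketch with made-up rows, and an in-memory SQLite database stands in for PostgreSQL so it runs without a server. Against the real database the same `pd.read_sql` query would presumably take the SQLAlchemy engine instead, and would need to run before `dispose()` is called.

```python
import sqlite3

import pandas as pd

# Made-up rows standing in for the cleaned DataFrame.
source = pd.DataFrame({"id": [1, 2, 3], "first_name": ["Ann", "Bob", "Cy"]})

# SQLite in memory is an assumption made so the sketch runs without a server.
conn = sqlite3.connect(":memory:")
source.to_sql("emp_table", conn, if_exists="replace", index=False)

# Read the row count back from the table and compare it with the source.
loaded_rows = pd.read_sql("SELECT COUNT(*) AS n FROM emp_table", conn)["n"].iloc[0]
rows_match = int(loaded_rows) == len(source)
conn.close()
print(rows_match)
```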