# Data Engineering Project - Developing an ETL Pipeline

In the following Data Engineering project, a complete ETL Pipeline has been developed by scraping data from Wikipedia's list of countries ranked by GDP, transforming the dataset, and loading it into a database.

There are three types of GDPs reported on the website:

1. International Monetary Fund (IMF): This is our desired GDP, if available.
2. World Bank (WB): We extract this GDP if IMF is not available.
3. United Nations (UN): We extract this GDP if IMF and WB are not available.

In [1]:
# importing the required libraries and packages

import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
from datetime import datetime 
import sqlite3

# Extract

In [2]:
# fetching the contents of the wikipedia website

url='https://web.archive.org/web/20230902185326/https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29'
data=requests.get(url).text

In [3]:
# Parsing the html

soup=BeautifulSoup(data, 'html.parser')

In [4]:
# creating a list of all tables of the website

tables=soup.find_all('table') #or: tables=soup.find_all('tbody')

In [5]:
# we inrend to scrape the third table

table=tables[2]

In [6]:
# creating a list of all table rows 

rows=table.find_all('tr')

In [7]:
# slicing the rows containing data

rows=rows[3:]

In [8]:
# storing table contents in a dictionary

dic={'Country':[], 'GDP':[]}

for row in rows:
    
    country=row.find_all('td')[0].text.strip()
    dic['Country'].append(country)
    
    GDP_IMF=row.find_all('td')[2].text
    GDP_WB=row.find_all('td')[3].text
    GDP_UN=row.find_all('td')[4].text
    
    # null values have been represented by em_dash (—) in the table
    # the Unicode '\u2014' represents the em dash character
    em_dash = '\u2014'    
    
    if GDP_IMF != em_dash:
        dic['GDP'].append(GDP_IMF)
    elif GDP_WB != em_dash:
        dic['GDP'].append(GDP_WB)
    else:
        dic['GDP'].append(GDP_UN)
        
dic

{'Country': ['United States',
  'China',
  'Japan',
  'Germany',
  'India',
  'United Kingdom',
  'France',
  'Italy',
  'Canada',
  'Brazil',
  'Russia',
  'South Korea',
  'Australia',
  'Mexico',
  'Spain',
  'Indonesia',
  'Netherlands',
  'Saudi Arabia',
  'Turkey',
  'Switzerland',
  'Taiwan',
  'Poland',
  'Argentina',
  'Belgium',
  'Sweden',
  'Ireland',
  'Thailand',
  'Norway',
  'Israel',
  'Singapore',
  'Austria',
  'Nigeria',
  'United Arab Emirates',
  'Vietnam',
  'Malaysia',
  'Philippines',
  'Bangladesh',
  'Denmark',
  'South Africa',
  'Hong Kong',
  'Egypt',
  'Pakistan',
  'Iran',
  'Chile',
  'Romania',
  'Colombia',
  'Czech Republic',
  'Finland',
  'Peru',
  'Iraq',
  'Portugal',
  'New Zealand',
  'Kazakhstan',
  'Greece',
  'Qatar',
  'Algeria',
  'Hungary',
  'Kuwait',
  'Ethiopia',
  'Ukraine',
  'Morocco',
  'Slovakia',
  'Ecuador',
  'Dominican Republic',
  'Puerto Rico',
  'Kenya',
  'Angola',
  'Cuba',
  'Oman',
  'Guatemala',
  'Bulgaria',
  'Venezu

In [9]:
# converting the dictionary into a dataframe

df=pd.DataFrame(dic)
df.head(5)

Unnamed: 0,Country,GDP
0,United States,26854599
1,China,19373586
2,Japan,4409738
3,Germany,4308854
4,India,3736882


In [10]:
# checking for null values

df.isnull().sum()

Country    0
GDP        0
dtype: int64

In [14]:
# functionizing extract step (turning the previous steps into a function)

def extract(url):
    ''' This function extracts the required
    information from the website and saves it to a dataframe. The
    function returns the dataframe for further processing. '''
    
    data=requests.get(url).text
    soup=BeautifulSoup(data, 'html.parser')
    tables=soup.find_all('table')
    table=tables[2]
    rows=table.find_all('tr')
    rows=rows[3:]
    
    dic={'Country':[], 'GDP':[]}

    for row in rows:
    
        country=row.find_all('td')[0].text.strip()
        dic['Country'].append(country)

        GDP_IMF=row.find_all('td')[2].text
        GDP_WB=row.find_all('td')[3].text
        GDP_UN=row.find_all('td')[4].text

        em_dash = '\u2014'    
        if GDP_IMF != em_dash:
            dic['GDP'].append(GDP_IMF)
        elif GDP_WB != em_dash:
            dic['GDP'].append(GDP_WB)
        else:
            dic['GDP'].append(GDP_UN)
    
    df=pd.DataFrame(dic)

    return df

In [21]:
# executing the extract function

df=extract(url)
df

Unnamed: 0,Country,GDP
0,United States,26854599
1,China,19373586
2,Japan,4409738
3,Germany,4308854
4,India,3736882
...,...,...
208,Anguilla,303
209,Kiribati,248
210,Nauru,151
211,Montserrat,72


# Transform

In [16]:
# removing ',' from the GDP column, then converting it to integer

df['GDP']=df['GDP'].str.replace(',' , '').astype(int)
df.head()

Unnamed: 0,Country,GDP
0,United States,26854599
1,China,19373586
2,Japan,4409738
3,Germany,4308854
4,India,3736882


In [17]:
# convrting GDP from million USD to billion USD

df['GDP']=round(df['GDP']/1000, 2)
df

Unnamed: 0,Country,GDP
0,United States,26854.60
1,China,19373.59
2,Japan,4409.74
3,Germany,4308.85
4,India,3736.88
...,...,...
208,Anguilla,0.30
209,Kiribati,0.25
210,Nauru,0.15
211,Montserrat,0.07


In [18]:
# renaming the GDP column

df=df.rename(columns={'GDP': 'GDP_USD_billion'})
df

Unnamed: 0,Country,GDP_USD_billion
0,United States,26854.60
1,China,19373.59
2,Japan,4409.74
3,Germany,4308.85
4,India,3736.88
...,...,...
208,Anguilla,0.30
209,Kiribati,0.25
210,Nauru,0.15
211,Montserrat,0.07


In [19]:
# functionizing transform step (turning the previous steps into a function)

def transform(df):
    ''' This function converts the GDP information from Currency
    format to integer value, transforms the information of GDP from
    USD (Millions) to USD (Billions) rounding to 2 decimal places, 
    and changes the column name.
    The function returns the transformed dataframe.'''
    
    df['GDP']=df['GDP'].str.replace(',' , '').astype(int)
    df['GDP']=round(df['GDP']/1000, 2)
    df=df.rename(columns={'GDP': 'GDP_USD_billion'})
    
    return df

In [22]:
# executing the transform function

df=transform(df)
df.head()

Unnamed: 0,Country,GDP_USD_billion
0,United States,26854.6
1,China,19373.59
2,Japan,4409.74
3,Germany,4308.85
4,India,3736.88


# Load

In [23]:
# functionizing load_to_csv

def load_to_csv(df, csv_path):
    ''' This function saves the final dataframe as a `CSV` file 
    in the provided path. Function returns nothing.'''

    df.to_csv(csv_path, index=False)

In [24]:
# saving the dataframe as a csv file

csv_path='data/IBM-Data-Engineering/Countries_by_GDP.csv'
load_to_csv(df, csv_path)

In [25]:
# functionizing load_to_json

def load_to_json(df, json_path):
    ''' This function saves the final dataframe as a `JSON` file 
    in the provided path. Function returns nothing.'''

    df.to_json(json_path, indent=4)

In [26]:
# saving the dataframe as a json file

json_path='data/IBM-Data-Engineering/Countries_by_GDP.json'
load_to_json(df, json_path)

In [27]:
# functionizing load to a database table

def load_to_db(df, sql_connection, table_name):
    ''' This function saves the final dataframe as a database table
    with the provided name. Function returns nothing.'''
    
    df.to_sql(table_name, sql_connection, if_exists = 'replace', index = False)

In [28]:
# storing the dataframe to a database table

db_name='World_Economies.db'
sql_connection = sqlite3.connect(db_name)
table_name='Countries_by_GDP'

load_to_db(df, sql_connection, table_name)

In [29]:
# functionizing running queries against the database table

def run_query(query_statement, sql_connection):
    ''' This function runs the stated query on the database table and
    prints the output on the terminal. Function returns nothing. '''
    
    print(query_statement)
    print()
    query_output = pd.read_sql(query_statement, sql_connection)
    print(query_output)

In [30]:
# running the first query against the database table

query_statement = f"SELECT * FROM {table_name} limit 5"
run_query(query_statement, sql_connection)

SELECT * FROM Countries_by_GDP limit 5

         Country  GDP_USD_billion
0  United States         26854.60
1          China         19373.59
2          Japan          4409.74
3        Germany          4308.85
4          India          3736.88


In [31]:
# running the second query against the database table

query_statement = f"SELECT * FROM {table_name} where GDP_USD_billion >= 100"
run_query(query_statement, sql_connection)

SELECT * FROM Countries_by_GDP where GDP_USD_billion >= 100

          Country  GDP_USD_billion
0   United States         26854.60
1           China         19373.59
2           Japan          4409.74
3         Germany          4308.85
4           India          3736.88
..            ...              ...
66         Angola           117.88
67           Cuba           107.35
68           Oman           104.90
69      Guatemala           102.31
70       Bulgaria           100.64

[71 rows x 2 columns]


# Execution and Logging the ETL Pipeline

In [33]:
# functionizing logging ETL steps into a text file

def log_progress(message):
    ''' This function logs the mentioned message at a given stage of the 
    code execution to a log file. Function returns nothing'''

    timestamp_format='%Y-%m-%d-%H:%M:%S'
    now=datetime.now()
    timestamp=now.strftime(timestamp_format)
    
    with open('data/IBM-Data-Engineering/etl_project_log.txt', 'a') as f:
        f.write(timestamp + ', ' + message + '\n')

In [34]:
# All ETL pipeline has been summarized in this cell, along with creating a log file

url = 'https://web.archive.org/web/20230902185326/https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29'
db_name = 'World_Economies.db'
table_name = 'Countries_by_GDP'
csv_path = 'data/IBM-Data-Engineering/Countries_by_GDP.csv'
json_path='data/IBM-Data-Engineering/Countries_by_GDP.json'

log_progress('Preliminaries complete. Initiating ETL process')

df = extract(url)

log_progress('Data extraction complete. Initiating Transformation process')

df = transform(df)

log_progress('Data transformation complete. Initiating loading process')

load_to_csv(df, csv_path)

log_progress('Data saved to CSV file')

load_to_json(df, json_path)

log_progress('Data saved to JSON file')

sql_connection = sqlite3.connect(db_name)

log_progress('SQL Connection initiated.')

load_to_db(df, sql_connection, table_name)

log_progress('Data loaded to Database as table. Running the query')

query_statement = f"SELECT * FROM {table_name} limit 5"
run_query(query_statement, sql_connection)

log_progress('First query statement was executed.')

query_statement = f"SELECT * from {table_name} WHERE GDP_USD_billion >= 100"
run_query(query_statement, sql_connection)

log_progress('Second query statement was executed.')

log_progress('Process Complete.')

sql_connection.close()

log_progress('Server connection closed.')

SELECT * FROM Countries_by_GDP limit 5

         Country  GDP_USD_billion
0  United States         26854.60
1          China         19373.59
2          Japan          4409.74
3        Germany          4308.85
4          India          3736.88
SELECT * from Countries_by_GDP WHERE GDP_USD_billion >= 100

          Country  GDP_USD_billion
0   United States         26854.60
1           China         19373.59
2           Japan          4409.74
3         Germany          4308.85
4           India          3736.88
..            ...              ...
66         Angola           117.88
67           Cuba           107.35
68           Oman           104.90
69      Guatemala           102.31
70       Bulgaria           100.64

[71 rows x 2 columns]
