# Data Engineering Project - Developing an ETL Pipeline

In the following Data Engineering project, a complete ETL Pipeline has been developed by scraping data from Wikipedia's list of the world’s largest banks, transforming the dataset, and loading it into a database.

In [1]:
# importing the required libraries and packages

import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
from datetime import datetime 
import sqlite3

# Extract

In [2]:
# fetching the contents of the wikipedia website

url='https://web.archive.org/web/20230908091635 /https://en.wikipedia.org/wiki/List_of_largest_banks'
data=requests.get(url).text

In [3]:
# Parsing the html

soup=BeautifulSoup(data, 'html.parser')

In [4]:
# creating a list of all tables of the website

tables=soup.find_all('tbody') #or: tables=soup.find_all('tables')

In [5]:
# we inrend to scrape the first table

table=tables[0]

In [6]:
# creating a list of all table rows 

rows=table.find_all('tr')

In [7]:
# slicing the rows containing data

rows=rows[1:]

In [22]:
# storing table contents in a dictionary

key1='Rank'
key2='Bank_Name'
key3='MC_USD_Billion'

dic={key1:[], key2:[], key3:[]}

for row in rows:
    
    rank=row.find_all('td')[0].text.strip()
    dic[key1].append(rank)
    
    name=row.find_all('td')[1].text.strip()
    dic[key2].append(name)
    
    mc_usd=row.find_all('td')[2].text.strip()
    dic[key3].append(mc_usd)
        
dic

{'Rank': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'],
 'Bank_Name': ['JPMorgan Chase',
  'Bank of America',
  'Industrial and Commercial Bank of China',
  'Agricultural Bank of China',
  'HDFC Bank',
  'Wells Fargo',
  'HSBC Holdings PLC',
  'Morgan Stanley',
  'China Construction Bank',
  'Bank of China'],
 'MC_USD_Billion': ['432.92',
  '231.52',
  '194.56',
  '160.68',
  '157.91',
  '155.87',
  '148.90',
  '140.83',
  '139.82',
  '136.81']}

In [23]:
# converting the dictionary into a dataframe

df=pd.DataFrame(dic)
df

Unnamed: 0,Rank,Bank_Name,MC_USD_Billion
0,1,JPMorgan Chase,432.92
1,2,Bank of America,231.52
2,3,Industrial and Commercial Bank of China,194.56
3,4,Agricultural Bank of China,160.68
4,5,HDFC Bank,157.91
5,6,Wells Fargo,155.87
6,7,HSBC Holdings PLC,148.9
7,8,Morgan Stanley,140.83
8,9,China Construction Bank,139.82
9,10,Bank of China,136.81


In [32]:
# functionizing extract step (turning the previous steps into a function)

def extract(url, table_attribs):
    ''' This function extracts the required
    information from the website and saves it to a dataframe. The
    function returns the dataframe for further processing. '''
    
    key1=table_attribs[0]
    key2=table_attribs[1]
    key3=table_attribs[2]
    
    data=requests.get(url).text
    soup=BeautifulSoup(data, 'html.parser')
    tables=soup.find_all('tbody')
    table=tables[0]
    rows=table.find_all('tr')
    rows=rows[1:]
    
    dic={key1:[], key2:[], key3:[]}

    for row in rows:

        rank=row.find_all('td')[0].text.strip()
        dic[key1].append(rank)

        name=row.find_all('td')[1].text.strip()
        dic[key2].append(name)

        mc_usd=row.find_all('td')[2].text.strip()
        dic[key3].append(mc_usd)
        
    df=pd.DataFrame(dic)

    return df

In [34]:
# executing the extract function

table_attribs = ["Rank", "Bank_Name", "MC_USD_Billion"]
df=extract(url, table_attribs)
df

Unnamed: 0,Rank,Bank_Name,MC_USD_Billion
0,1,JPMorgan Chase,432.92
1,2,Bank of America,231.52
2,3,Industrial and Commercial Bank of China,194.56
3,4,Agricultural Bank of China,160.68
4,5,HDFC Bank,157.91
5,6,Wells Fargo,155.87
6,7,HSBC Holdings PLC,148.9
7,8,Morgan Stanley,140.83
8,9,China Construction Bank,139.82
9,10,Bank of China,136.81


# Transform

In [35]:
# investigating data types

df.dtypes

Rank              object
Bank_Name         object
MC_USD_Billion    object
dtype: object

In [36]:
# casting MC_USD_Billion column to float

df['MC_USD_Billion']=df['MC_USD_Billion'].astype(float)

In [38]:
# reading in the exchange_rate csv file

exchange_rate=pd.read_csv('data/IBM-Data-Engineering/exchange_rate.csv')
exchange_rate

Unnamed: 0,Currency,Rate
0,EUR,0.93
1,GBP,0.8
2,INR,82.95


In [40]:
# adding different currencies to the dataframe

df['MC_EUR_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[0,1]).round(2)
df['MC_GBP_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[1,1]).round(2)
df['MC_INR_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[2,1]).round(2)
df

Unnamed: 0,Rank,Bank_Name,MC_USD_Billion,MC_EUR_Billion,MC_GBP_Billion,MC_INR_Billion
0,1,JPMorgan Chase,432.92,402.62,346.34,35910.71
1,2,Bank of America,231.52,215.31,185.22,19204.58
2,3,Industrial and Commercial Bank of China,194.56,180.94,155.65,16138.75
3,4,Agricultural Bank of China,160.68,149.43,128.54,13328.41
4,5,HDFC Bank,157.91,146.86,126.33,13098.63
5,6,Wells Fargo,155.87,144.96,124.7,12929.42
6,7,HSBC Holdings PLC,148.9,138.48,119.12,12351.26
7,8,Morgan Stanley,140.83,130.97,112.66,11681.85
8,9,China Construction Bank,139.82,130.03,111.86,11598.07
9,10,Bank of China,136.81,127.23,109.45,11348.39


In [41]:
# functionizing transform step (turning the previous steps into a function)

def transform(df, csv_path):
    
    ''' This function accesses the CSV file for exchange rate
    information, and adds three columns to the data frame, each
    containing the transformed version of Market Cap column to
    respective currencies'''
    
    df['MC_USD_Billion']=df['MC_USD_Billion'].astype(float)
    
    exchange_rate=pd.read_csv(csv_path)
    
    df['MC_EUR_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[0,1]).round(2)
    df['MC_GBP_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[1,1]).round(2)
    df['MC_INR_Billion']=(df['MC_USD_Billion']*exchange_rate.iloc[2,1]).round(2)
    
    return df

In [42]:
# executing the transform function

csv_path='data/IBM-Data-Engineering/exchange_rate.csv'
df=transform(df, csv_path)
df

Unnamed: 0,Rank,Bank_Name,MC_USD_Billion,MC_EUR_Billion,MC_GBP_Billion,MC_INR_Billion
0,1,JPMorgan Chase,432.92,402.62,346.34,35910.71
1,2,Bank of America,231.52,215.31,185.22,19204.58
2,3,Industrial and Commercial Bank of China,194.56,180.94,155.65,16138.75
3,4,Agricultural Bank of China,160.68,149.43,128.54,13328.41
4,5,HDFC Bank,157.91,146.86,126.33,13098.63
5,6,Wells Fargo,155.87,144.96,124.7,12929.42
6,7,HSBC Holdings PLC,148.9,138.48,119.12,12351.26
7,8,Morgan Stanley,140.83,130.97,112.66,11681.85
8,9,China Construction Bank,139.82,130.03,111.86,11598.07
9,10,Bank of China,136.81,127.23,109.45,11348.39


# Load

In [43]:
# functionizing load_to_csv

def load_to_csv(df, output_path):
    ''' This function saves the final dataframe as a `CSV` file 
    in the provided path. Function returns nothing.'''

    df.to_csv(output_path, index=False)

In [44]:
# saving the dataframe as a csv file

output_path='data/IBM-Data-Engineering/Largest_banks_data.csv'
load_to_csv(df, output_path)

In [45]:
# functionizing load_to_json

def load_to_json(df, json_path):
    ''' This function saves the final dataframe as a `JSON` file 
    in the provided path. Function returns nothing.'''

    df.to_json(json_path, indent=4)

In [46]:
# saving the dataframe as a json file

json_path='data/IBM-Data-Engineering/Largest_banks_data.json'
load_to_json(df, json_path)

In [47]:
# functionizing load to a database table

def load_to_db(df, sql_connection, table_name):
    ''' This function saves the final dataframe as a database table
    with the provided name. Function returns nothing.'''
    
    df.to_sql(table_name, sql_connection, if_exists = 'replace', index = False)

In [48]:
# storing the dataframe to a database table

db_name='Banks.db'
sql_connection = sqlite3.connect(db_name)
table_name='Largest_banks'

load_to_db(df, sql_connection, table_name)

In [49]:
# functionizing running queries against the database table

def run_query(query_statement, sql_connection):
    ''' This function runs the stated query on the database table and
    prints the output on the terminal. Function returns nothing. '''
    
    print(query_statement)
    print()
    query_output = pd.read_sql(query_statement, sql_connection)
    print(query_output)

In [50]:
# running the first query against the database table

query_statement = f"SELECT * FROM Largest_banks"
run_query(query_statement, sql_connection)

SELECT * FROM Largest_banks

  Rank                                Bank_Name  MC_USD_Billion  \
0    1                           JPMorgan Chase          432.92   
1    2                          Bank of America          231.52   
2    3  Industrial and Commercial Bank of China          194.56   
3    4               Agricultural Bank of China          160.68   
4    5                                HDFC Bank          157.91   
5    6                              Wells Fargo          155.87   
6    7                        HSBC Holdings PLC          148.90   
7    8                           Morgan Stanley          140.83   
8    9                  China Construction Bank          139.82   
9   10                            Bank of China          136.81   

   MC_EUR_Billion  MC_GBP_Billion  MC_INR_Billion  
0          402.62          346.34        35910.71  
1          215.31          185.22        19204.58  
2          180.94          155.65        16138.75  
3          149.43        

In [51]:
# running the second query against the database table

query_statement = f"SELECT AVG(MC_GBP_Billion) FROM Largest_banks"
run_query(query_statement, sql_connection)

SELECT AVG(MC_GBP_Billion) FROM Largest_banks

   AVG(MC_GBP_Billion)
0              151.987


In [53]:
# running the third query against the database table

query_statement = f"SELECT Bank_Name from Largest_banks LIMIT 5"
run_query(query_statement, sql_connection)

SELECT Bank_Name from Largest_banks LIMIT 5

                                 Bank_Name
0                           JPMorgan Chase
1                          Bank of America
2  Industrial and Commercial Bank of China
3               Agricultural Bank of China
4                                HDFC Bank


# Execution and Logging the ETL Pipeline

In [54]:
# functionizing logging ETL steps into a text file

def log_progress(message):
    ''' This function logs the mentioned message at a given stage of the 
    code execution to a log file. Function returns nothing'''

    timestamp_format='%Y-%m-%d-%H:%M:%S'
    now=datetime.now()
    timestamp=now.strftime(timestamp_format)
    
    with open('data/IBM-Data-Engineering/code_log.txt', 'a') as f:
        f.write(timestamp + ', ' + message + '\n')

In [55]:
# All ETL pipeline has been summarized in this cell, along with creating a log file

url = 'https://web.archive.org/web/20230908091635 /https://en.wikipedia.org/wiki/List_of_largest_banks'
db_name='Banks.db'
table_name='Largest_banks'
csv_path = 'data/IBM-Data-Engineering/exchange_rate.csv'
output_path = 'data/IBM-Data-Engineering/Largest_banks_data.csv'
json_path='data/IBM-Data-Engineering/Largest_banks_data.json'

log_progress('Preliminaries complete. Initiating ETL process')

df = extract(url, table_attribs)

log_progress('Data extraction complete. Initiating Transformation process')

df = transform(df, csv_path)

log_progress('Data transformation complete. Initiating loading process')

load_to_csv(df, output_path)

log_progress('Data saved to CSV file')

load_to_json(df, json_path)

log_progress('Data saved to JSON file')

sql_connection = sqlite3.connect(db_name)

log_progress('SQL Connection initiated.')

load_to_db(df, sql_connection, table_name)

log_progress('Data loaded to Database as table. Running the query')

query_statement = f"SELECT * FROM Largest_banks"
run_query(query_statement, sql_connection)

log_progress('First query statement was executed.')

query_statement = f"SELECT AVG(MC_GBP_Billion) FROM Largest_banks"
run_query(query_statement, sql_connection)

log_progress('Second query statement was executed.')

query_statement = f"SELECT Bank_Name from Largest_banks LIMIT 5"
run_query(query_statement, sql_connection)

log_progress('Third query statement was executed.')

log_progress('Process complete.')

sql_connection.close()

log_progress('Server connection closed.')

SELECT * FROM Largest_banks

  Rank                                Bank_Name  MC_USD_Billion  \
0    1                           JPMorgan Chase          432.92   
1    2                          Bank of America          231.52   
2    3  Industrial and Commercial Bank of China          194.56   
3    4               Agricultural Bank of China          160.68   
4    5                                HDFC Bank          157.91   
5    6                              Wells Fargo          155.87   
6    7                        HSBC Holdings PLC          148.90   
7    8                           Morgan Stanley          140.83   
8    9                  China Construction Bank          139.82   
9   10                            Bank of China          136.81   

   MC_EUR_Billion  MC_GBP_Billion  MC_INR_Billion  
0          402.62          346.34        35910.71  
1          215.31          185.22        19204.58  
2          180.94          155.65        16138.75  
3          149.43        