# **Saylani Mass Training Program**
### **Cloud Data Engineering Module by Qasim Hassan**

#### A basice Extract, Transform and Load (ETL) pipeline using web scrapping, pandas and sql

#### Import necessary Libraries

In [1]:
#!pip install icecream

In [1]:
from io import StringIO
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
from datetime import datetime
#from icecream import ic

#### Step 0: Maintaining a Log File
This step is done to record the logs while performing ETL and it is not neccessary in an ETL Pipeline

In [2]:
def log_progress(message):
    """This function logs the mentioned message of a given stage of the
    code execution to a log file. Function returns nothing"""

    try:
        with open('./logs/code_log.txt', 'a') as f:
            f.write(f'{datetime.now()}: {message}\n')
    except Exception as e:
        print(f"Logging failed: {e}")

In [4]:
# url = 'https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks'
# table_attribs = 'By market capitalization'


#### Step 1: Extract

In [3]:
def extract(url, table_attribs):
    """ This function aims to extract the required
    information from the website and save it to a data frame. The
    function returns the data frame for further processing. """
    try:
        response = requests.get(url)
        response.raise_for_status()  # Raises HTTPError for bad responses

        soup = BeautifulSoup(response.text, 'html.parser')
        span = soup.find('span', string=table_attribs)

        if span is None:
            raise ValueError(f"Span with text '{table_attribs}' not found")

        table = span.find_next('table')
        if table is None:
            raise ValueError("Table following the span not found")

        df = pd.read_html(StringIO(str(table)))[0]

        log_progress('Data extraction complete. Initiating Transformation process')

        return df

    except Exception as e:
        log_progress(f"Error during data extraction: {e}")
        print(f"Extraction failed: {e}")
        return pd.DataFrame()  # Return an empty DataFrame on failure

In [4]:
# df=extract(url, table_attribs)
# df

In [5]:
# csv_path = './input/exchange_rate.csv'

#### Step 2: Transform

In [4]:
def transform(df, csv_path):
    """ This function accesses the CSV file for exchange rate
    information, and adds three columns to the data frame, each
    containing the transformed version of Market Cap column to
    respective currencies"""

    try:
        exchange_rate = pd.read_csv(csv_path, index_col=0).to_dict()['Rate']
        print(exchange_rate)

        required_col = 'Market cap(US$ billion)'
        if required_col not in df.columns:
            raise KeyError(f"Missing required column: '{required_col}'")

        df['MC_GBP_Billion'] = round(df[required_col] * exchange_rate['GBP'], 2)
        df['MC_EUR_Billion'] = round(df[required_col] * exchange_rate['EUR'], 2)
        df['MC_INR_Billion'] = round(df[required_col] * exchange_rate['INR'], 2)

        print(df)

        log_progress('Data transformation complete. Initiating Loading process')

        return df

    except Exception as e:
        log_progress(f"Error during data transformation: {e}")
        print(f"Transformation failed: {e}")
        return df  # Return the original df, even if not fully transformed

In [7]:
# transform(df, csv_path)

#### Step 3: Load

Loading data to a CSV

In [5]:
def load_to_csv(df, output_path):
    """ This function saves the final data frame as a CSV file in
    the provided path. Function returns nothing."""

    try:
        df.to_csv(output_path)
        log_progress('Data saved to CSV file')
    except Exception as e:
        log_progress(f"Error saving data to CSV: {e}")
        print(f"CSV export failed: {e}")

Loading data to SQL

In [6]:
def load_to_db(df, sql_connection, table_name):
    """ This function saves the final data frame to a database
    table with the provided name. Function returns nothing."""

    try:
        df.to_sql(table_name, sql_connection, if_exists='replace', index=False)
        log_progress('Data loaded to Database as a table, Executing queries')
    except Exception as e:
        log_progress(f"Error loading data to database: {e}")
        print(f"Database load failed: {e}")


def run_query(query_statement, sql_connection):
    """ This function runs the query on the database table and
    prints the output on the terminal. Function returns nothing. """

    try:
        cursor = sql_connection.cursor()
        cursor.execute(query_statement)
        result = cursor.fetchall()
        log_progress('Process Complete')
        return result
    except Exception as e:
        log_progress(f"Error executing query: {e}")
        print(f"Query execution failed: {e}")
        return None

### Executing Pipeline

In [11]:
if __name__ == '__main__':
    url = 'https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks'
    output_csv_path = './output/Largest_banks_data.csv'
    database_name = './output/Banks.db'
    table_name = 'Largest_banks'

    log_progress('Preliminaries complete. Initiating ETL process')

    try:
        df = extract(url, 'By market capitalization')
    except Exception as e:
        log_progress(f"ETL aborted during extraction: {e}")
        exit(1)

    try:
        df = transform(df, './input/exchange_rate.csv')
    except Exception as e:
        log_progress(f"ETL aborted during transformation: {e}")
        exit(1)

    try:
        load_to_csv(df, output_csv_path)
    except Exception as e:
        log_progress(f"ETL aborted during CSV export: {e}")
        exit(1)

    try:
        with sqlite3.connect(database_name) as conn:
            load_to_db(df, conn, table_name)

            result1 = run_query('SELECT * FROM Largest_banks', conn)
            print(result1)

            result2 = run_query('SELECT AVG(MC_GBP_Billion) FROM Largest_banks', conn)
            print(result2)

            result3 = run_query('SELECT "Bank name" FROM Largest_banks LIMIT 5', conn)
            print(result3)

    except Exception as e:
        log_progress(f"ETL aborted during database operations: {e}")
        print(f"Database error: {e}")
        exit(1)

{'EUR': 0.93, 'GBP': 0.8, 'INR': 82.95}
Transformation failed: "Missing required column: 'Market cap(US$ billion)'"
[(1, 'JPMorgan Chase', 432.92), (2, 'Bank of America', 231.52), (3, 'Industrial and Commercial Bank of China', 194.56), (4, 'Agricultural Bank of China', 160.68), (5, 'HDFC Bank', 157.91), (6, 'Wells Fargo', 155.87), (7, 'HSBC Holdings PLC', 148.9), (8, 'Morgan Stanley', 140.83), (9, 'China Construction Bank', 139.82), (10, 'Bank of China', 136.81)]
Query execution failed: no such column: MC_GBP_Billion
None
[('JPMorgan Chase',), ('Bank of America',), ('Industrial and Commercial Bank of China',), ('Agricultural Bank of China',), ('HDFC Bank',)]
