# **Saylani Mass Training Program**
### **Cloud Data Engineering Module by Qasim Hassan**

#### A basice Extract, Transform and Load (ETL) pipeline using web scrapping, pandas and sql

#### Import necessary Libraries

In [None]:
!pip install icecream

In [1]:
from io import StringIO #html ka data extract for string
import requests # kisi bhi web par request kaliya
from bs4 import BeautifulSoup # for scraping the web
import pandas as pd # transformation for currency
import sqlite3 # Data base hain python ki  basic hain
from datetime import datetime # for logs like date likhay gain
import logging
# from icecream import ic #printing may kam karta hain

#### Step 0: Maintaining a Log File
This step is done to record the logs while performing ETL and it is not neccessary in an ETL Pipeline

In [None]:
import os
from datetime import datetime

def log_progress(message):
    """This function logs the mentioned message of a given stage of the
    code execution to a log file. Function returns nothing.
    
    Also, it catches any errors during the logging process.
    """
    try:
        # Check if the 'logs' directory exists; if not, create it.
        if not os.path.exists('./logs'):
            os.makedirs('./logs')

        # Open the log file and write the log message.
        with open('./logs/code_log.txt', 'a') as f:
            f.write(f'{datetime.now()}: {message}\n')

    except Exception as e:
        # Handle any exceptions that occur during logging
        error_message = f"Logging error at {datetime.now()}: {str(e)}"
        try:
            # Attempt to log the error message into a separate error log file
            with open('./logs/error_log.txt', 'a') as error_f:
                error_f.write(f'{error_message}\n')
        except Exception as inner_e:
            # If error logging itself fails, print to the console as a last resort
            print(f"Failed to log error: {str(inner_e)}")


#### Step 1: Extract

In [None]:
# url = "https://en.wikipedia.org/wiki/List_of_largest_banks"
# table_attribs = 'By market capitalization'

# soup = BeautifulSoup(requests.get(url).text, 'html.parser') # html format 
# table = soup.find('span', string=table_attribs).find_next('table') # capital ka data lakar araha, table ki form may karrahi
# df = pd.read_html(StringIO(str(table)))[0] # string then save in df 
# pd is panda

In [None]:
def extract(url, table_attribs):
    """ 
    This function extracts information from the website and returns a DataFrame. 
    """
    try:
        # Request the webpage and parse it
        response = requests.get(url)
        response.raise_for_status()  # Check for HTTP errors
        soup = BeautifulSoup(response.text, 'html.parser')

        # Find the table and extract it into a DataFrame
        table = soup.find('span', string=table_attribs).find_next('table')
        df = pd.read_html(StringIO(str(table)))[0]

        log_progress('Data extraction complete.')

        return df

    except Exception as e:
        logging.error(f"Error: {e}")
        return None

#### Step 2: Transform

In [None]:
# csv_path = './input/exchange_rate.csv'
# exchange_rate = pd.read_csv(csv_path, index_col=0).to_dict()['Rate'] # read convert in dict and nikalo rate only

# df['MC_GBP_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['GBP'], 2)
# df['MC_EUR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['EUR'], 2)
# df['MC_INR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['INR'], 2)
# df['MC_PKR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['PKR'], 2)

In [None]:
def transform(df, csv_path):
    """ 
    This function accesses the CSV file for exchange rate
    information, and adds three columns to the data frame, each
    containing the transformed version of Market Cap column to
    respective currencies.
    """
    try:
        # Read the exchange rate CSV file
        exchange_rate = pd.read_csv(csv_path, index_col=0).to_dict()['Rate']

    except FileNotFoundError:
        print(f"Error: The file at {csv_path} was not found.")
        return None  # or handle as appropriate
    except pd.errors.EmptyDataError:
        print(f"Error: The file at {csv_path} is empty.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred while reading the file: {e}")
        return None

    try:
        # Perform transformation
        df['MC_GBP_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['GBP'], 2)
        df['MC_EUR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['EUR'], 2)
        df['MC_INR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['INR'], 2)
        df['MC_PKR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['PKR'], 2)

    except KeyError as e:
        print(f"Error: Missing currency rate for {e}.")
        return None  # or handle as appropriate
    except Exception as e:
        print(f"An error occurred during transformation: {e}")
        return None

    print(df)

    log_progress('Data transformation complete. Initiating Loading process')

    return df


#### Step 3: Load

Loading data to a CSV

In [None]:

# output_path = "./output/Largest_banks_data.csv"
# df.to_csv(output_path)

In [None]:
def load_to_csv(df, output_path):
    """ This function saves the final data frame as a CSV file in
    the provided path. Function returns nothing."""
    
    try:
        df.to_csv(output_path)
        log_progress('Data saved to CSV file')
    except Exception as e:
        log_progress(f"Error occurred while saving data to CSV: {e}")


Loading data to SQL

In [None]:
def load_to_db(df, sql_connection, table_name):
    """ This function saves the final data frame to a database
    table with the provided name. Function returns nothing."""
    
    try:
        df.to_sql(table_name, sql_connection, if_exists='replace', index=False)
        log_progress('Data loaded to Database as a table, Executing queries')
    except Exception as e:
        log_progress(f"Error occurred while loading data to database: {e}")
        # Optionally, re-raise the error if you want it to propagate
        # raise e


def run_query(query_statement, sql_connection):
    """ This function runs the query on the database table and
    prints the output on the terminal. Function returns nothing. """
    
    try:
        cursor = sql_connection.cursor()
        cursor.execute(query_statement)
        result = cursor.fetchall()
        log_progress('Process Complete')
        return result
    except Exception as e:
        log_progress(f"Error occurred while executing query: {e}")
        # Optionally, re-raise the error if needed
        # raise e


### Executing Pipeline

In [None]:
if __name__ == '__main__':  # main code meray yaha par chalay ga
    url = 'https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks'
    output_csv_path = './output/Largest_banks_data.csv'
    database_name = './output/Banks.db'
    table_name = 'Largest_banks'
    
    try:
        log_progress('Preliminaries complete. Initiating ETL process')

        # Extract data from the URL
        df = extract(url, 'By market capitalization')

        # Transform the data
        transform(df, './input/exchange_rate.csv')

        # Load data to CSV
        load_to_csv(df, output_csv_path)

        # Connect to SQLite database and load data
        with sqlite3.connect(database_name) as conn:
            load_to_db(df, conn, table_name)

            # Run queries and print the results
            print(run_query('SELECT * FROM Largest_banks', conn))
            print(run_query('SELECT AVG(MC_GBP_Billion) FROM Largest_banks', conn))
            print(run_query('SELECT "Bank name" FROM Largest_banks LIMIT 5', conn))

    except Exception as e:
        log_progress(f"Error occurred during the ETL pipeline execution: {e}")
        # Optionally, re-raise the exception if you want to propagate it further
        # raise e
