# **Saylani Mass Training Program**
### **Cloud Data Engineering  - Ayaan Merchant**

#### A basice Extract, Transform and Load (ETL) pipeline using web scrapping, pandas and sql

#### Import necessary Libraries

In [1]:
from io import StringIO  # Provides an in-memory file-like object for handling string data
import requests  # Used to make HTTP requests to fetch web pages or APIs
from bs4 import BeautifulSoup  # A library for parsing HTML and extracting data from it
import pandas as pd  # A powerful library for data manipulation and analysis
import sqlite3  # A library for SQLite database operations
from datetime import datetime  # Provides classes for manipulating dates and times

#### Step 0: Maintaining a Log File
This step is done to record the logs while performing ETL and it is not neccessary in an ETL Pipeline

In [2]:
def log_progress(message):
    """Log progress messages to a file."""
    
    # Define the directory where log files will be stored
    log_dir = 'logs'  # Folder named 'logs' will contain the log files
    
    # Create the directory if it doesn't already exist
    os.makedirs(log_dir, exist_ok=True)  # `exist_ok=True` prevents errors if the folder already exists
    
    # Define the path for the log file within the directory
    log_file_path = os.path.join(log_dir, 'code_log.txt')  # Log file named 'code_log.txt'
    
    # Open the log file in append mode to add new log messages
    with open(log_file_path, 'a') as f:
        # Write the current timestamp and the provided message to the log file
        f.write(f'{datetime.now()}: {message}\n')  # Each log entry includes a timestamp and the message


#### Step 1: Extract

In [3]:
def extract(url, table_attribs):
    """ 
    This function extracts the required information from a website and saves it to a DataFrame.
    The function returns the DataFrame for further processing.
    """
    
    # Fetch the HTML content of the webpage using the provided URL
    soup = BeautifulSoup(requests.get(url).text, 'html.parser')  # Parse the HTML content using BeautifulSoup
    
    # Locate the specific table in the HTML using the given table attributes
    table = soup.find('span', string=table_attribs).find_next('table')  
    # Find a <span> tag with the specified string (table_attribs) and find the next <table> tag
    
    # Convert the extracted HTML table into a pandas DataFrame
    df = pd.read_html(StringIO(str(table)))[0]  
    # `pd.read_html` reads HTML tables directly into a DataFrame. The `StringIO` wraps the table's HTML string.

    # Log progress to indicate successful data extraction and the start of transformation
    log_progress('Data extraction complete. Initiating Transformation process')  
    
    # Return the extracted DataFrame for further processing
    return df


#### Step 2: Transform

In [4]:
def transform(df, csv_path):
    """ 
    This function processes the input DataFrame to add transformed versions of the Market Cap column.
    It reads exchange rate information from a CSV file and creates three new columns with values
    in different currencies (GBP, EUR, INR).
    """

    # Load the exchange rate data from the CSV file and convert it to a dictionary
    # The 'Rate' column values become accessible as dictionary values
    exchange_rate = pd.read_csv(csv_path, index_col=0).to_dict()['Rate']  
    
    # Add a new column to the DataFrame for Market Cap in GBP, rounding to 2 decimal places
    df['MC_GBP_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['GBP'], 2)  
    
    # Add a new column to the DataFrame for Market Cap in EUR, rounding to 2 decimal places
    df['MC_EUR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['EUR'], 2)  
    
    # Add a new column to the DataFrame for Market Cap in INR, rounding to 2 decimal places
    df['MC_INR_Billion'] = round(df['Market cap (US$ billion)'] * exchange_rate['INR'], 2)  
    
    # Print the value of the 'MC_EUR_Billion' column for the 5th row to verify calculations
    print(df['MC_EUR_Billion'][4])  
    
    # Log progress to indicate successful data transformation and the start of the loading process
    log_progress('Data transformation complete. Initiating Loading process')  
    
    # Return the transformed DataFrame for further steps
    return df


#### Step 3: Load

Loading data to a CSV

In [5]:
def load_to_csv(df, output_path):
    """ 
    This function saves the final DataFrame as a CSV file at the specified path. 
    The function does not return anything.
    """

    # Save the DataFrame to a CSV file at the given output path
    df.to_csv(output_path)  
    
    # Log progress to indicate that the data has been successfully saved to a CSV file
    log_progress('Data saved to CSV file')  


Loading data to SQL

In [6]:
def load_to_db(df, sql_connection, table_name):
    """ 
    This function saves the final DataFrame to a database table with the provided name.
    The function does not return anything.
    """

    # Save the DataFrame to the database as a table with the specified name
    # If the table already exists, replace it
    df.to_sql(table_name, sql_connection, if_exists='replace', index=False)  
    
    # Log progress to indicate that the data has been loaded into the database
    log_progress('Data loaded to Database as a table, Executing queries')  


def run_query(query_statement, sql_connection):
    """ 
    This function runs a query on the database table and prints the output on the terminal.
    The function does not return anything.
    """

    # Create a cursor object to interact with the database
    cursor = sql_connection.cursor()  
    
    # Execute the provided SQL query
    cursor.execute(query_statement)  
    
    # Fetch all results from the executed query
    result = cursor.fetchall()  
    
    # Log progress to indicate the query execution is complete
    log_progress('Process Complete')  
    
    # Return the query results for further use or inspection
    return result  


### Executing Pipeline

In [10]:
import os

if __name__ == '__main__':
    # Start of the script, executes only when the script is run directly
    try:
        # File paths and constants
        url = 'https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks'  
        # URL for the web page to scrape data from
        
        output_csv_path = './output/Largest_banks_data.csv'  
        # Path for saving the output CSV file
        
        database_name = './output/Banks.db'  
        # Name and location of the SQLite database file
        
        table_name = 'Largest_banks'  
        # Name of the database table to store the data

        # Ensure the output directory exists, creating it if necessary
        os.makedirs('./output', exist_ok=True)  

        # Log progress indicating that preliminary steps are complete
        log_progress('Preliminaries complete. Initiating ETL process')  

        # Step 1: Extract data
        df = extract(url, 'By market capitalization')  
        # Extract data from the specified URL and table identifier

        if df is None:
            # Check if the extract function returned a valid DataFrame
            raise ValueError("Extract function returned None")  # Raise an error if no data was extracted
        
        print("Extracted DataFrame:", df.head())  # Print the first few rows for debugging purposes

        # Step 2: Transform the data
        transform(df, './input/exchange_rate.csv')  
        # Apply transformations using exchange rates from the specified CSV file

        print("Transformed DataFrame:", df.head())  # Print the first few rows after transformation for debugging

        # Step 3: Load transformed data into a CSV file
        load_to_csv(df, output_csv_path)  
        # Save the transformed data to a CSV file at the specified path

        print(f"Data saved to CSV: {output_csv_path}")  # Confirm the CSV save operation

        # Step 4: Load transformed data into a database
        with sqlite3.connect(database_name) as conn:  
            # Establish a connection to the SQLite database (context manager ensures connection is closed)
            load_to_db(df, conn, table_name)  
            # Load the DataFrame into a database table

            print(f"Data loaded to database: {database_name}")  # Confirm the database load operation

            # Step 5: Run queries on the database
            print("Query Results:")
            print(run_query('SELECT * FROM Largest_banks', conn))  
            # Retrieve all rows from the database table
            
            print(run_query('SELECT AVG(MC_GBP_Billion) FROM Largest_banks', conn))  
            # Calculate and display the average market cap in GBP
            
            print(run_query('SELECT "Bank name" FROM Largest_banks LIMIT 5', conn))  
            # Retrieve the first 5 bank names

    except Exception as e:
        # Handle any errors that occur during the ETL process
        log_progress(f"Error occurred: {str(e)}")  
        # Log the error message

        print(f"Error occurred: {str(e)}")  # Print the error message to the console


Extracted DataFrame:    Rank                                Bank name  Market cap (US$ billion)
0     1                           JPMorgan Chase                    432.92
1     2                          Bank of America                    231.52
2     3  Industrial and Commercial Bank of China                    194.56
3     4               Agricultural Bank of China                    160.68
4     5                                HDFC Bank                    157.91
153.17
Transformed DataFrame:    Rank                                Bank name  Market cap (US$ billion)  \
0     1                           JPMorgan Chase                    432.92   
1     2                          Bank of America                    231.52   
2     3  Industrial and Commercial Bank of China                    194.56   
3     4               Agricultural Bank of China                    160.68   
4     5                                HDFC Bank                    157.91   

   MC_GBP_Billion  MC_EUR_Bill