In [9]:
import requests
from bs4 import BeautifulSoup as bs
import pandas as pd
from datetime import datetime as dt
import logging
import json
import psycopg2
from psycopg2 import sql
import uuid

# Identifier for this ETL run; stamped on every log row and every loaded record
# so a single run can be traced end to end in the database.
batch_id = str(uuid.uuid4())

# Pages scraped by the two extraction steps.
world_bank_url = 'https://en.wikipedia.org/wiki/List_of_largest_banks'
exchange_rates_url = (
    'https://www.irs.gov/individuals/international-taxpayers/'
    'yearly-average-currency-exchange-rates'
)

# Column layouts of the in-memory DataFrames (logs and the two extracts).
headers_logs_df = [
    "Log Phase",
    "Message",
    "Datetime",
    "Batch ID",
]
headers_world_bank = [
    "Bank name",
    "Market cap (US$ billion)",
    "Last Modified Date",
]
headers_exchange_rates = [
    "Country",
    "Currency",
    "Exchange Rate",
    "Year",
]

# Human-readable source labels used in log phase names.
world_bank, exchange_rates = "World Bank Data", "Exchange Rates Data"


In [11]:
# In-memory buffer of every progress event; main() later writes these rows
# to the database through insert_log.
logs_df = pd.DataFrame(columns=headers_logs_df)

logging.basicConfig(format='%(message)s', level=logging.INFO)

def log_progress(phase, message):
    """Record one ETL progress event.

    The event is appended as a row (phase, message, timestamp, batch id) to the
    module-level ``logs_df`` and echoed through ``logging.info``.

    Args:
        phase: Short label of the pipeline stage (e.g. "Extraction: ...").
        message: Free-text description of what happened.
    """
    global logs_df
    stamp = dt.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = pd.DataFrame(
        data=[[phase, message, stamp, batch_id]],
        columns=headers_logs_df,
    )
    # Rebinds the global rather than mutating in place.
    logs_df = pd.concat([logs_df, entry], ignore_index=True)
    logging.info(f"{stamp} - {batch_id} - {phase} - {message}")

log_progress("Initialization", "Logging Initialized")

2024-08-06 07:20:33 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Initialization - Logging Initialized


In [12]:
def extract_world_bank_data(url, timeout=30):
    """Scrape the largest-banks table from the Wikipedia page at ``url``.

    Args:
        url: Page to fetch (normally ``world_bank_url``).
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        DataFrame with the ``headers_world_bank`` columns plus ``"Batch ID"``.
        Every row carries the page's last-modified date formatted as YYYY-MM-DD.

    Raises:
        Exception: on a non-200 response, an unexpected page layout, or any
            parsing error. The failure is logged via ``log_progress`` first.
    """
    phase = f"Extraction: {world_bank}"
    try:
        log_progress(phase, "Starting extraction of world bank data")
        log_progress(phase, f"Connecting to URL: {url}")
        # Without a timeout, requests can block indefinitely on a stalled server.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            log_progress(phase, f"Failed to connect to {url} status code: {response.status_code}")
            raise Exception(f"Failed to retrieve data from {url}")
        log_progress(phase, "Successfully connected to URL")

        soup = bs(response.content, 'html.parser')

        log_progress(phase, "Extracting table content")
        table_bodies = soup.find_all('tbody')
        # The bank table is taken to be the third <tbody> on the page; this is
        # tied to the current page layout.
        if len(table_bodies) <= 2:
            raise Exception("Unexpected table structure")
        rows = table_bodies[2].find_all('tr')

        log_progress(phase, "Extracting last modified date")
        lastmod_tag = soup.find(id="footer-info-lastmod")
        if lastmod_tag is None:
            raise Exception("Last modified date not found on the page")
        lastmod_text = lastmod_tag.get_text()
        # Takes the text after the first "on" up to the first comma and parses
        # it as e.g. "5 August 2024".
        date_string = lastmod_text.split("on")[1].split(",")[0].strip()
        formatted_last_modified_date = dt.strptime(date_string, '%d %B %Y').strftime('%Y-%m-%d')

        data = []
        for row in rows:
            cells = row.find_all('td')
            if not cells:
                # Header rows use <th> only; skip them.
                continue
            data.append([
                cells[1].get_text().strip(),
                cells[2].get_text().strip(),
                formatted_last_modified_date,
            ])

        df = pd.DataFrame(data, columns=headers_world_bank)
        df["Batch ID"] = batch_id
        log_progress(phase, "Completed extraction of world bank data")

        # NOTE(review): these change pandas display options globally as a side
        # effect of extraction; they would sit better in a config cell.
        pd.set_option('display.max_columns', None)
        pd.set_option('display.width', 1000)
        pd.set_option('display.colheader_justify', 'center')

        return df
    except Exception as e:
        log_progress(phase, f"Failed during extraction of world bank data: {e}")
        raise

def extract_exchange_rates_data(url, timeout=30):
    """Scrape the yearly-average currency exchange-rate table at ``url``.

    The table header is searched for the first four-digit year (e.g. "2023").
    Every data row is stamped with that year's last day as "31-12-YYYY"
    (day-month-year).

    Args:
        url: Page to fetch (normally ``exchange_rates_url``).
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        DataFrame with the ``headers_exchange_rates`` columns plus ``"Batch ID"``.
        It has zero rows if the table body contains no data rows.

    Raises:
        Exception: on a non-200 response, a missing header/table/year column,
            or any parsing error. The failure is logged via ``log_progress`` and
            re-raised, so callers never receive ``None``.
    """
    phase = f"Extraction: {exchange_rates}"
    try:
        log_progress(phase, "Starting extraction of exchange rates data")
        log_progress(phase, f"Connecting to URL: {url}")
        # Without a timeout, requests can block indefinitely on a stalled server.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            log_progress(phase, f"Failed to connect to {url} status code: {response.status_code}")
            raise Exception(f"Failed to retrieve data from {url}")
        log_progress(phase, "Successfully connected to URL")

        soup = bs(response.content, 'html.parser')

        log_progress(phase, "Extracting year from header")
        header_section = soup.find('thead')
        header_row = header_section.find('tr') if header_section is not None else None
        if header_row is None:
            log_progress(phase, "No table header found on the page")
            raise Exception("No table header found on the page")
        header_texts = [th.get_text().strip() for th in header_row.find_all('th')]

        # Pick the first header cell that looks like a year (exactly four digits).
        year_column = next(
            (text for text in header_texts if text.isdigit() and len(text) == 4),
            None,
        )
        if year_column is None:
            log_progress(phase, "Failed trying to extract year column from the header column")
            raise Exception("No year column found in the table headers")

        log_progress(phase, "Creating date format from the extracted year")
        date_str = f"31-12-{year_column}"

        log_progress(phase, "Extracting content from the table")
        table_body = soup.find('tbody')
        if table_body is None:
            log_progress(phase, "No table found on the page")
            raise Exception("No table found on the page")

        rows = table_body.find_all('tr')
        if not rows:
            log_progress(phase, "No rows found in the table")

        data = []
        for row in rows:
            cells = row.find_all('td')
            if not cells:
                continue
            data.append([
                cells[0].get_text().strip(),
                cells[1].get_text().strip(),
                cells[2].get_text().strip(),
                date_str,
            ])

        df = pd.DataFrame(data, columns=headers_exchange_rates)
        df["Batch ID"] = batch_id
        log_progress(phase, "Completed extraction of exchange rates data")
        return df

    except Exception as e:
        log_progress(phase, f"Failed during extraction of exchange rates data: {e}")
        # Re-raise: returning None here previously pushed a confusing failure
        # downstream into load_to_db.
        raise

In [13]:
def load_db_config(config_path='Config DB/config.json'):
    """Read database connection settings from a JSON file.

    Args:
        config_path: Path to the JSON file. A relative path is resolved against
            the current working directory.

    Returns:
        The parsed JSON document. Callers unpack it with ``**`` into
        ``psycopg2.connect``, so it is expected to be a dict of connect kwargs.
    """
    with open(config_path, mode='r') as handle:
        settings = json.load(handle)
    return settings

In [14]:
def insert_log(cur, log_phase, message, log_datetime, batch_id):
    """Write one log entry by calling the ``etl.insert_log`` stored procedure.

    Args:
        cur: Open DB-API cursor; the caller owns the transaction and commits.
        log_phase: Pipeline stage label.
        message: Log text.
        log_datetime: Timestamp string ('%Y-%m-%d %H:%M:%S' when produced by log_progress).
        batch_id: Identifier of the ETL run.

    Raises:
        Exception: whatever ``cur.execute`` raises, after recording it via log_progress.
    """
    params = [log_phase, message, log_datetime, batch_id]
    try:
        cur.execute("""
            CALL etl.insert_log(%s, %s, %s, %s)
        """, params)
    except Exception as err:
        log_progress("Logging", f"Failed to insert log: {err}")
        raise

def execute_with_logging(cursor, sql, params):
    """Execute one statement and forward the server notices to log_progress.

    After forwarding, the connection's notice list is emptied so the same
    notices are not logged again by a later call.

    Args:
        cursor: psycopg2 cursor; its ``connection.notices`` holds server NOTICE text.
        sql: Statement text (inside this function it shadows the imported
            ``psycopg2.sql`` module).
        params: Parameters for the statement, or None.
    """
    cursor.execute(sql, params)
    pending = cursor.connection.notices
    for notice_text in pending:
        log_progress("Database", notice_text.strip())
    pending.clear()

def _load_world_bank_rows(cur, df, table_name, batch_id):
    """Upsert every bank row, deactivate stale records, and log the DB summary.

    Returns the number of upsert calls issued (not a row count reported by the DB).
    """
    phase = f"Loading: {table_name}"
    calls_issued = 0
    for _, row in df.iterrows():
        try:
            execute_with_logging(
                cur,
                "CALL etl.insert_or_update_world_bank_data(%s, %s, %s, %s)",
                (row['Bank name'], row['Market cap (US$ billion)'], row['Last Modified Date'], batch_id),
            )
        except Exception as e:
            log_progress(phase, f"Failed to insert/update to world bank data: {e}")
            raise
        calls_issued += 1

    # Only deactivate when this batch sent something; with an empty extract the
    # procedure would presumably deactivate every existing record.
    if calls_issued > 0:
        try:
            execute_with_logging(cur, "CALL etl.deactivate_bank_records(%s)", (batch_id,))
            insert_log(cur, 'Deactivate', 'Deactivated old world bank records',
                       dt.now().strftime('%Y-%m-%d %H:%M:%S'), batch_id)
        except Exception as e:
            log_progress(phase, f"Failed to deactivate old world bank records: {e}")
            raise

    # The summary procedure is expected to report its counts as NOTICEs; the
    # helper forwards them to the log and clears them.
    execute_with_logging(cur, "CALL etl.insert_or_update_world_bank_data_summary()", None)
    return calls_issued


def _load_exchange_rate_rows(cur, df, table_name, batch_id):
    """Upsert every exchange-rate row and log the DB summary.

    Returns the number of upsert calls issued.
    """
    phase = f"Loading: {table_name}"
    calls_issued = 0
    for _, row in df.iterrows():
        try:
            execute_with_logging(
                cur,
                "CALL etl.insert_or_update_exchange_rates(%s, %s, %s, %s, %s)",
                (row['Country'], row['Currency'], row['Exchange Rate'], row['Year'], batch_id),
            )
        except Exception as e:
            log_progress(phase, f"Failed to insert/update to exchange rates data: {e}")
            raise
        calls_issued += 1

    execute_with_logging(cur, "CALL etl.insert_or_update_exchange_rate_data_summary()", None)
    return calls_issued


def load_to_db(df, db_connection_info, table_name, batch_id):
    """Load ``df`` into ``table_name`` through the ``etl.*`` stored procedures.

    All statements for one table run in a single transaction. It is committed
    only if every step succeeds; on error the connection is closed without
    committing.

    Args:
        df: Output of the matching extract_* function.
        db_connection_info: Keyword arguments for ``psycopg2.connect``.
        table_name: ``'world_bank_data'`` or ``'exchanges_rates'``.
        batch_id: Identifier of the current ETL run, passed to every procedure.

    Returns:
        True if at least one row was sent to the database, else False.

    Raises:
        ValueError: if ``table_name`` is not a supported table (previously this
            silently did nothing).
        Exception: any database error, after logging it via log_progress.
    """
    loaders = {
        'world_bank_data': _load_world_bank_rows,
        'exchanges_rates': _load_exchange_rate_rows,
    }
    loader = loaders.get(table_name)
    if loader is None:
        raise ValueError(f"Unsupported table_name {table_name!r}; expected one of {sorted(loaders)}")

    phase = f"Loading: {table_name}"
    # Pre-bind so the finally clause is safe even if connect() itself fails.
    conn = None
    cur = None
    rows_affected = 0
    try:
        log_progress(phase, "Starting loading data to the table in database")
        conn = psycopg2.connect(**db_connection_info)
        cur = conn.cursor()

        rows_affected = loader(cur, df, table_name, batch_id)

        conn.commit()

        if rows_affected > 0:
            log_progress("Loading to Database", f"Completed loading data to {table_name} table in database")
        else:
            log_progress("Loading to Database", f"No changes made to {table_name} table in database")

    except Exception as e:
        log_progress(phase, f"Failed during loading data to the table in database {e}")
        raise

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    return rows_affected > 0

In [15]:
def _save_logs(db_connection_info):
    """Persist every buffered row of ``logs_df`` through insert_log, in one transaction."""
    conn = psycopg2.connect(**db_connection_info)
    try:
        # The cursor context manager closes the cursor even if an insert fails.
        with conn.cursor() as cur:
            for _, row in logs_df.iterrows():
                insert_log(cur, row['Log Phase'], row['Message'], row['Datetime'], row['Batch ID'])
        conn.commit()
    except Exception as e:
        log_progress("Log Saving", f"Failed during saving logs to database: {e}")
        raise
    finally:
        # Previously the connection leaked when an insert raised.
        conn.close()


def main():
    """Run the ETL: extract both sources, load them, then persist the run's logs.

    Logs are saved in a ``finally`` block so that a failed run still records
    the log_progress entries that explain where it failed. If saving the logs
    also fails, that error is raised, chained to the original one.

    NOTE(review): ``logs_df`` lives for the whole kernel session. Calling main()
    twice in one kernel re-inserts the earlier rows; confirm whether
    etl.insert_log deduplicates them.
    """
    db_connection_info = load_db_config()
    try:
        # Extract and transform
        world_bank_df = extract_world_bank_data(world_bank_url)
        exchange_rates_df = extract_exchange_rates_data(exchange_rates_url)

        # Load into the database
        load_to_db(world_bank_df, db_connection_info, 'world_bank_data', batch_id)
        load_to_db(exchange_rates_df, db_connection_info, 'exchanges_rates', batch_id)
    finally:
        _save_logs(db_connection_info)

# Entry point. Inside a Jupyter/IPython kernel ``__name__`` is "__main__", so
# executing this cell runs the full pipeline immediately.
if __name__ == "__main__":
    main()

2024-08-06 07:20:42 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Starting extraction of world bank data
2024-08-06 07:20:42 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Connecting to URL: https://en.wikipedia.org/wiki/List_of_largest_banks
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Successfully connected to URL
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Extracting table content
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Extracting last modified date
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: World Bank Data - Completed extraction of world bank data
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: Exchange Rates Data - Starting extraction of exchange rates data
2024-08-06 07:20:43 - a466f0ad-18b0-4d74-9727-6aafaf3616e8 - Extraction: Exchange