# ETL Pipeline: World’s Largest Banks Data Processing
### IBM Data Engineering Specialization – Portfolio Project
Objective: Build an ETL pipeline to extract top-10 banks by market cap (USD), convert to GBP/EUR/INR using exchange_rate.csv, save results to CSV and DB, and maintain a code log.


In [16]:
!python3 -m pip install --quiet requests beautifulsoup4 pandas sqlalchemy pymysql
!wget -q -O exchange_rate.csv "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-PY0221EN-Coursera/labs/v2/exchange_rate.csv"
!ls -l exchange_rate.csv


-rw-r--r-- 1 root root 45 Sep  8  2023 exchange_rate.csv


**Preliminaries:** We use requests, BeautifulSoup (bs4), pandas, numpy, sqlite3 and datetime.

In [17]:
# Imports and project constants
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
import os

# Project constants
ARCHIVE_BANKS_URL = "https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks"
EXCHANGE_RATE_CSV = "exchange_rate.csv"
OUTPUT_CSV = "Largest_banks_data.csv"
SQLITE_DB = "Banks.db"
TABLE_NAME = "Largest_banks"
LOG_FILE = "code_log.txt"

LOG_MESSAGES = {
    "init": "Preliminaries complete. Initiating ETL process",
    "extracted": "Data extraction complete. Initiating Transformation process",
    "transformed": "Data transformation complete. Initiating Loading process",
    "csv_saved": "Data saved to CSV file",
    "sql_conn": "SQL Connection initiated",
    "db_loaded": "Data loaded to Database as a table, Executing queries",
    "process_complete": "Process Complete",
    "sql_closed": "Server Connection closed",
}


**Task 1 — Logging**  
Format: `<time_stamp> : <message>` (each entry on a new line). We'll call this function at required stages.

In [18]:
def log_progress(message, log_file=LOG_FILE):
    """Append a timestamped message to the log file using format: <time_stamp> : <message>"""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"{ts} : {message}\n"
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(entry)


**Task 2 — Extraction**  
Goal: find table under heading "By market capitalization", extract Name and Market Cap (in USD, billions), clean values (remove trailing characters, commas, footnotes), convert to float, keep top 10 by MC_USD_Billion.


In [19]:
def extract(url=ARCHIVE_BANKS_URL):
    """Extract Name and MC_USD_Billion from the archived Wikipedia page and return top-10 dataframe."""
    resp = requests.get(url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # Locate the 'By market capitalization' heading then next table
    header = soup.find(lambda tag: tag.name in ["h2", "h3"] and "By market capitalization" in tag.get_text())
    if header:
        table = header.find_next("table")
    else:
        # fallback: pick first table that looks like market caps
        tables = soup.find_all("table")
        table = None
        for t in tables:
            text = t.get_text()
            if "Market cap" in text or "Market capitalization" in text or "Market" in text:
                table = t
                break
        if table is None:
            raise ValueError("Required table not found on page")

    # Parse table with pandas
    df_raw = pd.read_html(str(table))[0]
    df_raw.columns = [str(c).strip() for c in df_raw.columns]

    # Heuristics to pick name and market cap columns
    name_col = None
    mc_col = None
    for c in df_raw.columns:
        low = c.lower()
        if name_col is None and ("name" in low or "bank" in low):
            name_col = c
        if mc_col is None and ("market" in low or "market cap" in low or "mc" in low):
            mc_col = c

    if name_col is None:
        name_col = df_raw.columns[0]
    if mc_col is None:
        mc_col = df_raw.columns[-1]

    df = df_raw[[name_col, mc_col]].copy()
    df.columns = ["Name", "MC_USD_Billion"]

    # cleaning function: remove commas, footnotes and extract first numeric token
    import re
    def clean_mc(x):
        if pd.isna(x):
            return np.nan
        s = str(x).split("[")[0]   # remove bracketed footnotes
        s = s.replace(",", "").replace("billion", "").strip()
        m = re.search(r"[-+]?[0-9]*\.?[0-9]+", s)
        return float(m.group(0)) if m else np.nan

    df["MC_USD_Billion"] = df["MC_USD_Billion"].apply(clean_mc)
    df = df.dropna(subset=["MC_USD_Billion"]).reset_index(drop=True)
    df = df.sort_values("MC_USD_Billion", ascending=False).head(10).reset_index(drop=True)
    return df


In [20]:
# Make first log entry (Preliminaries complete)
if os.path.exists(LOG_FILE):
    pass
log_progress(LOG_MESSAGES["init"])

# Extract and preview
df_extracted = extract()
print(df_extracted)
log_progress(LOG_MESSAGES["extracted"])


                                      Name  MC_USD_Billion
0                           JPMorgan Chase          432.92
1                          Bank of America          231.52
2  Industrial and Commercial Bank of China          194.56
3               Agricultural Bank of China          160.68
4                                HDFC Bank          157.91
5                              Wells Fargo          155.87
6                        HSBC Holdings PLC          148.90
7                           Morgan Stanley          140.83
8                  China Construction Bank          139.82
9                            Bank of China          136.81


  df_raw = pd.read_html(str(table))[0]


**Task 3 — Transformation**  
Read `exchange_rate.csv` and convert to a dict. Add `MC_GBP_Billion`, `MC_EUR_Billion`, `MC_INR_Billion` = USD * rate (rounded to 2 decimals). Then return final df with columns: Name, MC_USD_Billion, MC_GBP_Billion, MC_EUR_Billion, MC_INR_Billion.

In [21]:
def transform(df, csv_path=EXCHANGE_RATE_CSV):
    """Add MC_GBP_Billion, MC_EUR_Billion, MC_INR_Billion to df using exchange_rate.csv"""
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"{csv_path} not found. Download it and re-run the cell.")

    # Try reading with or without header
    ex = pd.read_csv(csv_path, header=None)
    # If file has header row with strings, try again with headers
    if ex.shape[1] >= 2 and ex.iloc[0].dtype == object:
        try:
            ex = pd.read_csv(csv_path)
        except Exception:
            ex = pd.read_csv(csv_path, header=None)

    keys = ex.iloc[:, 0].astype(str).str.strip().tolist()
    vals = pd.to_numeric(ex.iloc[:, 1], errors="coerce").tolist()
    exchange_rate = dict(zip(keys, vals))

    # required currencies
    for c in ["GBP", "EUR", "INR"]:
        if c not in exchange_rate:
            raise KeyError(f"Exchange rate for {c} not found in {csv_path}")

    df2 = df.copy()
    df2["MC_GBP_Billion"] = [np.round(x * exchange_rate["GBP"], 2) for x in df2["MC_USD_Billion"]]
    df2["MC_EUR_Billion"] = [np.round(x * exchange_rate["EUR"], 2) for x in df2["MC_USD_Billion"]]
    df2["MC_INR_Billion"] = [np.round(x * exchange_rate["INR"], 2) for x in df2["MC_USD_Billion"]]

    # reorder final columns as required
    df2 = df2[["Name", "MC_USD_Billion", "MC_GBP_Billion", "MC_EUR_Billion", "MC_INR_Billion"]]
    return df2

In [22]:
df_transformed = transform(df_extracted, EXCHANGE_RATE_CSV)
print(df_transformed)
log_progress(LOG_MESSAGES["transformed"])


                                      Name  MC_USD_Billion  MC_GBP_Billion  \
0                           JPMorgan Chase          432.92          346.34   
1                          Bank of America          231.52          185.22   
2  Industrial and Commercial Bank of China          194.56          155.65   
3               Agricultural Bank of China          160.68          128.54   
4                                HDFC Bank          157.91          126.33   
5                              Wells Fargo          155.87          124.70   
6                        HSBC Holdings PLC          148.90          119.12   
7                           Morgan Stanley          140.83          112.66   
8                  China Construction Bank          139.82          111.86   
9                            Bank of China          136.81          109.45   

   MC_EUR_Billion  MC_INR_Billion  
0          402.62        35910.71  
1          215.31        19204.58  
2          180.94        16138.75

**Task 4 — Save CSV**  
Save final DataFrame to `./Largest_banks_data.csv` and log the completion message.


In [23]:
def load_to_csv(df, output_path=OUTPUT_CSV):
    df.to_csv(output_path, index=False)
    log_progress(LOG_MESSAGES["csv_saved"])

# save CSV and preview first lines
load_to_csv(df_transformed, OUTPUT_CSV)
!head -n 12 Largest_banks_data.csv


Name,MC_USD_Billion,MC_GBP_Billion,MC_EUR_Billion,MC_INR_Billion
JPMorgan Chase,432.92,346.34,402.62,35910.71
Bank of America,231.52,185.22,215.31,19204.58
Industrial and Commercial Bank of China,194.56,155.65,180.94,16138.75
Agricultural Bank of China,160.68,128.54,149.43,13328.41
HDFC Bank,157.91,126.33,146.86,13098.63
Wells Fargo,155.87,124.7,144.96,12929.42
HSBC Holdings PLC,148.9,119.12,138.48,12351.26
Morgan Stanley,140.83,112.66,130.97,11681.85
China Construction Bank,139.82,111.86,130.03,11598.07
Bank of China,136.81,109.45,127.23,11348.39


**Task 5 — Save to DB**  
We will use SQLite (`Banks.db`). Create connection, log SQL connection initiated, load table `Largest_banks`.


In [24]:
def load_to_db(df, sql_connection, table_name=TABLE_NAME):
    """Load dataframe into given sqlite3.Connection (or raise if not sqlite)."""
    if isinstance(sql_connection, sqlite3.Connection):
        df.to_sql(table_name, sql_connection, if_exists="replace", index=False)
    else:
        raise ValueError("This Colab flow expects a sqlite3.Connection.")
    log_progress(LOG_MESSAGES["db_loaded"])

# Connect to SQLite and load
conn = sqlite3.connect(SQLITE_DB)
log_progress(LOG_MESSAGES["sql_conn"])
load_to_db(df_transformed, conn, TABLE_NAME)


**Task 6 — Run queries**  
Run and print outputs for:
1. `SELECT * FROM Largest_banks`  
2. `SELECT AVG(MC_GBP_Billion) FROM Largest_banks`  
3. `SELECT Name FROM Largest_banks LIMIT 5`


In [25]:
def run_query(query_statement, sql_connection):
    """Execute query on sqlite3.Connection and print results."""
    print("Query:", query_statement)
    df_out = pd.read_sql_query(query_statement, sql_connection)
    print(df_out)
    print("-" * 40)

# Execute required queries
run_query(f"SELECT * FROM {TABLE_NAME}", conn)
run_query(f"SELECT AVG(MC_GBP_Billion) FROM {TABLE_NAME}", conn)
run_query(f"SELECT Name FROM {TABLE_NAME} LIMIT 5", conn)

log_progress(LOG_MESSAGES["process_complete"])


Query: SELECT * FROM Largest_banks
                                      Name  MC_USD_Billion  MC_GBP_Billion  \
0                           JPMorgan Chase          432.92          346.34   
1                          Bank of America          231.52          185.22   
2  Industrial and Commercial Bank of China          194.56          155.65   
3               Agricultural Bank of China          160.68          128.54   
4                                HDFC Bank          157.91          126.33   
5                              Wells Fargo          155.87          124.70   
6                        HSBC Holdings PLC          148.90          119.12   
7                           Morgan Stanley          140.83          112.66   
8                  China Construction Bank          139.82          111.86   
9                            Bank of China          136.81          109.45   

   MC_EUR_Billion  MC_INR_Billion  
0          402.62        35910.71  
1          215.31        19204.58 

In [13]:
conn.close()
log_progress(LOG_MESSAGES["sql_closed"])

# show produced files for verification
!ls -l | grep -E "Largest_banks_data.csv|Banks.db|code_log.txt" || true

-rw-r--r-- 1 root root 8192 Dec 11 07:22 Banks.db
-rw-r--r-- 1 root root  531 Dec 11 07:28 code_log.txt
-rw-r--r-- 1 root root  554 Dec 11 07:18 Largest_banks_data.csv


In [26]:
# Display the log file content for Task 7 verification
print("---- code_log.txt ----")
if os.path.exists(LOG_FILE):
    with open(LOG_FILE, "r", encoding="utf-8") as f:
        print(f.read())
else:
    print("code_log.txt not found. If you re-run notebook after removing previous logs, the file will be created.")


---- code_log.txt ----
2025-12-11 07:35:01 : Preliminaries complete. Initiating ETL process
2025-12-11 07:35:01 : Data extraction complete. Initiating Transformation process
2025-12-11 07:35:13 : Data transformation complete. Initiating Loading process
2025-12-11 07:35:17 : Data saved to CSV file
2025-12-11 07:35:20 : SQL Connection initiated
2025-12-11 07:35:20 : Data loaded to Database as a table, Executing queries
2025-12-11 07:35:25 : Process Complete



## Conclusion

This ETL pipeline demonstrates my ability to perform real-world data engineering tasks, including web scraping, data cleaning, transformation using external data sources, structured logging, and loading results into both CSV and SQL databases. This project is completed as part of the IBM Data Engineering Specialization and is included in my portfolio to showcase practical ETL development skills.
