# In [None]:
# Banking Data Analysis — PySpark notebook
# Author: ChatGPT (adapt for your environment)
# Usage: Run in PySpark environment (pyspark shell, Databricks, EMR, or local with pyspark installed).
# Assumptions:
#  - exchange_rates.csv exists locally or at specified path (columns: currency, rate_to_USD).
#  - AWS credentials configured via environment / IAM role if using S3.
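#  - Python packages: pandas (read_html additionally needs an HTML parser such as lxml), matplotlib, seaborn,
#    sqlite3 (standard), sqlalchemy (used for the SQLite export), boto3 (used only by the S3 upload step).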

# ---------- 0. Imports & SparkSession ----------
from pyspark.sql import SparkSession, functions as F, types as T
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sqlite3
from sqlalchemy import create_engine   # optional, for SQLite write via pandas

# Build (or reuse) the Spark session; the SQL session time zone is pinned to UTC.
# For S3 writes over s3a://, make sure the hadoop-aws / AWS Java libraries are available.
session_builder = SparkSession.builder.appName("BankingDataAnalysis")
session_builder = session_builder.config("spark.sql.session.timeZone", "UTC")
spark = session_builder.getOrCreate()

# ---------- 1. Data Acquisition (scrape archived Wikipedia) ----------
# The archived wiki page is parsed with pandas; the chosen table is handed to Spark later on.
wiki_url = "https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks"

# read_html yields one DataFrame per HTML table it finds. Without network access,
# download the HTML manually and pass the local file path instead.
tables = pd.read_html(wiki_url)

# Print index, shape and header of every table so the right one can be identified.
for idx, candidate in enumerate(tables):
    print(idx, candidate.shape, candidate.columns.tolist())

# Use the first table whose header mentions "market" (this also covers "market cap" and
# "market capitalization"); when no header matches, fall back to the first table.
table_idx = next(
    (
        idx
        for idx, candidate in enumerate(tables)
        if "market" in " ".join(str(col).lower() for col in candidate.columns)
    ),
    0,
)

# Work on a copy so the list returned by read_html stays untouched.
df_raw = tables[table_idx].copy()
print("Using table index:", table_idx)
df_raw.head(5)

# ---------- 2. Quick pandas cleaning to normalize column names ----------
# Header labels become stripped strings so the substring matching below is reliable.
df_raw.columns = [str(label).strip() for label in df_raw.columns]

# Target layout: Rank, Bank Name, MarketCap_USD_Billion.
# The market-cap column is the first header containing "market" together with one of
# "usd" / "cap" / "capital"; if nothing matches, the last column is used.
market_col = next(
    (
        label
        for label in df_raw.columns
        if "market" in label.lower()
        and any(hint in label.lower() for hint in ("usd", "cap", "capital"))
    ),
    df_raw.columns[-1],
)

# The first two columns are taken positionally as rank and bank name.
renames = {}
renames[df_raw.columns[0]] = "Rank"
renames[df_raw.columns[1]] = "Bank Name"
renames[market_col] = "MarketCap_USD_Billion"

# Rename, then keep only the three standardized columns.
df = df_raw.rename(columns=renames)[["Rank", "Bank Name", "MarketCap_USD_Billion"]]

# ---------- 3. Data Preparation & Cleaning (pandas) ----------
# Overall approach:
# - Remove rows where Bank Name or MarketCap is missing.
# - Clean MarketCap column: remove commas, footnotes, non-numeric chars, convert to float (billions).
# - Convert Rank to integer where possible.

def clean_market_value(x):
    """Parse a scraped market-cap cell into a float.

    Bracketed footnotes (e.g. ``[1]``), commas and ``US$`` / ``$`` markers are
    removed, then the first numeric token of the remaining text is returned.

    Args:
        x: Raw cell value (string, number, or a missing value).

    Returns:
        The parsed number as ``float``, or ``numpy.nan`` when the cell is
        missing, is one of the placeholders ``—``, ``-``, ``N/A``, is empty,
        or contains no numeric token.
    """
    import re  # local import: `re` is not imported at the top of this file

    if pd.isna(x):
        return np.nan

    text = re.sub(r"\[.*?\]", "", str(x))  # drop bracketed footnotes
    for marker in (",", "US$", "$"):
        text = text.replace(marker, "")
    text = text.strip()

    # Placeholders that stand for "no value".
    if text in ("—", "-", "N/A", ""):
        return np.nan

    # Only the first numeric token is used, so a range such as "10-20" yields 10.0.
    match = re.search(r"[-+]?\d*\.?\d+", text)
    if match is None:
        return np.nan
    try:
        return float(match.group())
    except ValueError:
        # Catch only the conversion failure; a bare except would also swallow
        # KeyboardInterrupt / SystemExit.
        return np.nan

# Replace every raw market-cap cell with its parsed float (NaN when it cannot be parsed).
raw_market_caps = df["MarketCap_USD_Billion"]
df["MarketCap_USD_Billion"] = raw_market_caps.apply(clean_market_value)
# Rank cleaning
def clean_rank(x):
    """Coerce a scraped rank cell to ``int``.

    Only the first whitespace-separated token is considered, and bracketed
    footnotes (e.g. ``7[a]``) are removed from it. Values that represent a
    whole number in float form (``4.0`` or ``"4.0"``) are accepted, because
    pandas stores integer columns as floats once they contain a missing value.

    Args:
        x: Raw cell value (string, number, or a missing value).

    Returns:
        The rank as ``int``, or ``None`` when the cell is missing, empty, or
        does not hold a whole number.
    """
    import re  # local import: `re` is not imported at the top of this file

    try:
        if pd.isna(x):
            return None
    except (TypeError, ValueError):
        # pd.isna on a list-like returns an array that has no single truth value.
        return None

    tokens = str(x).split()
    if not tokens:
        return None
    token = re.sub(r"\[.*?\]", "", tokens[0])

    try:
        return int(token)
    except ValueError:
        pass  # not a plain integer literal; try the float form next

    try:
        as_float = float(token)
    except ValueError:
        return None
    # Rejects fractional ranks as well as inf / nan.
    return int(as_float) if as_float.is_integer() else None

# Parse the rank column, then keep only rows that have both a bank name and a market cap.
parsed_ranks = df["Rank"].apply(clean_rank)
df["Rank"] = parsed_ranks
df = (
    df.dropna(subset=["Bank Name", "MarketCap_USD_Billion"])
    .reset_index(drop=True)
)
print("Rows after cleaning:", len(df))
df.head()

# ---------- 4. Move to Spark DataFrame ----------
spark_df = spark.createDataFrame(df)

# pandas represents a missing rank as NaN once the column is float-typed. Casting NaN
# directly to an integer does not give NULL (in non-ANSI mode it becomes 0), which would
# hide missing ranks from the NULL report below, so NaN is mapped to NULL before the cast.
rank_as_double = F.col("Rank").cast(T.DoubleType())
rank_or_null = F.when(F.isnan(rank_as_double), F.lit(None)).otherwise(rank_as_double)

# Enforce column types and give the bank-name column a name without a space.
schema_df = (
    spark_df
    .withColumn("Rank", rank_or_null.cast(T.IntegerType()))
    .withColumn("MarketCap_USD_Billion", F.col("MarketCap_USD_Billion").cast(T.DoubleType()))
    .withColumnRenamed("Bank Name", "BankName")
)

# Fix the column order used by the rest of the notebook.
spark_df = schema_df.select("Rank", "BankName", "MarketCap_USD_Billion")
spark_df.printSchema()
spark_df.show(5, truncate=False)

# ---------- 5. Handling Missing Values (report) ----------
# Policy for missing values:
# - a few missing Rank values -> keep them as NULL, or recompute the ranking from MarketCap.
# - missing MarketCap -> the row cannot be analyzed; such rows were already dropped above.
# All per-column NULL counts are gathered in one aggregation over the DataFrame.
null_counters = [
    F.count(F.when(F.col(name).isNull(), F.lit(1))).alias(name)
    for name in spark_df.columns
]
missing_counts = spark_df.agg(*null_counters).first().asDict()
print("Missing counts:", missing_counts)

# ---------- 6. Outliers — identification ----------
# The single market-cap column is pulled into pandas and the 1.5 * IQR fences are computed there.
pd_df_for_stats = spark_df.select("MarketCap_USD_Billion").toPandas()
cap_series = pd_df_for_stats["MarketCap_USD_Billion"]
q1, q3 = (cap_series.quantile(p) for p in (0.25, 0.75))
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
print("IQR outlier bounds:", lower, upper)
# Banks above 'upper' count as outliers (likely the very large global banks); they are
# kept in the data and only noted here.

# ---------- 7. Exchange rates (load CSV) ----------
# Expected columns of exchange_rates.csv: currency, rate_to_USD
# rate_to_USD is read as "1 unit of the currency = rate_to_USD USD",
# e.g. USD, 1.0; GBP, 1.23; EUR, 1.07; INR, 0.012 (1 INR = 0.012 USD).
exchange_rates_csv_path = "./exchange_rates.csv"
try:
    exch_pd = pd.read_csv(exchange_rates_csv_path)
except (OSError, ValueError) as e:
    # OSError: file missing or unreadable. ValueError: pandas parse/empty-file errors
    # and decode errors. Anything else is unexpected and should surface.
    print("Failed to read exchange_rates.csv:", e)
    # fallback sample rates (you should replace with actual CSV)
    exch_pd = pd.DataFrame({
        "currency": ["USD", "GBP", "EUR", "INR"],
        "rate_to_USD": [1.0, 1.25, 1.10, 0.012]   # example: 1 INR = 0.012 USD
    })
exch_pd


def build_rate_map(rates_df):
    """Build the USD -> currency multiplier table.

    Args:
        rates_df: pandas DataFrame with the columns ``currency`` and
            ``rate_to_USD`` (USD value of one unit of the currency).

    Returns:
        dict mapping the upper-cased, stripped currency code to
        ``1 / rate_to_USD`` (multiply a USD amount by it to get the amount in
        that currency). Codes whose rate is missing or zero map to ``None``;
        rows without a currency code are skipped.
    """
    multipliers = {}
    for raw_code, raw_rate in zip(rates_df["currency"], rates_df["rate_to_USD"]):
        if pd.isna(raw_code):
            continue  # a row without a code cannot name a column
        code = str(raw_code).strip().upper()
        if not code:
            continue
        if pd.isna(raw_rate) or float(raw_rate) == 0:
            # No usable rate: mark the currency as unavailable instead of
            # dividing by zero or propagating NaN into a whole column.
            multipliers[code] = None
        else:
            multipliers[code] = 1.0 / float(raw_rate)
    return multipliers


rate_map = build_rate_map(exch_pd)
rate_map

# ---------- 8. Add converted columns to Spark DF ----------
# MarketCap_USD_Billion is in USD billions. We'll compute amounts in target currencies (billions).
def add_currency_columns(sdf, rate_map):
    """Append one ``MarketCap_<CUR>_Billion`` column per currency in ``rate_map``.

    Args:
        sdf: Spark DataFrame that contains a ``MarketCap_USD_Billion`` column.
        rate_map: Mapping of currency code -> multiplier that turns a USD
            amount into that currency. Entries whose multiplier is ``None``
            are skipped.

    Returns:
        A DataFrame with the converted columns added; ``sdf`` itself is not
        modified.
    """
    base_col = "MarketCap_USD_Billion"
    converted = sdf
    for cur, mult in rate_map.items():
        if mult is None:
            continue
        colname = f"MarketCap_{cur}_Billion"
        if colname == base_col:
            # A "USD" entry would overwrite the source column; with a rate other than 1
            # every later currency would then be derived from an already-rescaled base.
            continue
        # multiply USD value by conversion multiplier
        converted = converted.withColumn(colname, F.col(base_col) * F.lit(mult))
    return converted

# Attach the per-currency market-cap columns and preview the widened DataFrame.
spark_df = add_currency_columns(sdf=spark_df, rate_map=rate_map)
spark_df.show(n=5, truncate=False)

# ---------- 9. EDA — convert to pandas for plotting ----------
# Collect to pandas once, largest market cap first, with a fresh 0..n-1 index.
pandas_df = (
    spark_df.toPandas()
    .sort_values(by="MarketCap_USD_Billion", ascending=False)
    .reset_index(drop=True)
)

# 3.2 Distribution histogram
fig, ax = plt.subplots(figsize=(8, 4))
sns.histplot(pandas_df["MarketCap_USD_Billion"], bins=20, kde=True, ax=ax)
ax.set_title("Distribution of Market Cap (USD Billion)")
ax.set_xlabel("MarketCap (USD Billion)")
ax.set_ylabel("Count")
fig.tight_layout()
plt.show()

# 3.3 Top 10 banks by marketcap bar chart
top10 = pandas_df.head(10).copy()
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=top10, x="MarketCap_USD_Billion", y="BankName", ax=ax)
ax.set_title("Top 10 Banks by Market Cap (USD Billion)")
ax.set_xlabel("MarketCap (USD Billion)")
ax.set_ylabel("")
fig.tight_layout()
plt.show()

# 3.4 Scatter: marketcap vs rank
fig, ax = plt.subplots(figsize=(8, 5))
sns.scatterplot(data=pandas_df, x="Rank", y="MarketCap_USD_Billion", ax=ax)
ax.set_title("Rank vs Market Cap (USD Billion)")
ax.set_xlabel("Rank")
ax.set_ylabel("MarketCap (USD Billion)")
ax.invert_xaxis()  # smaller rank = top; invert if you prefer
fig.tight_layout()
plt.show()

# 3.5 Boxplot for spread & outliers
fig, ax = plt.subplots(figsize=(6, 4))
sns.boxplot(x=pandas_df["MarketCap_USD_Billion"], ax=ax)
ax.set_title("Boxplot - Market Cap (USD Billion)")
fig.tight_layout()
plt.show()

# 3.6 Violin plot quartile distribution
fig, ax = plt.subplots(figsize=(6, 4))
sns.violinplot(x=pandas_df["MarketCap_USD_Billion"], inner="quartile", ax=ax)
ax.set_title("Violin Plot - Market Cap (USD Billion)")
fig.tight_layout()
plt.show()

# 3.7 Cumulative market share line plot
# Helper columns: descending caps, their running total, and that total as a percentage.
usd_caps = pandas_df["MarketCap_USD_Billion"]
pandas_df["MarketCap_USD_Billion_sorted"] = usd_caps.sort_values(ascending=False).to_numpy()
pandas_df["CumulativeSum"] = pandas_df["MarketCap_USD_Billion_sorted"].cumsum()
total = usd_caps.sum()
pandas_df["CumulativeSharePct"] = pandas_df["CumulativeSum"] / total * 100

fig, ax = plt.subplots(figsize=(8, 5))
bank_positions = range(1, len(pandas_df) + 1)
ax.plot(bank_positions, pandas_df["CumulativeSharePct"], marker='o')
ax.set_title("Cumulative Market Share of Banks (%)")
ax.set_xlabel("Number of Banks (sorted by Market Cap descending)")
ax.set_ylabel("Cumulative Share (%)")
ax.grid(True)
fig.tight_layout()
plt.show()

# 3.8 Categorize banks into ranges (micro, small, mid, large, mega) — you can tune thresholds
bins = [0, 10, 50, 200, 500, 5000]  # in USD Billion
labels = ["<10B", "10-50B", "50-200B", "200-500B", ">500B"]
pandas_df["CapCategory"] = pd.cut(
    pandas_df["MarketCap_USD_Billion"],
    bins=bins,
    labels=labels,
    include_lowest=True,
)
# Count banks per range, in label order, with 0 for ranges that have no bank.
cat_counts = pandas_df["CapCategory"].value_counts().reindex(labels).fillna(0)
fig, ax = plt.subplots(figsize=(7, 4))
sns.barplot(x=cat_counts.index, y=cat_counts.values, ax=ax)
ax.set_title("Distribution across Market Cap Ranges")
ax.set_xlabel("Market Cap Range (USD Billion)")
ax.set_ylabel("Number of Banks")
fig.tight_layout()
plt.show()

# 3.9 Pie chart of top 10 market share
top10 = pandas_df.head(10)
fig, ax = plt.subplots(figsize=(7, 7))
ax.pie(top10["MarketCap_USD_Billion"], labels=top10["BankName"], autopct='%1.1f%%', startangle=140)
ax.set_title("Market Share Distribution (Top 10 Banks)")
fig.tight_layout()
plt.show()

# ---------- 10. Advanced ETL / Querying tasks (Spark SQL + DataFrame APIs) ----------
# Register the DataFrame so it can be queried with Spark SQL.
spark_df.createOrReplaceTempView("Largest_banks")

# 4.1 Advanced Market Cap Analysis with Growth Metrics (if we had historical series)
# The data is a single snapshot, so the "growth-like" metric is the gap to the next
# smaller bank: pct_diff_to_next = (marketcap - next_marketcap) / next_marketcap
df_window = spark.sql("""
SELECT
  *,
  LEAD(MarketCap_USD_Billion) OVER (ORDER BY MarketCap_USD_Billion DESC) as Next_MarketCap_USD_Billion
FROM Largest_banks
ORDER BY MarketCap_USD_Billion DESC
""")

current_cap = F.col("MarketCap_USD_Billion")
next_cap = F.col("Next_MarketCap_USD_Billion")
pct_gap = (current_cap - next_cap) / next_cap * 100
# The smallest bank has no successor; an unmatched when() leaves its value NULL.
df_growth = df_window.withColumn("PctDiff_to_Next", F.when(next_cap.isNotNull(), pct_gap))

growth_columns = ["Rank", "BankName", "MarketCap_USD_Billion", "Next_MarketCap_USD_Billion", "PctDiff_to_Next"]
df_growth.select(*growth_columns).show(20, truncate=False)

# 4.2 Market Concentration & Categorize based on Market Share Tiers
# Each bank's share is its market cap as a percentage of the summed market cap.
total_marketcap = spark_df.agg(F.sum("MarketCap_USD_Billion").alias("total")).first()["total"]
share_pct = F.col("MarketCap_USD_Billion") / F.lit(total_marketcap) * 100
market_share_df = spark_df.withColumn("MarketSharePct", share_pct)

# Tier rules are checked from the highest threshold down; the first match wins.
tier_rules = [(10, "Mega (>10%)"), (5, "Large (5-10%)"), (1, "Mid (1-5%)")]
top_threshold, top_label = tier_rules[0]
tier_col = F.when(F.col("MarketSharePct") >= top_threshold, F.lit(top_label))
for threshold, tier_label in tier_rules[1:]:
    tier_col = tier_col.when(F.col("MarketSharePct") >= threshold, F.lit(tier_label))
tier_col = tier_col.otherwise(F.lit("Small (<1%)"))
market_share_df = market_share_df.withColumn("Tier", tier_col)

share_columns = ["Rank", "BankName", "MarketCap_USD_Billion", "MarketSharePct", "Tier"]
(
    market_share_df.select(*share_columns)
    .orderBy(F.col("MarketCap_USD_Billion").desc())
    .show(20, truncate=False)
)

# 4.3 Quartile analysis (compute quartiles & assign quartile)
# approxQuantile is called with a relative error of 0.01.
quantiles = spark_df.approxQuantile("MarketCap_USD_Billion", [0.25, 0.5, 0.75], 0.01)
q1, q2, q3 = quantiles
print("Quartiles (USD Billion):", q1, q2, q3)

# The when-chain is evaluated top to bottom, so each branch only sees rows that were
# above the previous cut-off; the explicit lower bounds are therefore implied.
cap_value = F.col("MarketCap_USD_Billion")
quartile_label = (
    F.when(cap_value <= q1, F.lit("Q1"))
    .when(cap_value <= q2, F.lit("Q2"))
    .when(cap_value <= q3, F.lit("Q3"))
    .otherwise(F.lit("Q4"))
)
quartile_df = spark_df.withColumn("Quartile", quartile_label)
(
    quartile_df.select("Rank", "BankName", "MarketCap_USD_Billion", "Quartile")
    .orderBy(F.col("MarketCap_USD_Billion").desc())
    .show(20, truncate=False)
)

# 4.4 Comparative size analysis (classify relative market size vs median)
median_val = q2

# (multiple of the median, label) pairs, checked from the largest multiple down;
# anything below half the median ends up as "Much Smaller".
size_bands = [(2, "Much Larger"), (1.1, "Larger"), (0.9, "Similar"), (0.5, "Smaller")]
first_factor, first_band = size_bands[0]
relative_size = F.when(F.col("MarketCap_USD_Billion") >= median_val * first_factor, F.lit(first_band))
for factor, band in size_bands[1:]:
    relative_size = relative_size.when(F.col("MarketCap_USD_Billion") >= median_val * factor, F.lit(band))
relative_size = relative_size.otherwise(F.lit("Much Smaller"))

size_df = spark_df.withColumn("RelativeSize", relative_size)
(
    size_df.select("Rank", "BankName", "MarketCap_USD_Billion", "RelativeSize")
    .orderBy(F.col("MarketCap_USD_Billion").desc())
    .show(20, truncate=False)
)

# 4.5 Evaluate Market Growth & Gaps between consecutive banks
# Reuses PctDiff_to_Next from df_growth and lists the largest gaps first.
gap_columns = ["Rank", "BankName", "MarketCap_USD_Billion", "Next_MarketCap_USD_Billion", "PctDiff_to_Next"]
(
    df_growth.select(*gap_columns)
    .orderBy(F.col("PctDiff_to_Next").desc())
    .show(20, truncate=False)
)

# 4.6 Market Dominance score: e.g., dominance = market_share * log(marketcap) or Herfindahl-like
# Simple dominance used here: market_share_pct * log10(marketcap + 1)
log_cap = F.log10(F.col("MarketCap_USD_Billion") + F.lit(1))
market_dominance_df = market_share_df.withColumn("DominanceScore", F.col("MarketSharePct") * log_cap)
dominance_columns = ["Rank", "BankName", "MarketCap_USD_Billion", "MarketSharePct", "DominanceScore"]
(
    market_dominance_df.select(*dominance_columns)
    .orderBy(F.col("DominanceScore").desc())
    .show(20, truncate=False)
)

# 4.7 Segment-wise performance (by market-cap range)
# The range label is computed directly in Spark from the same `bins` / `labels` used for
# the pandas range chart. This avoids round-tripping a pandas Categorical through
# createDataFrame and avoids a join on BankName, which would duplicate rows whenever
# two banks share a name.
cap_value = F.col("MarketCap_USD_Billion")

# Values outside [bins[0], bins[-1]] stay uncategorized (NULL), as pd.cut leaves them NaN.
cap_category = F.when(
    (cap_value < bins[0]) | (cap_value > bins[-1]),
    F.lit(None).cast("string"),
)
# Remaining values fall into the first range whose upper edge they do not exceed
# (right-inclusive edges, lowest edge included).
for upper_edge, range_label in zip(bins[1:], labels):
    cap_category = cap_category.when(cap_value <= upper_edge, F.lit(range_label))

spark_full = spark_df.withColumn("CapCategory", cap_category)

# Sorting by the label text would be lexical ("10-50B", "200-500B", "50-200B", ...);
# the ranges are disjoint, so sorting by the average cap lists them from small to large.
(
    spark_full.groupBy("CapCategory")
    .agg(
        F.count("*").alias("Count"),
        F.sum("MarketCap_USD_Billion").alias("TotalMarketCap"),
        F.avg("MarketCap_USD_Billion").alias("AvgMarketCap"),
    )
    .orderBy("AvgMarketCap")
    .show(truncate=False)
)

# 4.8 Create dashboard-ready aggregated table (top metrics)
# One row per bank with its share and tier, largest market cap first.
dashboard_columns = ["Rank", "BankName", "MarketCap_USD_Billion", "MarketSharePct", "Tier"]
dashboard_df = (
    market_share_df.select(*dashboard_columns)
    .orderBy(F.col("MarketCap_USD_Billion").desc())
)
dashboard_df.show(n=20, truncate=False)

# ---------- 11. Save outputs (CSV and SQLite) ----------
import os

output_folder_local = "./output"
os.makedirs(output_folder_local, exist_ok=True)

# Collect the Spark DataFrame once and reuse it for both exports, instead of
# running the Spark job a second time for the SQLite write.
processed_pdf = spark_df.toPandas()

# Save CSV
csv_out_path = os.path.join(output_folder_local, "largest_banks_processed.csv")
processed_pdf.to_csv(csv_out_path, index=False)
print("Saved CSV to", csv_out_path)

# Save to SQLite (single table, replaced if it already exists)
sqlite_path = os.path.join(output_folder_local, "largest_banks.db")
engine = create_engine(f"sqlite:///{sqlite_path}")
try:
    processed_pdf.to_sql("Largest_banks", con=engine, if_exists="replace", index=False)
finally:
    # Release the engine's pooled connection so the database file is not left open.
    engine.dispose()
print("Saved SQLite DB to", sqlite_path)

# ---------- 12. Upload to S3 (if desired) ----------
# Preferred: use IAM role on cluster, or AWS CLI-configured credentials in environment. Example using boto3:
bucket_name = "your-s3-bucket-name"   # replace
s3_key = "banking-analysis/largest_banks_processed.csv"
try:
    # boto3 is imported and the client created inside the try block, so a missing
    # boto3 install or a client set-up error only skips this optional step
    # instead of aborting the whole script.
    import boto3
    s3 = boto3.client("s3")   # will use env credentials or role
    s3.upload_file(csv_out_path, bucket_name, s3_key)
    print(f"Uploaded to s3://{bucket_name}/{s3_key}")
except Exception as e:
    # Deliberately best-effort: report the problem and carry on.
    print("S3 upload skipped or failed (configure credentials/role). Error:", e)

# If you prefer Spark write to S3 (s3a://), configure hadoop properties and write.
# (Example only; do not commit real access keys to source control.)
# spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "AKIA...")
# spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "SECRET")
# spark_df.write.option("header", True).csv("s3a://your-bucket/path/largest_banks_processed.csv", mode="overwrite")

# ---------- 13. Save summary metrics to a small JSON (optional) ----------
import json

# Headline numbers: row count, summed market cap, and the bank with the largest market cap.
bank_count = int(spark_df.count())
marketcap_sum = float(spark_df.agg(F.sum("MarketCap_USD_Billion")).first()[0])
largest_bank = (
    spark_df.orderBy(F.col("MarketCap_USD_Billion").desc())
    .select("BankName")
    .first()[0]
)
summary = {
    "total_banks": bank_count,
    "total_marketcap_usd_billion": marketcap_sum,
    "top_bank": largest_bank,
}

summary_path = os.path.join(output_folder_local, "summary.json")
with open(summary_path, "w") as f:
    json.dump(summary, f, indent=2)
print("Saved summary.json")

# ---------- 14. Final notes ----------
# Stop SparkSession if done
# spark.stop()