In [0]:
%pip install requests pandas

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:

%restart_python
# If the above magic doesn't work in your environment, try:
# dbutils.library.restartPython()


In [0]:

# Use and create the default schema in the current catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS default")
spark.sql("USE SCHEMA default")


DataFrame[]

In [0]:

# # 4.1 Data Ingestion
# Import required libraries
import requests
import pandas as pd

# Parameterize the date range
start_date = "2024-01-01"
end_date   = "2024-12-31"
base_ccy   = "EUR"

# Build API URL
url = f"https://api.frankfurter.app/{start_date}..{end_date}?from={base_ccy}"

# Call the API
resp = requests.get(url, timeout=30)
resp.raise_for_status()
payload = resp.json()

# Flatten JSON to rows: one row per (date, currency, rate)
rows = []
for d, rate_map in (payload.get("rates") or {}).items():
    for ccy, rate in rate_map.items():
        rows.append({
            "date": d,             # string YYYY-MM-DD
            "base": payload.get("base"),
            "currency": ccy,
            "rate": float(rate)
        })

# Convert to pandas then to Spark DataFrame
pdf_fx = pd.DataFrame(rows)
df_fx = spark.createDataFrame(pdf_fx)

# Quick checks
print(f"fx rows: {df_fx.count()}")
df_fx.printSchema()
df_fx.show(5)


fx rows: 7710
root
 |-- date: string (nullable = true)
 |-- base: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- rate: double (nullable = true)

+----------+----+--------+------+
|      date|base|currency|  rate|
+----------+----+--------+------+
|2023-12-29| EUR|     AUD|1.6263|
|2023-12-29| EUR|     BGN|1.9558|
|2023-12-29| EUR|     BRL|5.3618|
|2023-12-29| EUR|     CAD|1.4642|
|2023-12-29| EUR|     CHF| 0.926|
+----------+----+--------+------+
only showing top 5 rows


In [0]:

# 4.1 Data Ingestion
# Import libraries
import requests
import pandas as pd

# API endpoint: limit to required fields for efficiency
countries_url = "https://restcountries.com/v3.1/all?fields=name,population,region,cca2,currencies"

# Call the API
resp = requests.get(countries_url, timeout=60)
resp.raise_for_status()
countries = resp.json()

# Normalize to tabular rows: expand one row per currency per country
rows = []
for c in countries:
    name = (c.get("name") or {}).get("common")
    population = c.get("population")
    region = c.get("region")
    cca2 = c.get("cca2")
    currencies = c.get("currencies") or {}

    # If a country has currencies, expand each; else keep one row with None currency
    if currencies:
        for code, info in currencies.items():
            rows.append({
                "country": name,
                "cca2": cca2,
                "region": region,
                "population": population,
                "currency": code,
                "currency_name": (info or {}).get("name"),
            })
    else:
        rows.append({
            "country": name,
            "cca2": cca2,
            "region": region,
            "population": population,
            "currency": None,
            "currency_name": None,
        })

# Convert to pandas then to Spark DataFrame
pdf_cty = pd.DataFrame(rows)
df_cty = spark.createDataFrame(pdf_cty)

# Quick checks
print(f"countries rows: {df_cty.count()}")
df_cty.printSchema()
df_cty.show(5)


countries rows: 271
root
 |-- country: string (nullable = true)
 |-- cca2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- population: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- currency_name: string (nullable = true)

+-------------------+----+--------+----------+--------+--------------------+
|            country|cca2|  region|population|currency|       currency_name|
+-------------------+----+--------+----------+--------+--------------------+
|Antigua and Barbuda|  AG|Americas|    103603|     XCD|Eastern Caribbean...|
|             Bhutan|  BT|    Asia|    784043|     BTN|  Bhutanese ngultrum|
|             Bhutan|  BT|    Asia|    784043|     INR|        Indian rupee|
|              Italy|  IT|  Europe|  58927633|     EUR|                Euro|
|             Tuvalu|  TV| Oceania|     10643|     AUD|   Australian dollar|
+-------------------+----+--------+----------+--------+--------------------+
only showing top 5 rows


In [0]:

# 4.2 Data Cleaning,Preparation and analysis
# Import Spark functions
from pyspark.sql.functions import col

# Clean FX data: drop nulls and cast rate
df_fx_clean = (
    df_fx
    .dropna(subset=["date", "currency", "rate"])
    .withColumn("rate", col("rate").cast("double"))
)

# Clean country data: ensure core columns and cast population
df_cty_clean = (
    df_cty
    .dropna(subset=["country"])                # core identity column
    .withColumn("population", col("population").cast("long"))
)

# Sanity check
print("fx_clean rows:", df_fx_clean.count())
print("cty_clean rows:", df_cty_clean.count())


fx_clean rows: 7710
cty_clean rows: 271


In [0]:

# 4.3 Data Storage
# Persist cleaned dataframes as Delta managed tables
df_fx_clean.write.mode("overwrite").format("delta").saveAsTable("fx_rates_eur")
df_cty_clean.write.mode("overwrite").format("delta").saveAsTable("countries_currency")

print("Tables saved: fx_rates_eur, countries_currency")


Tables saved: fx_rates_eur, countries_currency


In [0]:

# 4.3 Data Storage
# Load tables back to verify
fx_tbl = spark.table("fx_rates_eur")
cty_tbl = spark.table("countries_currency")

fx_tbl.show(3)
cty_tbl.show(3)


+----------+----+--------+------+
|      date|base|currency|  rate|
+----------+----+--------+------+
|2024-04-02| EUR|     DKK|7.4582|
|2024-04-02| EUR|     GBP|0.8551|
|2024-04-02| EUR|     HKD|8.4148|
+----------+----+--------+------+
only showing top 3 rows
+-------------------+----+--------+----------+--------+--------------------+
|            country|cca2|  region|population|currency|       currency_name|
+-------------------+----+--------+----------+--------+--------------------+
|Antigua and Barbuda|  AG|Americas|    103603|     XCD|Eastern Caribbean...|
|             Bhutan|  BT|    Asia|    784043|     BTN|  Bhutanese ngultrum|
|             Bhutan|  BT|    Asia|    784043|     INR|        Indian rupee|
+-------------------+----+--------+----------+--------+--------------------+
only showing top 3 rows


In [0]:
# 4.4 SQL Analysis / Query 1: Monthly Average EUR→USD Exchange Rate

%sql

SELECT
  date_format(to_date(date), 'yyyy-MM') AS month,
  AVG(rate) AS avg_rate_usd
FROM fx_rates_eur
WHERE currency = 'USD'
GROUP BY date_format(to_date(date), 'yyyy-MM')
ORDER BY month;


month,avg_rate_usd
2023-12,1.105
2024-01,1.0905136363636363
2024-02,1.0794714285714286
2024-03,1.08722
2024-04,1.0727761904761903
2024-05,1.081222727272727
2024-06,1.0759
2024-07,1.084408695652174
2024-08,1.101218181818182
2024-09,1.1105999999999998


In [0]:
# 4.4 SQL Analysis / Query 2: Top 10 Most Populous Countries by Region

%sql
WITH ranked AS (
  SELECT
    region,
    country,
    population,
    ROW_NUMBER() OVER (PARTITION BY region ORDER BY population DESC) AS rn
  FROM countries_currency
  WHERE population IS NOT NULL
)
SELECT
  region,
  country,
  population
FROM ranked
WHERE rn <= 10
ORDER BY region, population DESC;


region,country,population
Africa,Nigeria,223800000
Africa,DR Congo,112832000
Africa,Ethiopia,111652998
Africa,Egypt,107271260
Africa,Tanzania,68153004
Africa,South Africa,63100945
Africa,Kenya,53330978
Africa,Sudan,51662000
Africa,Algeria,47400000
Africa,Uganda,45905417


In [0]:
# 4.4 SQL Analysis / Query 3: Month-over-Month Change of Monthly Average EUR→USD Exchange Rate

%sql

WITH m AS (
  SELECT
    date_format(to_date(date), 'yyyy-MM') AS month,
    AVG(rate) AS avg_usd
  FROM fx_rates_eur
  WHERE currency = 'USD'
  GROUP BY date_format(to_date(date), 'yyyy-MM')
)
SELECT
  month,
  avg_usd,
  avg_usd - LAG(avg_usd) OVER (ORDER BY month) AS m2m_change
FROM m
ORDER BY month;


month,avg_usd,m2m_change
2023-12,1.105,
2024-01,1.0905136363636363,-0.0144863636363634
2024-02,1.0794714285714286,-0.0110422077922078
2024-03,1.08722,0.0077485714285712
2024-04,1.0727761904761903,-0.0144438095238093
2024-05,1.081222727272727,0.0084465367965365
2024-06,1.0759,-0.0053227272727269
2024-07,1.084408695652174,0.0085086956521738
2024-08,1.101218181818182,0.0168094861660079
2024-09,1.1105999999999998,0.0093818181818179


In [0]:
# 4.4 SQL Analysis / Query 4: Link Each Country’s Currency to Its Average Exchange Rate
 
%sql

WITH avg_fx AS (
  SELECT
    currency,
    AVG(rate) AS avg_rate_vs_eur
  FROM fx_rates_eur
  GROUP BY currency
)
SELECT
  c.region,
  c.country,
  c.currency,
  a.avg_rate_vs_eur
FROM countries_currency c
JOIN avg_fx a
  ON c.currency = a.currency
WHERE c.currency IS NOT NULL
ORDER BY c.region, c.country;


region,country,currency,avg_rate_vs_eur
Africa,British Indian Ocean Territory,USD,1.0824684824902724
Africa,Eswatini,ZAR,19.831756031128407
Africa,Lesotho,ZAR,19.831756031128407
Africa,Namibia,ZAR,19.831756031128407
Africa,"Saint Helena, Ascension and Tristan da Cunha",GBP,0.8467038910505836
Africa,South Africa,ZAR,19.831756031128407
Americas,Bahamas,USD,1.0824684824902724
Americas,Brazil,BRL,5.826467704280155
Americas,British Virgin Islands,USD,1.0824684824902724
Americas,Canada,CAD,1.482040856031129


In [0]:

# 4.5 Notebook Analysis
# Ensure using the correct schema
spark.sql("USE SCHEMA default")

# Load Delta tables created in previous steps
df_fx = spark.table("fx_rates_eur")
df_cty = spark.table("countries_currency")

# Inspect schemas and a few sample rows
print("=== Schemas ===")
df_fx.printSchema()      # FX: date (string), base, currency, rate (double)
df_cty.printSchema()     # Countries: country, cca2, region, population (long), currency, currency_name

print("=== Samples ===")
df_fx.show(5)
df_cty.show(5)


=== Schemas ===
root
 |-- date: string (nullable = true)
 |-- base: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- rate: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- cca2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- population: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- currency_name: string (nullable = true)

=== Samples ===
+----------+----+--------+-------+
|      date|base|currency|   rate|
+----------+----+--------+-------+
|2024-04-02| EUR|     DKK| 7.4582|
|2024-04-02| EUR|     GBP| 0.8551|
|2024-04-02| EUR|     HKD| 8.4148|
|2024-04-02| EUR|     HUF| 395.63|
|2024-04-02| EUR|     IDR|17103.0|
+----------+----+--------+-------+
only showing top 5 rows
+-------------------+----+--------+----------+--------+--------------------+
|            country|cca2|  region|population|currency|       currency_name|
+-------------------+----+--------+----------+--------+--------------------+


In [0]:

# 4.5 Notebook Analysis / Descriptive Stats
# Descriptive statistics for FX rates (numeric column)
print("=== FX rates descriptive stats ===")
df_fx.describe(["rate"]).show()

# Descriptive statistics for population (numeric column)
print("=== Countries population descriptive stats ===")
df_cty.describe(["population"]).show()


=== FX rates descriptive stats ===
+-------+-----------------+
|summary|             rate|
+-------+-----------------+
|  count|             7710|
|   mean| 657.136475616083|
| stddev|3076.468849578516|
|    min|          0.82428|
|    max|          17763.0|
+-------+-----------------+

=== Countries population descriptive stats ===
+-------+--------------------+
|summary|          population|
+-------+--------------------+
|  count|                 271|
|   mean|2.9784230025830258E7|
| stddev|1.2698282940775831E8|
|    min|                   0|
|    max|          1417492000|
+-------+--------------------+



In [0]:

#4.5 Notebook Analysis / GroupBy + Aggregation

from pyspark.sql.functions import avg, sum, col

# 1) Average FX rate per currency (descending)
print("=== Average FX rate per currency ===")
df_fx.groupBy("currency").agg(avg("rate").alias("avg_rate")) \
     .orderBy(col("avg_rate").desc()) \
     .show(20, truncate=False)

# 2) Total population per region (descending)
print("=== Total population per region ===")
df_cty.groupBy("region").agg(sum("population").alias("total_population")) \
     .orderBy(col("total_population").desc()) \
     .show(truncate=False)


=== Average FX rate per currency ===
+--------+------------------+
|currency|avg_rate          |
+--------+------------------+
|IDR     |17157.365758754862|
|KRW     |1475.241673151751 |
|HUF     |395.2552140077822 |
|JPY     |163.82264591439687|
|ISK     |149.3147859922179 |
|INR     |90.56163424124512 |
|PHP     |62.00433463035018 |
|THB     |38.18027237354086 |
|TRY     |35.56200389105059 |
|CZK     |25.118256809338522|
|ZAR     |19.831756031128407|
|MXN     |19.827089494163424|
|NOK     |11.62751634241245 |
|SEK     |11.431209727626458|
|HKD     |8.446087159533073 |
|CNY     |7.787716731517508 |
|DKK     |7.458890272373541 |
|BRL     |5.826467704280155 |
|RON     |4.974638521400779 |
|MYR     |4.950764980544747 |
+--------+------------------+
only showing top 20 rows
=== Total population per region ===
+---------+----------------+
|region   |total_population|
+---------+----------------+
|Asia     |4754516169      |
|Africa   |1470046247      |
|Americas |1056790735      |
|Europe 

In [0]:
#4.5 Notebook Analysis / JOIN

from pyspark.sql.functions import avg, col

# Join countries and FX by currency; keep only rows with currency info on both sides
df_join = df_cty.join(df_fx, on="currency", how="inner")

# Compute average annual FX rate vs EUR per country
df_country_avg_fx = (
    df_join.groupBy("country", "region", "currency")
           .agg(avg("rate").alias("avg_rate_vs_eur"))
           .orderBy(col("avg_rate_vs_eur").desc())
)

print("=== Country-level average FX vs EUR (2024) ===")
df_country_avg_fx.show(20, truncate=False)


=== Country-level average FX vs EUR (2024) ===
+----------------------+--------+--------+------------------+
|country               |region  |currency|avg_rate_vs_eur   |
+----------------------+--------+--------+------------------+
|Indonesia             |Asia    |IDR     |17157.365758754862|
|South Korea           |Asia    |KRW     |1475.241673151751 |
|Hungary               |Europe  |HUF     |395.2552140077822 |
|Japan                 |Asia    |JPY     |163.82264591439687|
|Iceland               |Europe  |ISK     |149.3147859922179 |
|Bhutan                |Asia    |INR     |90.56163424124512 |
|India                 |Asia    |INR     |90.56163424124512 |
|Philippines           |Asia    |PHP     |62.00433463035018 |
|Thailand              |Asia    |THB     |38.18027237354086 |
|Turkey                |Asia    |TRY     |35.56200389105059 |
|Czechia               |Europe  |CZK     |25.118256809338522|
|Eswatini              |Africa  |ZAR     |19.831756031128407|
|South Africa          

In [0]:
#4.5 Notebook Analysis / Time Series Processing: Convert Date to DATE and Calculate Monthly Average Exchange Rate

from pyspark.sql.functions import to_date, date_format

# Cast date string to DATE type, then extract 'YYYY-MM' as month
df_fx_typed = (
    df_fx
    .withColumn("date_dt", to_date(col("date")))          # convert string to date
    .withColumn("month", date_format(col("date_dt"), "yyyy-MM"))
)

# Monthly average for USD
df_fx_usd_monthly = (
    df_fx_typed.filter(col("currency") == "USD")
               .groupBy("month")
               .agg(avg("rate").alias("avg_rate_usd"))
               .orderBy("month")
)

print("=== Monthly average EUR→USD (2024) ===")
df_fx_usd_monthly.show(12, truncate=False)


=== Monthly average EUR→USD (2024) ===
+-------+------------------+
|month  |avg_rate_usd      |
+-------+------------------+
|2023-12|1.105             |
|2024-01|1.0905136363636365|
|2024-02|1.0794714285714286|
|2024-03|1.0872199999999999|
|2024-04|1.0727761904761905|
|2024-05|1.081222727272727 |
|2024-06|1.0759            |
|2024-07|1.084408695652174 |
|2024-08|1.1012181818181819|
|2024-09|1.1105999999999998|
|2024-10|1.0904347826086958|
|2024-11|1.063014285714286 |
+-------+------------------+
only showing top 12 rows


In [0]:
#4.5 Notebook Analysis / Notebook Visualization

# Visualize monthly EUR→USD averages (select line chart in the UI)
display(df_fx_usd_monthly)

# Visualize total population per region (select bar chart in the UI)
from pyspark.sql.functions import sum
df_region_pop = df_cty.groupBy("region").agg(sum("population").alias("total_population")) \
                      .orderBy(col("total_population").desc())
display(df_region_pop)


month,avg_rate_usd
2023-12,1.105
2024-01,1.0905136363636363
2024-02,1.0794714285714286
2024-03,1.08722
2024-04,1.0727761904761903
2024-05,1.081222727272727
2024-06,1.0759
2024-07,1.084408695652174
2024-08,1.101218181818182
2024-09,1.1105999999999998


region,total_population
Asia,4754516169
Africa,1470046247
Americas,1056790735
Europe,741965385
Oceania,48206101
Antarctic,1700


In [0]:
#4.5 Notebook Analysis / Save Analysis Results as a Delta Table

# Persist summary tables for dashboard consumption
df_fx_usd_monthly.write.mode("overwrite").format("delta").saveAsTable("fx_usd_monthly_avg")
df_region_pop.write.mode("overwrite").format("delta").saveAsTable("region_population_total")

print("Saved summary tables: fx_usd_monthly_avg, region_population_total")


Saved summary tables: fx_usd_monthly_avg, region_population_total


In [0]:

# #4.5 Notebook Analysis / Parameterize Monthly Average Exchange Rate (Dynamic Currency)

# Create a dropdown widget for currency selection
dbutils.widgets.removeAll()
dbutils.widgets.text("p_currency", "USD", "Currency (e.g., USD, JPY, GBP)")

p_currency = dbutils.widgets.get("p_currency")

from pyspark.sql.functions import to_date, date_format, avg, col

df_fx_typed = df_fx.withColumn("date_dt", to_date(col("date"))) \
                   .withColumn("month", date_format(col("date_dt"), "yyyy-MM"))

df_fx_monthly_param = (
    df_fx_typed.filter(col("currency") == p_currency)
               .groupBy("month")
               .agg(avg("rate").alias("avg_rate"))
               .orderBy("month")
)

print(f"=== Monthly average EUR→{p_currency} (2024) ===")
display(df_fx_monthly_param)


=== Monthly average EUR→USD (2024) ===


month,avg_rate
2023-12,1.105
2024-01,1.0905136363636363
2024-02,1.0794714285714286
2024-03,1.08722
2024-04,1.0727761904761903
2024-05,1.081222727272727
2024-06,1.0759
2024-07,1.084408695652174
2024-08,1.101218181818182
2024-09,1.1105999999999998
