This notebook builds a dimensional data lakehouse that integrates country-level data from multiple sources to support historical analysis and aggregation. Data is extracted from a relational MySQL database, MongoDB, and CSV files, then cleaned, standardized, and combined using Spark. The project creates five dimension tables (country, city, language, date, and income group) and a fact table that stores country indicators over time: internet usage, health index, mobile subscriptions, and physician density. These tables support analytical queries that summarize trends by year, continent, and income group. The final output of the notebook is a populated lakehouse schema plus example queries showing how the data can be used for post hoc analysis.
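As a rough orientation, the star schema described above can be summarized in plain Python: the fact table stores foreign keys into dim_country and dim_date plus the numeric measures, while city, language, and income group are reached through dim_country. This is an illustrative sketch only; the dictionaries and the join_path helper are hypothetical and are not part of the notebook.

```python
# Illustrative summary of the star schema built in this notebook (hypothetical structure).
FACT_TABLE = "fact_country_indicators"

# Foreign keys stored on each fact row, mapped to the dimension they reference.
FACT_FOREIGN_KEYS = {
    "country_key": "dim_country",
    "date_key": "dim_date",
}

# Numeric measures stored on each fact row.
FACT_MEASURES = ["internet_pct", "health_index", "mobile_subs_per_100", "physicians_per_1000"]

# Dimensions reached indirectly: (table joined through, join column).
INDIRECT_LINKS = {
    "dim_city": ("dim_country", "country_code"),
    "dim_language": ("dim_country", "country_code"),
    "dim_income": ("dim_country", "region_norm"),
}


def join_path(dimension: str) -> list:
    """Return the chain of tables to join, starting from the fact table."""
    if dimension in FACT_FOREIGN_KEYS.values():
        return [FACT_TABLE, dimension]
    if dimension in INDIRECT_LINKS:
        via, _join_column = INDIRECT_LINKS[dimension]
        return join_path(via) + [dimension]
    raise KeyError(f"unknown dimension: {dimension}")


print(join_path("dim_income"))
# → ['fact_country_indicators', 'dim_country', 'dim_income']
```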


In [41]:
import findspark
findspark.init()

import os
import json
import builtins
import pymongo
import certifi

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, date_format, year, quarter, month, dayofmonth, weekofyear,
    lower, trim, regexp_replace, lit
)

mysql_args = {
    "host_name": "localhost",
    "port": "3306",
    "db_name": "world",
    "conn_props": {
        "user": "root",
        "password": os.environ.get("MYSQL_PASSWORD", ""),  # placeholder env var name; avoid hard-coding credentials
        "driver": "com.mysql.cj.jdbc.Driver"
    }
}

mongodb_args = {
    "cluster_location": "atlas",
    "user_name": "snehaaryal58_db_user",
    "password": os.environ.get("MONGODB_PASSWORD", ""),  # placeholder env var name; avoid hard-coding credentials
    "cluster_name": "worldcluster",
    "cluster_subnet": "hdrff8i",
    "db_name": "world_side",
    "collection": "",
    "null_column_threshold": 0.5
}
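The MongoDB connection string built by get_mongo_uri in the next cell embeds the user name and password directly. If a password contains reserved URI characters such as @, : or /, the URI will not parse correctly; the pymongo documentation recommends percent-escaping credentials with urllib.parse.quote_plus. A minimal sketch of an escaped variant follows; build_mongo_uri and the example values are hypothetical and not part of the notebook.

```python
from urllib.parse import quote_plus


def build_mongo_uri(args: dict) -> str:
    """Build a mongodb+srv URI, percent-escaping the credentials (hypothetical helper)."""
    user = quote_plus(args["user_name"])
    password = quote_plus(args["password"])
    host = f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    return f"mongodb+srv://{user}:{password}@{host}/"


example_args = {
    "user_name": "analyst",        # placeholder values for illustration only
    "password": "p@ss:word/1",
    "cluster_name": "worldcluster",
    "cluster_subnet": "abc123",
}
print(build_mongo_uri(example_args))
# → mongodb+srv://analyst:p%40ss%3Aword%2F1@worldcluster.abc123.mongodb.net/
```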

In [42]:
def get_mongo_uri(args):
    return f"mongodb+srv://{args['user_name']}:{args['password']}@{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"

threads = builtins.max(1, int(os.cpu_count() / 2))

sparkConf = (
    SparkConf()
    .setAppName("World Lakehouse Local")
    .setMaster(f"local[{threads}]")
    .set("spark.jars.packages", "mysql:mysql-connector-java:8.0.33,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .set("spark.mongodb.input.uri", get_mongo_uri(mongodb_args))
    .set("spark.mongodb.output.uri", get_mongo_uri(mongodb_args))
    .set("spark.sql.shuffle.partitions", str(threads))
    .set("spark.sql.warehouse.dir", os.path.abspath("spark-warehouse"))
)

spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

spark.sql("DROP DATABASE IF EXISTS world_dlh CASCADE")
spark.sql("CREATE DATABASE world_dlh")
spark.sql("USE world_dlh")

def get_mysql_df(sql_query: str):
    jdbc_url = f"jdbc:mysql://{mysql_args['host_name']}:{mysql_args['port']}/{mysql_args['db_name']}"
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", mysql_args["conn_props"]["driver"])
        .option("user", mysql_args["conn_props"]["user"])
        .option("password", mysql_args["conn_props"]["password"])
        .option("query", sql_query)
        .load()
    )

spark.sql("SHOW DATABASES").show(200, truncate=False)
spark.sql("SHOW TABLES").show(200, truncate=False)
df_test = get_mysql_df("SELECT Code, Name, Continent FROM country LIMIT 5")
df_test.show(5, truncate=False)

+---------+
|namespace|
+---------+
|default  |
|world_dlh|
+---------+

+---------+------------+-----------+
|namespace|tableName   |isTemporary|
+---------+------------+-----------+
|         |stg_city    |true       |
|         |stg_country |true       |
|         |stg_income  |true       |
|         |stg_language|true       |
+---------+------------+-----------+

+----+----------------------------------------------------+-------------+
|Code|Name                                                |Continent    |
+----+----------------------------------------------------+-------------+
|ABW |Aruba                                               |North America|
|AFG |Afghanistan                                         |Asia         |
|AGO |Angola                                              |Africa       |
|AIA |Anguilla                                            |North America|
|ALB |Albania                                             |Europe       |
+----+----------------------------------------------------+-------------+

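The dim_country table is built from the MySQL country table. It also gets a region_norm column (the region name with runs of whitespace collapsed to one space, trimmed, and lowercased), which is later used to match countries to income groups.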
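The next cell derives region_norm with lower(trim(regexp_replace(region, "\s+", " "))). A plain-Python equivalent, handy for checking values outside Spark, is sketched below; normalize_region is a hypothetical name, and the sample strings are made up.

```python
import re


def normalize_region(region):
    """Mirror lower(trim(regexp_replace(region, '\\s+', ' '))) from the Spark code."""
    if region is None:
        return None  # Spark's string functions also return NULL for NULL input
    collapsed = re.sub(r"\s+", " ", region)  # regexp_replace(..., r"\s+", " ")
    return collapsed.strip().lower()         # trim(...) then lower(...)


for raw in ["Southern  Europe", "  Middle East\t", "Caribbean"]:
    print(repr(normalize_region(raw)))
# → 'southern europe'
#   'middle east'
#   'caribbean'
```

Because the whitespace is collapsed before trimming, a leading tab or double space becomes a single space that trim then removes.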

In [43]:
df_country = get_mysql_df("""
SELECT
  Code AS country_code,
  Name AS country_name,
  Continent AS continent,
  Region AS region,
  SurfaceArea AS surface_area,
  IndepYear AS indep_year,
  Population AS population,
  LifeExpectancy AS life_expectancy,
  GNP AS gnp,
  GNPOld AS gnp_old,
  LocalName AS local_name,
  GovernmentForm AS government_form,
  HeadOfState AS head_of_state,
  Capital AS capital_city_id,
  Code2 AS country_code2
FROM country
""")

df_country = df_country.withColumn("region_norm", lower(trim(regexp_replace(col("region"), r"\s+", " "))))
df_country.createOrReplaceTempView("stg_country")

df_dim_country = spark.sql("""
SELECT
  ROW_NUMBER() OVER (ORDER BY country_code) AS country_key,
  country_code,
  country_name,
  continent,
  region,
  region_norm,
  surface_area,
  indep_year,
  population,
  life_expectancy,
  gnp,
  gnp_old,
  local_name,
  government_form,
  head_of_state,
  capital_city_id,
  country_code2
FROM stg_country
""")

df_dim_country.write.mode("overwrite").format("parquet").saveAsTable("dim_country")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM dim_country ORDER BY country_key LIMIT 10").show(10, truncate=False)


+---------+------------+-----------+
|namespace|tableName   |isTemporary|
+---------+------------+-----------+
|world_dlh|dim_country |false      |
|         |stg_city    |true       |
|         |stg_country |true       |
|         |stg_income  |true       |
|         |stg_language|true       |
+---------+------------+-----------+

+-----------+------------+----------------------------------------------------+-------------+--------------------------+-------------------------+------------+----------+----------+---------------+---------+---------+---------------------------------------------+---------------------------------------------+------------------------------------------------------------+---------------+-------------+
|country_key|country_code|country_name                                        |continent    |region                    |region_norm              |surface_area|indep_year|population|life_expectancy|gnp      |gnp_old  |local_name                                   |go

The dim_city table is sourced from the MySQL city table; each city keeps its country_code, which links it to dim_country.
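Like the other dimensions, dim_city gets its surrogate key from ROW_NUMBER() OVER (ORDER BY ...), which numbers rows 1, 2, 3, ... after sorting on the given columns. The sketch below reproduces that numbering in plain Python for the first three cities shown in this cell's output; assign_surrogate_keys is a hypothetical helper. Because city_id is unique, the numbering is deterministic across runs; with non-unique ordering columns, ROW_NUMBER may number tied rows in any order.

```python
def assign_surrogate_keys(rows, order_by, key_name):
    """Number rows 1..n after sorting on order_by, like ROW_NUMBER() OVER (ORDER BY ...)."""
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in order_by))
    return [{key_name: i, **row} for i, row in enumerate(ordered, start=1)]


cities = [
    {"city_id": 3, "city_name": "Herat", "country_code": "AFG"},
    {"city_id": 1, "city_name": "Kabul", "country_code": "AFG"},
    {"city_id": 2, "city_name": "Qandahar", "country_code": "AFG"},
]

for row in assign_surrogate_keys(cities, ["city_id"], "city_key"):
    print(row["city_key"], row["city_id"], row["city_name"])
# → 1 1 Kabul
#   2 2 Qandahar
#   3 3 Herat
```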

In [44]:
df_city = get_mysql_df("""
SELECT
  ID AS city_id,
  Name AS city_name,
  CountryCode AS country_code,
  District AS district,
  Population AS city_population
FROM city
""")

df_city.createOrReplaceTempView("stg_city")

df_dim_city = spark.sql("""
SELECT
  ROW_NUMBER() OVER (ORDER BY city_id) AS city_key,
  city_id,
  city_name,
  country_code,
  district,
  city_population
FROM stg_city
""")

df_dim_city.write.mode("overwrite").format("parquet").saveAsTable("dim_city")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM dim_city ORDER BY city_key LIMIT 10").show(10, truncate=False)


+---------+------------+-----------+
|namespace|tableName   |isTemporary|
+---------+------------+-----------+
|world_dlh|dim_city    |false      |
|world_dlh|dim_country |false      |
|         |stg_city    |true       |
|         |stg_country |true       |
|         |stg_income  |true       |
|         |stg_language|true       |
+---------+------------+-----------+

+--------+-------+-----------------------------------+------------+--------------------+---------------+
|city_key|city_id|city_name                          |country_code|district            |city_population|
+--------+-------+-----------------------------------+------------+--------------------+---------------+
|1       |1      |Kabul                              |AFG         |Kabol               |1780000        |
|2       |2      |Qandahar                           |AFG         |Qandahar            |237500         |
|3       |3      |Herat                              |AFG         |Herat               |186800         |

The dim_language table is built from the MySQL countrylanguage table, with one row per country and language.
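Before numbering, the next cell drops duplicate (country_code, language) pairs and then orders the key by both columns. The plain-Python sketch below shows that dedupe-then-number step on the Aruba rows from this cell's output plus one deliberately duplicated row; dedupe_and_key is a hypothetical helper. One difference: this sketch keeps the first occurrence of a duplicate, whereas Spark's dropDuplicates does not guarantee which row survives.

```python
def dedupe_and_key(rows):
    """Keep one row per (country_code, language), then number rows by that pair."""
    seen = {}
    for row in rows:
        pair = (row["country_code"], row["language"])
        seen.setdefault(pair, row)  # first occurrence wins in this sketch
    ordered = sorted(seen.values(), key=lambda r: (r["country_code"], r["language"]))
    return [{"language_key": i, **r} for i, r in enumerate(ordered, start=1)]


languages = [
    {"country_code": "ABW", "language": "Papiamento", "is_official": "F", "language_pct": 76.7},
    {"country_code": "ABW", "language": "Dutch", "is_official": "T", "language_pct": 5.3},
    {"country_code": "ABW", "language": "Dutch", "is_official": "T", "language_pct": 5.3},  # duplicate
    {"country_code": "ABW", "language": "English", "is_official": "F", "language_pct": 9.5},
]

for row in dedupe_and_key(languages):
    print(row["language_key"], row["language"], row["is_official"])
# → 1 Dutch T
#   2 English F
#   3 Papiamento F
```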

In [45]:
df_lang = get_mysql_df("""
SELECT
  CountryCode AS country_code,
  Language AS language,
  IsOfficial AS is_official,
  Percentage AS language_pct
FROM countrylanguage
""")

df_lang = (
    df_lang
    .withColumn("is_official", col("is_official").cast("string"))
    .withColumn("language_pct", col("language_pct").cast("double"))
    .dropDuplicates(["country_code", "language"])
)

df_lang.createOrReplaceTempView("stg_language")

df_dim_language = spark.sql("""
SELECT
  ROW_NUMBER() OVER (ORDER BY country_code, language) AS language_key,
  country_code,
  language,
  is_official,
  language_pct
FROM stg_language
""")

df_dim_language.write.mode("overwrite").format("parquet").saveAsTable("dim_language")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM dim_language ORDER BY language_key LIMIT 10").show(10, truncate=False)


+---------+------------+-----------+
|namespace|tableName   |isTemporary|
+---------+------------+-----------+
|world_dlh|dim_city    |false      |
|world_dlh|dim_country |false      |
|world_dlh|dim_language|false      |
|         |stg_city    |true       |
|         |stg_country |true       |
|         |stg_income  |true       |
|         |stg_language|true       |
+---------+------------+-----------+

+------------+------------+------------------------------+-----------+------------+
|language_key|country_code|language                      |is_official|language_pct|
+------------+------------+------------------------------+-----------+------------+
|1           |ABW         |Dutch                         |T          |5.3         |
|2           |ABW         |English                       |F          |9.5         |
|3           |ABW         |Papiamento                    |F          |76.7        |
|4           |ABW         |Spanish                       |F          |7.4         |
|5  

The date dimension is generated from a continuous daily sequence covering 1990-01-01 through 2026-12-31, with one row per day keyed by an integer date_key in yyyyMMdd form.
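The same per-day attributes can be derived with Python's datetime module as a quick cross-check of dim_date; this is a sketch, not the notebook's Spark code, and build_date_dimension is a hypothetical name. It assumes Spark's weekofyear follows ISO-8601 week numbering (weeks start on Monday), which matches date.isocalendar(), and it relies on the default C locale for English month and day names.

```python
from datetime import date, timedelta


def build_date_dimension(start: date, end: date):
    """Yield one dict per day from start to end inclusive, mirroring dim_date's columns."""
    current = start
    while current <= end:
        yield {
            "full_date": current,
            "date_key": int(current.strftime("%Y%m%d")),  # e.g. 19900101
            "year": current.year,
            "quarter": (current.month - 1) // 3 + 1,
            "month": current.month,
            "month_name": current.strftime("%B"),
            "day": current.day,
            "day_of_week": current.strftime("%a"),
            "week_of_year": current.isocalendar()[1],      # ISO-8601 week number
        }
        current += timedelta(days=1)


rows = list(build_date_dimension(date(1990, 1, 1), date(2026, 12, 31)))
print(len(rows))  # 37 years, 9 of them leap years → 13514
```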

In [46]:
start_date = "1990-01-01"
end_date = "2026-12-31"

df_dim_date = (
    spark.sql(f"SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS full_date")
    .withColumn("date_key", date_format(col("full_date"), "yyyyMMdd").cast("int"))
    .withColumn("year", year(col("full_date")))
    .withColumn("quarter", quarter(col("full_date")))
    .withColumn("month", month(col("full_date")))
    .withColumn("month_name", date_format(col("full_date"), "MMMM"))
    .withColumn("day", dayofmonth(col("full_date")))
    .withColumn("day_of_week", date_format(col("full_date"), "E"))
    .withColumn("week_of_year", weekofyear(col("full_date")))
)

df_dim_date.write.mode("overwrite").format("parquet").saveAsTable("dim_date")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM dim_date ORDER BY full_date LIMIT 10").show(10, truncate=False)


+---------+------------+-----------+
|namespace|tableName   |isTemporary|
+---------+------------+-----------+
|world_dlh|dim_city    |false      |
|world_dlh|dim_country |false      |
|world_dlh|dim_date    |false      |
|world_dlh|dim_language|false      |
|         |stg_city    |true       |
|         |stg_country |true       |
|         |stg_income  |true       |
|         |stg_language|true       |
+---------+------------+-----------+

+----------+--------+----+-------+-----+----------+---+-----------+------------+
|full_date |date_key|year|quarter|month|month_name|day|day_of_week|week_of_year|
+----------+--------+----+-------+-----+----------+---+-----------+------------+
|1990-01-01|19900101|1990|1      |1    |January   |1  |Mon        |1           |
|1990-01-02|19900102|1990|1      |1    |January   |2  |Tue        |1           |
|1990-01-03|19900103|1990|1      |1    |January   |3  |Wed        |1           |
|1990-01-04|19900104|1990|1      |1    |January   |4  |Thu        |1 

The fact table captures country-level indicators over time: internet usage, health index, mobile subscriptions, and physician density. The indicators are loaded from a JSON file into a MongoDB collection and then read back into Spark. Each record is linked to dim_country through country_key and to dim_date through date_key.
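The next cell standardizes the key column (renaming Code, iso3, or country to country_code, whichever is found first), converts as_of into a yyyyMMdd date_key, and left-joins to dim_country to pick up country_key. A left join keeps indicator rows whose code has no match in dim_country, leaving country_key NULL. The plain-Python sketch below walks through those steps on two made-up records; the helper names, the "XXX" code, and the lookup dictionary are hypothetical.

```python
from datetime import date

KEY_CANDIDATES = ["Code", "iso3", "country"]  # checked in the same order as the notebook


def standardize_record(record):
    """Rename the first matching key column to country_code and derive date_key."""
    out = dict(record)
    for candidate in KEY_CANDIDATES:
        if candidate in out:
            out["country_code"] = out.pop(candidate)
            break
    as_of = date.fromisoformat(out.pop("as_of"))
    out["date_key"] = int(as_of.strftime("%Y%m%d"))
    return out


def attach_country_key(records, country_keys):
    """Left join: keep every record, using None when the code is not in dim_country."""
    return [{**r, "country_key": country_keys.get(r["country_code"])} for r in records]


raw = [
    {"Code": "IND", "as_of": "2000-01-01", "internet_pct": 0.5},  # illustrative values
    {"Code": "XXX", "as_of": "2000-01-01", "internet_pct": 1.0},  # code absent from dim_country
]
dim_country_keys = {"IND": 100}  # hypothetical country_code → country_key lookup

facts = attach_country_key([standardize_record(r) for r in raw], dim_country_keys)
for f in facts:
    print(f["date_key"], f["country_code"], f["country_key"])
# → 20000101 IND 100
#   20000101 XXX None
```

Rows with a NULL country_key stay in the fact table but drop out of any query that inner-joins to dim_country.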

In [47]:
def mongo_client():
    uri = f"mongodb+srv://{mongodb_args['user_name']}:{mongodb_args['password']}@{mongodb_args['cluster_name']}.{mongodb_args['cluster_subnet']}.mongodb.net/?retryWrites=true&w=majority"
    return pymongo.MongoClient(uri, tlsCAFile=certifi.where())

json_path = os.path.abspath("data/country_indicators.json")
with open(json_path, "r") as f:
    indicators = json.load(f)

client = mongo_client()
db = client[mongodb_args["db_name"]]
db.drop_collection("country_indicators")
db["country_indicators"].insert_many(indicators if isinstance(indicators, list) else [indicators])
client.close()

mongodb_args["collection"] = "country_indicators"

df_indicators = (
    spark.read.format("com.mongodb.spark.sql.DefaultSource")
    .option("database", mongodb_args["db_name"])
    .option("collection", mongodb_args["collection"])
    .load()
    .drop("_id")
)

cols = set(df_indicators.columns)
if "Code" in cols:
    df_indicators2 = df_indicators.withColumnRenamed("Code", "country_code")
elif "iso3" in cols:
    df_indicators2 = df_indicators.withColumnRenamed("iso3", "country_code")
elif "country" in cols:
    df_indicators2 = df_indicators.withColumnRenamed("country", "country_code")
else:
    df_indicators2 = df_indicators

df_fact_stage = (
    df_indicators2
    .withColumn("as_of", to_date(col("as_of")))
    .withColumn("date_key", date_format(col("as_of"), "yyyyMMdd").cast("int"))
    .withColumn("internet_pct", col("internet_pct").cast("double"))
    .withColumn("health_index", col("health_index").cast("double"))
    .withColumn("mobile_subs_per_100", col("mobile_subs_per_100").cast("double"))
    .withColumn("physicians_per_1000", col("physicians_per_1000").cast("double"))
    .dropDuplicates(["country_code", "date_key"])
)

df_country_keys = spark.table("dim_country").select("country_key", "country_code")

df_fact_indicators = (
    df_fact_stage
    .join(df_country_keys, "country_code", "left")
    .select(
        col("date_key"),
        col("country_key"),
        col("country_code"),
        col("internet_pct"),
        col("health_index"),
        col("mobile_subs_per_100"),
        col("physicians_per_1000")
    )
)

df_fact_indicators.write.mode("overwrite").format("parquet").saveAsTable("fact_country_indicators")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM fact_country_indicators LIMIT 10").show(10, truncate=False)

spark.sql("""
SELECT
  MIN(d.year) AS min_year,
  MAX(d.year) AS max_year,
  COUNT(*) AS row_count
FROM fact_country_indicators f
JOIN dim_date d ON f.date_key = d.date_key
""").show(50, truncate=False)


+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|world_dlh|dim_city               |false      |
|world_dlh|dim_country            |false      |
|world_dlh|dim_date               |false      |
|world_dlh|dim_language           |false      |
|world_dlh|fact_country_indicators|false      |
|         |stg_city               |true       |
|         |stg_country            |true       |
|         |stg_income             |true       |
|         |stg_language           |true       |
+---------+-----------------------+-----------+

+--------+-----------+------------+------------+------------+-------------------+-------------------+
|date_key|country_key|country_code|internet_pct|health_index|mobile_subs_per_100|physicians_per_1000|
+--------+-----------+------------+------------+------------+-------------------+-------------------+
|20000101|100        |IND         |0.5         |0.61        |3.0     

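This section loads income group data from a CSV file and builds the dim_income table. Income groups are keyed by region, so countries are matched to them through region_norm rather than by country code.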
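The next cell normalizes the CSV header (strip, lowercase, spaces to underscores), renames incomegroup to income_group, drops rows missing either field, and removes duplicate (region_norm, income_group) pairs. The sketch below applies the same rules with Python's csv module to a small in-memory sample; the sample text and load_income_rows are made up for illustration. Here an empty field counts as missing, which mirrors Spark's CSV reader turning empty fields into NULL by default.

```python
import csv
import io
import re


def normalize_header(name):
    """Same rule as the notebook: strip, lowercase, spaces → underscores."""
    return name.strip().lower().replace(" ", "_")


def load_income_rows(text):
    reader = csv.reader(io.StringIO(text))
    header = [normalize_header(h) for h in next(reader)]
    if "incomegroup" in header and "income_group" not in header:
        header[header.index("incomegroup")] = "income_group"
    rows, seen = [], set()
    for values in reader:
        row = dict(zip(header, values))
        if not row.get("region") or not row.get("income_group"):
            continue  # like dropna(subset=["region", "income_group"])
        region_norm = re.sub(r"\s+", " ", row["region"]).strip().lower()
        pair = (region_norm, row["income_group"])
        if pair in seen:
            continue  # like dropDuplicates; first occurrence kept here, Spark does not guarantee which
        seen.add(pair)
        rows.append({"region": row["region"], "region_norm": region_norm,
                     "income_group": row["income_group"]})
    return rows


sample = "Region , IncomeGroup\nWestern Europe,High\nWestern  Europe,High\nEastern Africa,\n"
for r in load_income_rows(sample):
    print(r)
# → {'region': 'Western Europe', 'region_norm': 'western europe', 'income_group': 'High'}
```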

In [48]:
csv_path = os.path.abspath("data/country_income.csv")

df_income_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(csv_path)
)

df_income_raw = df_income_raw.toDF(*[c.strip().lower().replace(" ", "_") for c in df_income_raw.columns])

if "incomegroup" in df_income_raw.columns and "income_group" not in df_income_raw.columns:
    df_income_raw = df_income_raw.withColumnRenamed("incomegroup", "income_group")

df_income2 = (
    df_income_raw
    .select("region", "income_group")
    .withColumn("region_norm", lower(trim(regexp_replace(col("region"), r"\s+", " "))))
    .dropna(subset=["region", "income_group"])
    .dropDuplicates(["region_norm", "income_group"])
)

df_income2.createOrReplaceTempView("stg_income")

df_dim_income = spark.sql("""
SELECT
  ROW_NUMBER() OVER (ORDER BY region_norm, income_group) AS income_key,
  region,
  region_norm,
  income_group
FROM stg_income
""")

df_dim_income.write.mode("overwrite").format("parquet").saveAsTable("dim_income")

spark.sql("SHOW TABLES").show(200, truncate=False)
spark.sql("SELECT * FROM dim_income ORDER BY income_key LIMIT 50").show(50, truncate=False)


+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|world_dlh|dim_city               |false      |
|world_dlh|dim_country            |false      |
|world_dlh|dim_date               |false      |
|world_dlh|dim_income             |false      |
|world_dlh|dim_language           |false      |
|world_dlh|fact_country_indicators|false      |
|         |stg_city               |true       |
|         |stg_country            |true       |
|         |stg_income             |true       |
|         |stg_language           |true       |
+---------+-----------------------+-----------+

+----------+-------------------------+-------------------------+------------+
|income_key|region                   |region_norm              |income_group|
+----------+-------------------------+-------------------------+------------+
|1         |Antarctica               |antarctica               |High        |
|2         |Aus

These queries show how the data can be used to compute average internet usage over time by continent and by income group. The table list at the end confirms that all tables were created.
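Both queries follow the usual star-join pattern: join the fact table to its dimensions, group by descriptive attributes, and average the measure. The second query reaches income_group through dim_country.region_norm, so a region listed under more than one income group in dim_income would have its countries counted once per group. The plain-Python sketch below reproduces the year-and-continent average on a few made-up rows; the miniature tables and avg_by_year_continent are hypothetical and are not the notebook's data.

```python
from collections import defaultdict

# Hypothetical miniature tables; values are illustrative, not the notebook's data.
dim_date = {20000101: {"year": 2000}, 20010101: {"year": 2001}}
dim_country = {1: {"continent": "Asia"}, 2: {"continent": "Asia"}, 3: {"continent": "Europe"}}
fact = [
    {"date_key": 20000101, "country_key": 1, "internet_pct": 2.0},
    {"date_key": 20000101, "country_key": 2, "internet_pct": 5.0},
    {"date_key": 20000101, "country_key": 3, "internet_pct": 10.0},
    {"date_key": 20010101, "country_key": 3, "internet_pct": None},  # NULLs are skipped by AVG
]


def avg_by_year_continent(fact_rows):
    """Inner-join facts to dim_date and dim_country, then average internet_pct per group."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for f in fact_rows:
        if f["date_key"] not in dim_date or f["country_key"] not in dim_country:
            continue  # an inner JOIN drops facts with no matching dimension row
        group = (dim_date[f["date_key"]]["year"], dim_country[f["country_key"]]["continent"])
        if f["internet_pct"] is not None:
            sums[group] += f["internet_pct"]
            counts[group] += 1
        else:
            counts.setdefault(group, 0)  # group still appears, with a NULL average
    # Python's round() rounds halves to even, unlike Spark's ROUND (half up).
    return {g: (round(sums[g] / counts[g], 2) if counts[g] else None) for g in sorted(counts)}


print(avg_by_year_continent(fact))
# → {(2000, 'Asia'): 3.5, (2000, 'Europe'): 10.0, (2001, 'Europe'): None}
```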

In [49]:
spark.sql("""
SELECT
  d.year,
  c.continent,
  ROUND(AVG(f.internet_pct), 2) AS avg_internet_pct
FROM fact_country_indicators f
JOIN dim_date d ON f.date_key = d.date_key
JOIN dim_country c ON f.country_key = c.country_key
GROUP BY d.year, c.continent
ORDER BY d.year, c.continent
""").show(200, truncate=False)

spark.sql("""
SELECT
  d.year,
  i.income_group,
  ROUND(AVG(f.internet_pct), 2) AS avg_internet_pct
FROM fact_country_indicators f
JOIN dim_date d ON f.date_key = d.date_key
JOIN dim_country c ON f.country_key = c.country_key
JOIN dim_income i ON c.region_norm = i.region_norm
GROUP BY d.year, i.income_group
ORDER BY d.year, i.income_group
""").show(200, truncate=False)

spark.sql("SHOW TABLES").show(200, truncate=False)

+----+-------------+----------------+
|year|continent    |avg_internet_pct|
+----+-------------+----------------+
|2000|Africa       |5.4             |
|2000|Asia         |8.4             |
|2000|Europe       |15.35           |
|2000|North America|33.17           |
|2000|South America|5.3             |
+----+-------------+----------------+

+----+------------+----------------+
|year|income_group|avg_internet_pct|
+----+------------+----------------+
|2000|High        |31.75           |
|2000|Lower middle|0.5             |
|2000|Upper middle|7.08            |
+----+------------+----------------+

+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|world_dlh|dim_city               |false      |
|world_dlh|dim_country            |false      |
|world_dlh|dim_date               |false      |
|world_dlh|dim_income             |false      |
|world_dlh|dim_language           |false      |
|world_dlh|fa