In [1]:
from pyspark.sql.functions import (
    col, trim, when, lit, round as spark_round,
    current_timestamp, year
)

# ---- Load Bronze tables ----
urbanisation = spark.table("bronze_urbanisation")
wid = spark.table("bronze_wid_income")
country_dim = spark.table("country_dimension")

print("Bronze tables loaded successfully")
print("Urbanisation rows:", urbanisation.count())
print("WID rows:", wid.count())
print("Country dimension rows:", country_dim.count())

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 3, Finished, Available, Finished, False)

Bronze tables loaded successfully
Urbanisation rows: 27573
WID rows: 7078
Country dimension rows: 233


In [2]:
# ---- Clean and filter urbanisation data ----

urbanisation_clean = urbanisation \
    .withColumn("urban_population", col("urban_population").cast("long")) \
    .withColumn("rural_population", col("rural_population").cast("long")) \
    .withColumn("year", col("year").cast("integer")) \
    .withColumn("total_population", col("urban_population") + col("rural_population")) \
    .withColumn("urban_pct", 
        spark_round((col("urban_population") / col("total_population")) * 100, 2)) \
    .withColumn("rural_pct",
        spark_round((col("rural_population") / col("total_population")) * 100, 2)) \
    .withColumn("data_type",
        when(col("year") <= 2018, "historical").otherwise("projected")) \
    .filter(col("total_population") > 0) \
    .filter(col("year").isNotNull()) \
    .filter(~col("country_name").isin([
        "World", "Africa", "Europe", "Asia", "Oceania",
        "Less developed regions", "More developed regions",
        "High-income countries", "Low-income countries",
        "Latin America and the Caribbean", "Northern America",
        "Sub-Saharan Africa", "Eastern Africa", "

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 4, Finished, Available, Finished, False)

SyntaxError: unterminated string literal (detected at line 21) (2754737927.py, line 21)

In [3]:
# Define regions to exclude
regions_to_exclude = [
    "World", "Africa", "Europe", "Asia", "Oceania",
    "Less developed regions", "More developed regions",
    "High-income countries", "Low-income countries",
    "Northern America", "Sub-Saharan Africa",
    "Eastern Africa", "Western Africa", "Southern Africa",
    "Middle Africa", "Northern Africa", "Eastern Europe",
    "Southern Europe", "Northern Europe", "Western Europe",
    "Eastern Asia", "Southern Asia", "South-Eastern Asia",
    "Central Asia", "South-Central Asia", "Western Asia",
    "Caribbean", "Central America", "South America",
    "Melanesia", "Polynesia", "Least developed countries",
    "Middle-income countries", "Upper-middle-income countries",
    "Lower-middle-income countries", "Latin America and the Caribbean",
    "Australia/New Zealand", "Micronesia (subregion)"
]

print("Regions defined:", len(regions_to_exclude))

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 5, Finished, Available, Finished, False)

Regions defined: 38


In [4]:
# Clean and filter urbanisation data
urbanisation_clean = urbanisation \
    .withColumn("urban_population", col("urban_population").cast("long")) \
    .withColumn("rural_population", col("rural_population").cast("long")) \
    .withColumn("year", col("year").cast("integer")) \
    .withColumn("total_population", 
        col("urban_population") + col("rural_population")) \
    .withColumn("urban_pct",
        spark_round((col("urban_population") / col("total_population")) * 100, 2)) \
    .withColumn("rural_pct",
        spark_round((col("rural_population") / col("total_population")) * 100, 2)) \
    .withColumn("data_type",
        when(col("year") <= 2018, "historical").otherwise("projected")) \
    .filter(col("total_population") > 0) \
    .filter(col("year").isNotNull()) \
    .filter(~col("country_name").isin(regions_to_exclude))

print("Urbanisation clean rows:", urbanisation_clean.count())
urbanisation_clean.show(3, truncate=False)

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 6, Finished, Available, Finished, False)

Urbanisation clean rows: 23735
+------------+----+----------------+----------------+--------------------------+------------------------------------+----------------+---------+---------+----------+
|country_name|year|urban_population|rural_population|_ingestion_timestamp      |_source                             |total_population|urban_pct|rural_pct|data_type |
+------------+----+----------------+----------------+--------------------------+------------------------------------+----------------+---------+---------+----------+
|Afghanistan |1950|465000          |7287000         |2026-02-27 20:38:20.428209|UN_World_Urbanization_Prospects_2018|7752000         |6.0      |94.0     |historical|
|Afghanistan |1951|487000          |7353000         |2026-02-27 20:38:20.428209|UN_World_Urbanization_Prospects_2018|7840000         |6.21     |93.79    |historical|
|Afghanistan |1952|510000          |7425000         |2026-02-27 20:38:20.428209|UN_World_Urbanization_Prospects_2018|7935000         |6.43 

In [5]:
# Clean WID income data
wid_clean = wid \
    .withColumn("year", col("year").cast("integer")) \
    .withColumn("mean_income", col("mean_income").cast("double")) \
    .withColumn("gini_coefficient", col("gini_coefficient").cast("double")) \
    .withColumn("median_income", col("median_income").cast("double")) \
    .withColumn("top1_income_share", col("top1_income_share").cast("double")) \
    .withColumn("top10_income_share", col("top10_income_share").cast("double")) \
    .withColumn("bottom50_income_share", col("bottom50_income_share").cast("double")) \
    .filter(col("mean_income").isNotNull()) \
    .filter(col("year").isNotNull()) \
    .select(
        "country_name",
        "year",
        "mean_income",
        "median_income",
        "gini_coefficient",
        "top1_income_share",
        "top10_income_share",
        "bottom50_income_share"
    )

print("WID clean rows:", wid_clean.count())
wid_clean.show(3, truncate=False)

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 7, Finished, Available, Finished, False)

WID clean rows: 5845
+------------+----+-----------------+-----------------+-----------------+------------------+------------------+---------------------+
|country_name|year|mean_income      |median_income    |gini_coefficient |top1_income_share |top10_income_share|bottom50_income_share|
+------------+----+-----------------+-----------------+-----------------+------------------+------------------+---------------------+
|Afghanistan |2008|3553.318948336138|2189.86421366986 |0.524523877237829|16.220000000000002|41.94             |17.16                |
|Afghanistan |2012|4935.029169445308|3188.565573089848|0.492655795156669|13.819999999999999|38.76             |19.0                 |
|Afghanistan |2017|4635.860131021041|2782.66862800768 |0.515393129277367|15.079999999999998|41.010000000000005|17.79                |
+------------+----+-----------------+-----------------+-----------------+------------------+------------------+---------------------+
only showing top 3 rows



In [6]:
# ---- Join everything together into Silver table ----

silver = urbanisation_clean \
    .join(country_dim, on="country_name", how="left") \
    .join(wid_clean, on=["country_name", "year"], how="left") \
    .select(
        # Geography
        col("country_name"),
        col("continent"),
        col("who_region"),
        
        # Time
        col("year"),
        col("data_type"),
        
        # Population
        col("urban_population"),
        col("rural_population"),
        col("total_population"),
        col("urban_pct"),
        col("rural_pct"),
        
        # Income
        col("mean_income"),
        col("median_income"),
        col("gini_coefficient"),
        col("top1_income_share"),
        col("top10_income_share"),
        col("bottom50_income_share")
    )

print("Silver rows:", silver.count())
silver.show(3, truncate=False)


StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 8, Finished, Available, Finished, False)

Silver rows: 23735
+------------+---------+---------------------+----+----------+----------------+----------------+----------------+---------+---------+-----------+-------------+----------------+-----------------+------------------+---------------------+
|country_name|continent|who_region           |year|data_type |urban_population|rural_population|total_population|urban_pct|rural_pct|mean_income|median_income|gini_coefficient|top1_income_share|top10_income_share|bottom50_income_share|
+------------+---------+---------------------+----+----------+----------------+----------------+----------------+---------+---------+-----------+-------------+----------------+-----------------+------------------+---------------------+
|Afghanistan |Asia     |Eastern Mediterranean|1950|historical|465000          |7287000         |7752000         |6.0      |94.0     |NULL       |NULL         |NULL            |NULL             |NULL              |NULL                 |
|Afghanistan |Asia     |Eastern Medit

In [7]:
# ---- Silver quality check ----
total = silver.count()
has_continent = silver.filter(col("continent").isNotNull()).count()
has_who = silver.filter(col("who_region").isNotNull()).count()
has_income = silver.filter(col("mean_income").isNotNull()).count()
has_gini = silver.filter(col("gini_coefficient").isNotNull()).count()
historical = silver.filter(col("data_type") == "historical").count()
projected = silver.filter(col("data_type") == "projected").count()

print("="*50)
print("SILVER LAYER QUALITY REPORT")
print("="*50)
print(f"Total rows:           {total:,}")
print(f"With continent:       {has_continent:,}")
print(f"With WHO region:      {has_who:,}")
print(f"With mean income:     {has_income:,}")
print(f"With gini:            {has_gini:,}")
print(f"Historical rows:      {historical:,}")
print(f"Projected rows:       {projected:,}")
print(f"Countries:            {silver.select('country_name').distinct().count()}")
print(f"Year range:           {silver.agg({'year':'min'}).collect()[0][0]} - {silver.agg({'year':'max'}).collect()[0][0]}")

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 9, Finished, Available, Finished, False)

SILVER LAYER QUALITY REPORT
Total rows:           23,735
With continent:       23,230
With WHO region:      19,392
With mean income:     4,158
With gini:            4,158
Historical rows:      16,215
Projected rows:       7,520
Countries:            235
Year range:           1950 - 2050


In [8]:
# ---- Save Silver table ----
silver.write.format("delta").mode("overwrite") \
    .saveAsTable("silver_urbanisation")

print("Silver table saved successfully")

StatementMeta(, 7fdc0592-a200-4739-a43c-57308abcb9b9, 10, Finished, Available, Finished, False)

Silver table saved successfully
