# **Transform the extracted data to make it ready for visualization**

## Step 1: Load from JSON to pySpark dataframe
### 1. First I load all the json files into a data frame. 
### 2. "Explode" the json raw_df to flatten the arrays into rows.
### 3. Use df.select and df.filter to select only the fields/columns we're interested in.

In [1]:
# I clear the cache just in case and for future runs of the pipeline
spark.catalog.clearCache()


StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 3, Finished, Available, Finished)

In [2]:
# Reading all the json files extracted with the World Bank API
raw_df = spark.read.option("multiline", "true").json("Files/SE4ALL_Pages/*.json")

# I check there are indeed several distinct indicators as there was only one when extraction failed to paginate the results.
print("Raw row count:", raw_df.count())
raw_df.select("value.INDICATOR").distinct().show(200)



StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 4, Finished, Available, Finished)

Raw row count: 143
+--------------------+
|           INDICATOR|
+--------------------+
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_INF...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_INF...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_EGE...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_FCO...|
|[WB_SE4ALL_EG_

In [3]:
from pyspark.sql.functions import explode, col, trim

# Read all JSON files from the folder
raw_df = spark.read.option("multiline", "true").json("Files/SE4ALL_Pages/*.json")

# Explode the "value" array into individual rows
exploded_df = raw_df.select(explode("value").alias("entry"))

# Filter out incomplete or invalid records
filtered_df = exploded_df.filter(
    col("entry.OBS_VALUE").isNotNull() &
    (trim(col("entry.OBS_VALUE")) != "") &
    (col("entry.OBS_VALUE") != "..") &
    col("entry.TIME_PERIOD").isNotNull() &
    col("entry.REF_AREA").isNotNull()
)

# Select and rename the fields I need
clean_df = filtered_df.select(    
    col("entry.TIME_PERIOD").alias("YEAR"),
    col("entry.REF_AREA").alias("REF_AREA"),
    col("entry.INDICATOR").alias("INDICATOR"),
    col("entry.URBANISATION").alias("URBANISATION"),
    col("entry.COMP_BREAKDOWN_1").alias("COMP_BREAKDOWN_1"),
    col("entry.OBS_VALUE").alias("VALUE")
)
clean_df = clean_df.dropDuplicates()

print("Cleaned row count:", clean_df.count())
display(clean_df)



StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 5, Finished, Available, Finished)

Cleaned row count: 141906


SynapseWidget(Synapse.DataFrame, 6d77fc82-262c-4d49-93f9-b12bcc04a602)

In [4]:
# Last verification of having the right number of distinct indicators.
clean_df.select("Indicator").distinct().count()


StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 6, Finished, Available, Finished)

8

# Step 2: Integrating metadata
## 1. Loaded the metadata file available in World Bank's website.
## 2. Created small dimension tables to create a star shaped data model.

In [5]:
# Load metadata JSON
meta_raw = spark.read.option("multiline", "true").json("Files/WB_SE4ALL_metadata.json")
meta_raw.printSchema()
display(meta_raw)


StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 7, Finished, Available, Finished)

root
 |-- changed: string (nullable = true)
 |-- changed_by: string (nullable = true)
 |-- changed_utc: string (nullable = true)
 |-- created: string (nullable = true)
 |-- created_by: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- database_description: struct (nullable = true)
 |    |-- abstract: string (nullable = true)
 |    |-- authoring_entity: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- uri: string (nullable = true)
 |    |-- citation: string (nullable = true)
 |    |-- geographic_units: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- license: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = 

SynapseWidget(Synapse.DataFrame, 9365a74a-4f59-457d-94e6-d4f7c5b8bf6e)

I realised this is not the metadata I need. I decided to create small dimension tables with the metadata, i.e. a table for the 'Area_id' or 'Ref_area' and the 'Ref_area_label'. I will upload them and then link them to the fact table to have access to "human friendly" names. This metadata can be used in subsequent datasets that go through the pipeline. My semantic model will be a star schema one.


In [6]:
# I load files that will be used as dimension tables
dim_area = spark.read.option("header", "true").csv("Files/ref_area_lookup.csv")
dim_indicator = spark.read.option("header", "true").csv("Files/indicator_lookup.csv")
dim_urbanisation = spark.read.option("header", "true").csv("Files/urbanisation_lookup.csv")
dim_composition = spark.read.option("header", "true").csv("Files/comp_breakdown_lookup.csv")



# I write them as Delta tables(dimension tables)
dim_area.write.mode("overwrite").format("delta").saveAsTable("Dim_Area")
dim_indicator.write.mode("overwrite").format("delta").saveAsTable("Dim_Indicator")
dim_urbanisation.write.mode("overwrite").format("delta").saveAsTable("Dim_Urbanisation")
dim_composition.write.mode("overwrite").format("delta").saveAsTable("Dim_Composition")




StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 8, Finished, Available, Finished)

# Step 4: Polishing df fields and creating new columns and a new date table.
### 1. Adding extra column to differentiate between countries and regions
### 2. Changing data type of 'Value' to decimal.
### 3. Creating a customized date table to add it to the data model and have more robust time related visualizations.

The "Area" dimension contains countries but also regions and the value "World". Therefore, I'm going to add an extra column to help me filtering only the country values or the regions when needed.


In [7]:
from pyspark.sql.functions import when, col

region_codes = ["WLD", "LCN", "NAC", "SAS", "SSF"]

dim_area = dim_area.withColumn(
    "Area_Type",
    when(col("REF_AREA").isin(region_codes), "Region").otherwise("Country")
)


StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 9, Finished, Available, Finished)

In [8]:
# I save the table again with the new column
dim_area.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("Dim_Area")

StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 10, Finished, Available, Finished)

In [9]:
# Changing 'Value' data type to decimal and Year to integer (in order to link it to the datetable during data modelling)
df_final = clean_df.withColumn("VALUE", col("VALUE").cast("double"))\
                   .withColumn("YEAR", col("YEAR").cast("int"))

print("Row count in final DataFrame:", df_final.count())



StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 11, Finished, Available, Finished)

Row count in final DataFrame: 141906


Now I am going to add a customized date table so it can be part of the data model that I will base my Power reports on. As described by the World Bank documentation, observations span from 1990 to 2022.

In [10]:
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.functions import to_date, lit, col
from datetime import datetime

# Create a list of years
years = list(range(1990, 2023))  # Inclusive of 2022

# Convert years to DataFrame with "Date" column as Jan 1st of each year
df_years = spark.createDataFrame([(datetime(year, 1, 1),) for year in years], ["Date"])

# Add calendar columns
from pyspark.sql.functions import year, quarter

df_years = df_years.withColumn("Year", year("Date")) \
                   .withColumn("Quarter", quarter("Date"))

# Save as a Delta table
df_years.write.format("delta").mode("overwrite").saveAsTable("dim_date_clean")




StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 12, Finished, Available, Finished)

# Step 5: Saving the data frame in the Lakehouse as a delta table

In [12]:
df_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("SE4ALL_Cleaned")


#Final verification to see that the dataframe was effectively saved as a delta table in the lakehouse
spark.sql("SELECT * FROM SE4ALL_Cleaned LIMIT 10").show()


StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 14, Finished, Available, Finished)

+----+--------+--------------------+------------+--------------------+-----+
|YEAR|REF_AREA|           INDICATOR|URBANISATION|    COMP_BREAKDOWN_1|VALUE|
+----+--------+--------------------+------------+--------------------+-----+
|2015|     JAM|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2015|     MKD|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2017|     IND|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2011|     JAM|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2012|     ECU|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2013|     IRQ|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2013|     SWE|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2014|     FSM|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2014|     ZMB|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|
|2008|     HKG|WB_SE4ALL_EG_EGEN...|          _T|WB_SE4ALL_RENEWSR...|  0.0|

In [13]:
print("Row count in final DataFrame:", df_final.count())

StatementMeta(, 276f253e-33cf-448e-8957-4b4e4981eb03, 15, Finished, Available, Finished)

Row count in final DataFrame: 141906
