<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://github.com/AdastraTH/workshop-wiki/assets/3682634/0172e39f-edb5-4129-925f-03791c98ecd9" alt="Adastra Logo" style="width: 600px">
</div>

# Lab 04 - Transform and Load

*2024 University Collaboration*

## Changelog

- 2024-05-17: Sirakorn L.
  - Initial development

In [0]:
from pyspark.sql import functions as F

## Ultimate Goal

![image](https://github.com/AdastraTH/workshop-wiki/assets/3682634/17cceeb4-9325-4149-8f92-1fd7cf46d708)

We want to turn three tables into what we call "Dimensions" and "Facts" tables.

- Facts contain numerical data we want to know. We call this "measures". In this case the population count is our measure.
- Dimensions contain data for filtering/aggregating. In this case, we may want to know the value for our measure (population count) only in some country or country category.

Dimensions and Facts are linked by keys. Surrogate keys in good practice, or business key like this example for the sake of simplicity.


## Country Categoies and Countries data

This time instead of reading from the CSV/JSONs, we will read from our data catalogue.

In [0]:
country_categories_data = spark.read.table("silver.country_categories")
display(country_categories_data)

category_code,category_name,special_notes
AFE,Africa Eastern and Southern,"26 countries, stretching from the Red Sea in the North to the Cape of Good Hope in the South (https://www.worldbank.org/en/region/afr/eastern-and-southern-africa)"
AFW,Africa Western and Central,"22 countries, stretching from the westernmost point of Africa, across the equator, and partly along the Atlantic Ocean till the Republic of Congo in the South (https://www.worldbank.org/en/region/afr/western-and-central-africa)"
ARB,Arab World,Arab World aggregate. Arab World is composed of members of the League of Arab States.
CEB,Central Europe and the Baltics,Central Europe and the Baltics aggregate.
CSS,Caribbean small states,
EAP,East Asia & Pacific (excluding high income),
EAR,Early-demographic dividend,Early-dividend countries are mostly lower-middle-income countries further along the fertility transition. Fertility rates have fallen below four births per woman and the working-age share of the population is likely rising considerably.
EAS,East Asia & Pacific,East Asia and Pacific regional aggregate (includes all income levels).
ECA,Europe & Central Asia (excluding high income),
ECS,Europe & Central Asia,Europe and Central Asia regional aggregate (includes all income levels).


In [0]:
countries_data = spark.read.table("silver.countries")
display(countries_data)

country_code,country_name,income_group,region,special_notes
ABW,Aruba,High income,Latin America & Caribbean,
AFG,Afghanistan,Low income,South Asia,"The reporting period for national accounts data is designated as either calendar year basis (CY) or fiscal year basis (FY). For this country, it is fiscal year-based (fiscal year-end: March 20). Also, an estimate (PA.NUS.ATLS) of the exchange rate covers the same period and thus differs from the official exchange rate (CY). In addition, the World Bank systematically assesses the appropriateness of official exchange rates as conversion factors. In this country, multiple or dual exchange rate activity exists and must be accounted for appropriately in underlying statistics. An alternative estimate (“alternative conversion factor” - PA.NUS.ATLS) is thus calculated as a weighted average of the different exchange rates in use in the country. Doing so better reflects economic reality and leads to more accurate cross-country comparisons and country classifications by income level. For this country, this applies to the period 1960-2006. Alternative conversion factors are used in the Atlas methodology and elsewhere in World Development Indicators as single-year conversion factors."
AGO,Angola,Lower middle income,Sub-Saharan Africa,"The World Bank systematically assesses the appropriateness of official exchange rates as conversion factors. In this country, multiple or dual exchange rate activity exists and must be accounted for appropriately in underlying statistics. An alternative estimate (“alternative conversion factor” - PA.NUS.ATLS) is thus calculated as a weighted average of the different exchange rates in use in the country. Doing so better reflects economic reality and leads to more accurate cross-country comparisons and country classifications by income level. For this country, this applies to the period 1994-2022. Alternative conversion factors are used in the Atlas methodology and elsewhere in World Development Indicators as single-year conversion factors."
ALB,Albania,Upper middle income,Europe & Central Asia,
AND,Andorra,High income,Europe & Central Asia,
ARE,United Arab Emirates,High income,Middle East & North Africa,
ARG,Argentina,Upper middle income,Latin America & Caribbean,"The World Bank systematically assesses the appropriateness of official exchange rates as conversion factors. In this country, multiple or dual exchange rate activity exists and must be accounted for appropriately in underlying statistics. An alternative estimate (“alternative conversion factor” - PA.NUS.ATLS) is thus calculated as a weighted average of the different exchange rates in use in the country. Doing so better reflects economic reality and leads to more accurate cross-country comparisons and country classifications by income level. For this country, this applies to the period 1971-2018. Alternative conversion factors are used in the Atlas methodology and elsewhere in World Development Indicators as single-year conversion factors."
ARM,Armenia,Upper middle income,Europe & Central Asia,
ASM,American Samoa,High income,East Asia & Pacific,
ATG,Antigua and Barbuda,High income,Latin America & Caribbean,



We want to do the following

- Remove any country categories' entries where the category name is not found in the "region" column of the country table
- Replace "region" column in country to appropriate category code

### 1: Clean Country Categories

We will try implementing this in SQL

In [0]:
%sql

SELECT *
FROM silver.country_categories
  WHERE category_name IN (
    SELECT DISTINCT region
    FROM silver.countries
  )

category_code,category_name,special_notes
EAS,East Asia & Pacific,East Asia and Pacific regional aggregate (includes all income levels).
ECS,Europe & Central Asia,Europe and Central Asia regional aggregate (includes all income levels).
LCN,Latin America & Caribbean,
MEA,Middle East & North Africa,Middle East and North Africa regional aggregate (includes all income levels).
NAC,North America,North America regional aggregate. There are no economies in North America classified as low or middle income.
SAS,South Asia,
SSF,Sub-Saharan Africa,Sub-Saharan Africa regional aggregate (includes all income levels).



The result of SQL script will be available in the variable name `_sqldf`. Let's save it somewhere else because we will use it further.

In [0]:
cleansed_country_categories_data = _sqldf
display(cleansed_country_categories_data)

category_code,category_name,special_notes
EAS,East Asia & Pacific,East Asia and Pacific regional aggregate (includes all income levels).
ECS,Europe & Central Asia,Europe and Central Asia regional aggregate (includes all income levels).
LCN,Latin America & Caribbean,
MEA,Middle East & North Africa,Middle East and North Africa regional aggregate (includes all income levels).
NAC,North America,North America regional aggregate. There are no economies in North America classified as low or middle income.
SAS,South Asia,
SSF,Sub-Saharan Africa,Sub-Saharan Africa regional aggregate (includes all income levels).


### 2: Region to Category Code

In [0]:
cleansed_countries_data = countries_data.alias("countries").join(
  cleansed_country_categories_data.alias("categories"),
  on=F.col("countries.region") == F.col("categories.category_name"),
  how="left"
).select("country_code", "country_name", "income_group", "category_code")
display(cleansed_countries_data)

country_code,country_name,income_group,category_code
ABW,Aruba,High income,LCN
AFG,Afghanistan,Low income,SAS
AGO,Angola,Lower middle income,SSF
ALB,Albania,Upper middle income,ECS
AND,Andorra,High income,ECS
ARE,United Arab Emirates,High income,MEA
ARG,Argentina,Upper middle income,LCN
ARM,Armenia,Upper middle income,ECS
ASM,American Samoa,High income,EAS
ATG,Antigua and Barbuda,High income,LCN



## Population data

Let's read the data first.

In [0]:
populations_data = spark.read.table("silver.populations")


There are some unnecessary columns, let's drop them first as well.

In [0]:
%python

populations_data = populations_data.drop("country_name", "indicator_name", "indicator_code", "2023", "_c68")


Now we want to transform wide table to long table. Long table is preferred in data analytics.

![Long VS Wide table](https://github.com/AdastraTH/workshop-wiki/assets/3682634/365308c1-eb4a-4f8d-b076-1d91b8a5a1cd)

In [0]:
populations_unpivot_data = populations_data.unpivot(
  ["country_code"],
  [i for i in populations_data.columns if i not in ["country_code"]],
  'year', 'population'
)


The data actually contains entries like "whole world population" which is what we want to remove out.

To remove out so, we need to filter out any entries with the country code not in the cleansed country table.

In [0]:
populations_unpivot_data = populations_unpivot_data.alias("populations").join(
  cleansed_countries_data.alias("countries"),
  on=[F.col("populations.country_code") == F.col("countries.country_code")],
  how="inner"
).selectExpr("populations.*")


And now we get the data we want, without anything else for now.

In [0]:
display(populations_unpivot_data)

country_code,year,population
ABW,1960,54608
ABW,1961,55811
ABW,1962,56682
ABW,1963,57475
ABW,1964,58178
ABW,1965,58782
ABW,1966,59291
ABW,1967,59522
ABW,1968,59471
ABW,1969,59330



## Load

Let's load it to the gold layer where this will be used for reporting!

In [0]:
%sql

CREATE DATABASE IF NOT EXISTS gold

In [0]:
cleansed_countries_data.write.mode("overwrite").saveAsTable("gold.dim_countries")
cleansed_country_categories_data.write.mode("overwrite").saveAsTable("gold.dim_country_categories")
populations_unpivot_data.write.mode("overwrite").saveAsTable("gold.fact_populations")