In [0]:
%py
from pyspark.sql import functions as f
from pyspark.sql import types as t

## Transform population by age data by performing the transformations below
---
1. Split the country code & age group
2. Exclude all data other than 2019
3. Remove non-numeric data from the percentage
4. Pivot the data by age group
5. Join to dim_country to get the country, 3-digit country code and the total population

In [0]:
%py
# Schema-less read of the raw population TSV: tab-separated, first line used
# as the header. No schema is supplied here, so column names come from the file.
df_raw = (
    spark.read
    .format("csv")
    .option("sep", "\t")
    .option("header", "true")
    .load("abfss://bronze@dlcovidreports.dfs.core.windows.net/population/population_by_age.tsv")
)

## Create Schema

In [0]:
%py
# Explicit schema for the population TSV: the combined key column
# ("indic_de,geo\time") followed by one nullable string column per year,
# 2008 through 2019. Every column is declared as a string.
schema_population = t.StructType(fields=(
    t.StructField(r"indic_de,geo\time", t.StringType(), True),
    *(
        t.StructField(str(year), t.StringType(), True)
        for year in range(2008, 2020)
    ),
))

In [0]:
%py
# Read the population TSV again, this time applying schema_population so the
# columns carry the names and string types declared there. The first line of
# the file is still treated as a header row.
df_population_raw = (
    spark.read
    .format("csv")
    .schema(schema_population)
    .option("sep", "\t")
    .option("header", "true")
    .load("abfss://bronze@dlcovidreports.dfs.core.windows.net/population/population_by_age.tsv")
)

## Split the country code & age group

In [0]:
%py
# Derive two columns from the combined key column:
#   Country_Code - the second comma-separated piece of the key.
#   Age          - the "<digits>_<digits>" range that follows "PC_Y" in the
#                  key; regexp_extract yields "" when the pattern is absent.
df_split_country = df_population_raw.withColumns(
    {
        "Country_Code": f.split(f.col(r"indic_de,geo\time"), ",").getItem(1),
        "Age": f.regexp_extract(f.col(r"indic_de,geo\time"), r"PC_Y(\d+_\d+)", 1),
    }
)

In [0]:
# Rows whose Age is the empty string receive the placeholder label
# "sin_rango" (Spanish for "no range"); every other Age value is kept as is.
df_age_cleaned = df_split_country.withColumn(
    "Age",
    f.when(f.col("Age") == "", f.lit("sin_rango")).otherwise(f.col("Age")),
)

## Exclude all data other than 2019

In [0]:
%py
# Keep only the country code, the age label and the 2019 column; all other
# year columns (and the original key column) are dropped by this projection.
df_filtered = df_age_cleaned.select(
    f.col("Country_Code"),
    f.col("Age"),
    f.col("2019"),
)

## Remove non-numeric data from the percentage

In [0]:
%py
# Keep only the numeric part of the 2019 value: one or more digits with an
# optional fractional part. Any surrounding text in the cell is discarded.
# The decimal point is escaped so it matches a literal "." and not an
# arbitrary character, and the fraction is optional so whole numbers are kept
# too. When the cell contains no number at all, regexp_extract returns "".
df_removed_no_numeric = df_filtered.withColumn(
    "2019",
    f.regexp_extract(f.col("2019"), r"(\d+(?:\.\d+)?)", 1),
)

In [0]:
# Reduce Country_Code to the first pair of consecutive non-digit characters
# (pattern \D\D); regexp_extract yields "" when no such pair exists.
df_clean_country_code = df_removed_no_numeric.withColumn(
    "Country_Code",
    f.regexp_extract(f.col("Country_Code"), r"(\D\D)", 1),
)

## Pivot the data by age group

In [0]:
%py
# Substitute "0.0" for missing 2019 values, where "missing" means either
# null or blank once surrounding whitespace is trimmed. Other values pass
# through unchanged.
df_population_no_nulls = df_clean_country_code.withColumn(
    "2019",
    f.when(
        f.col("2019").isNull() | (f.trim(f.col("2019")) == ""),
        f.lit("0.0"),
    ).otherwise(f.col("2019")),
)


In [0]:
%py
# Convert the 2019 column from string to decimal(5,2). try_cast is used
# rather than cast, so a value that cannot be converted becomes null instead
# of raising an error.
df_population_percentage = df_population_no_nulls.withColumn(
    "2019",
    f.col("2019").try_cast("decimal(5,2)"),
)

In [0]:
%py
# Reshape to one row per Country_Code with one column per distinct Age label.
# Each cell holds the first 2019 value found for that (Country_Code, Age)
# pair. The result is then sorted by Country_Code.
df_population_pivot = (
    df_population_percentage
    .groupBy("Country_Code")
    .pivot("Age")
    .agg(f.first("2019"))
    .orderBy("Country_Code")
)

## Join to dim_country to get the country, 3-digit country code and the total population

In [0]:
%py
# Load the country lookup CSV (comma-separated, first line is the header).
# No schema is supplied, so column names come from the file's header row.
df_countries_raw = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("header", "true")
    .load("abfss://lookup@dlcovidreports.dfs.core.windows.net/dim_country/country_lookup.csv")
)

In [0]:
%py
# Inner-join the pivoted population rows to the country lookup, matching the
# population Country_Code against the lookup's country_code_2_digit. Rows
# without a match on either side are dropped.
df_population_cleaned = df_population_pivot.join(
    df_countries_raw,
    on=df_population_pivot["Country_Code"] == df_countries_raw["country_code_2_digit"],
    how="inner",
)

In [0]:
%py
# Remove the lookup's country_code_2_digit column (the join key from the
# lookup side) together with the continent column.
df_population = df_population_cleaned.drop(
    "country_code_2_digit",
    "continent",
)

## Write the CSV file to the data lake storage account

In [0]:
# Write the result as a single comma-separated CSV part file with a header
# row. coalesce(1) collapses the data to one partition so only one part file
# is produced. The save mode is set to "overwrite" so the notebook can be
# re-run: without an explicit mode the write fails when the target path
# already exists.
(
    df_population
    .coalesce(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .option("sep", ",")
    .save("abfss://staging@dlcovidreports.dfs.core.windows.net/population/")
)