## Transform Population By Age data by performing the transformations below
####-----------------------------------------------------------------------
1. Split the combined indicator/country column into separate age group and country code columns
2. Keep only the 2019 data and exclude all other years
3. Remove non-numeric characters from the percentage values
4. Pivot the data by age group

####-----------------------------------------------------------------------

### Replace **storage account name** with your storage account name before executing.

In [0]:
from pyspark.sql.functions import *

### Read the population data & create a temp view

In [0]:
# Load the raw population data from the mounted raw location. Fields are
# separated by '\t' and the first row supplies the column names.
df_raw_population = spark.read.csv(
    "/mnt/covidreportingtutorialdl/raw",
    sep=r'\t',
    header=True,
)

# The 'indic_de,geo\time' column carries two comma-separated values:
# element 0 becomes the age group (with every 'PC_' occurrence removed) and
# element 1 becomes the country code. Split once and reuse the expression.
indicator_and_geo = split(df_raw_population['indic_de,geo\\time'], ',')
df_raw_population = (
    df_raw_population
    .withColumn('age_group', regexp_replace(indicator_and_geo[0], 'PC_', ''))
    .withColumn('country_code', indicator_and_geo[1])
)

# Keep only the three columns needed downstream. The source header for the
# 2019 figures is "2019 " (with a trailing space), so it is renamed here.
df_raw_population = df_raw_population.select(
    col("country_code"),
    col("age_group"),
    col("2019 ").alias("percentage_2019"),
)

# Register the result as a temp view so it can be queried with Spark SQL.
df_raw_population.createOrReplaceTempView("raw_population")

df_raw_population.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- percentage_2019: string (nullable = true)



### Pivot the data by age group

In [0]:
# Create a data frame with pivoted percentages
# Step 1: keep rows whose country code is exactly two characters long and
# cast the percentage text to decimal(4,2) after stripping lowercase letters.
numeric_population = spark.sql(
    "SELECT country_code, age_group, "
    "cast(regexp_replace(percentage_2019, '[a-z]', '') AS decimal(4,2)) AS percentage_2019 "
    "FROM raw_population "
    "WHERE length(country_code) = 2"
)

# Step 2: one row per country code, one summed-percentage column per age
# group, sorted by country code.
df_raw_population_pivot = (
    numeric_population
    .groupBy("country_code")
    .pivot("age_group")
    .sum("percentage_2019")
    .orderBy("country_code")
)

# Register the pivoted result as a temp view so it can be queried with SQL.
df_raw_population_pivot.createOrReplaceTempView("raw_population_pivot")

df_raw_population_pivot.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- Y0_14: decimal(14,2) (nullable = true)
 |-- Y15_24: decimal(14,2) (nullable = true)
 |-- Y25_49: decimal(14,2) (nullable = true)
 |-- Y50_64: decimal(14,2) (nullable = true)
 |-- Y65_79: decimal(14,2) (nullable = true)
 |-- Y80_MAX: decimal(14,2) (nullable = true)



In [0]:
# Rename the pivoted age-group columns (Y0_14 ... Y80_MAX) to descriptive
# age_group_* names and return the rows ordered by country code.
# The literal pieces below concatenate to the query text line by line.
processed_population_query = (
    "SELECT\n"
    "       country_code,\n"
    "       Y0_14  AS age_group_0_14,\n"
    "       Y15_24 AS age_group_15_24,\n"
    "       Y25_49 AS age_group_25_49,\n"
    "       Y50_64 AS age_group_50_64, \n"
    "       Y65_79 AS age_group_65_79,\n"
    "       Y80_MAX AS age_group_80_max\n"
    "  FROM raw_population_pivot\n"
    " ORDER BY country_code"
)
df_processed_population = spark.sql(processed_population_query)

### Write output to the processed mount point

In [0]:
# Write the processed data as comma-delimited CSV with a header row, using
# "overwrite" save mode for the target path.
population_writer = (
    df_processed_population.write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", ",")
    .mode("overwrite")
)
population_writer.save("/mnt/covidreportingtutorialdl/processed/population")