Transform Population By Age data
1. Split the country code & age group
2. Exclude all data other than 2019
3. Remove non numeric data from percentage
4. Pivot the data by age group
5. Join to dim_country to get the country, 3 digit country code and the total population.

In [0]:
from pyspark.sql.functions import *

In [0]:
df_raw_population = spark.read.csv("/FileStore/newadlsgen2acc/raw/population", sep=r'\t', header=True)
df_raw_population.show(10)

+-----------------+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+
|indic_de,geo\time|2008 |2009 |2010 | 2011 |2012 |2013 |2014 |2015 |2016 |2017 |2018 |2019 |
+-----------------+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+
|      PC_Y0_14,AD|14.6 |14.5 |14.5 | 15.5 |15.5 |15.5 |   : |   : |   : |   : |   : |13.9 |
|      PC_Y0_14,AL|24.1 |23.3 |22.5 | 21.6 |20.7 |20.1 |19.6 |19.0 |18.5 |18.2 |17.7 |17.2 |
|      PC_Y0_14,AM|19.0 |18.6 |18.3 |    : |   : |   : |   : |19.4 |19.6 |20.0 |20.2 |20.2 |
|      PC_Y0_14,AT|15.4 |15.1 |14.9 | 14.7 |14.6 |14.4 |14.3 |14.3 |14.3 |14.4 |14.4 |14.4 |
|      PC_Y0_14,AZ|23.2 |22.6 |22.6 | 22.3 |22.2 |22.3 |22.4 |22.4 |22.5 |22.6 |22.6 |22.4 |
|      PC_Y0_14,BE|16.9 |16.9 |16.9 |17.0 b|17.0 |17.0 |17.0 |17.0 |17.0 |17.0 |17.0 |16.9 |
|      PC_Y0_14,BG|13.1 |13.1 |13.2 | 13.2 |13.4 |13.6 |13.7 |13.9 |14.0 |14.1 |14.2 |14.4 |
|      PC_Y0_14,BY|14.7 |14.6 |   : | 14.9 |15.1 |15.4 |15.7 |16.0 |16

In [0]:
df_raw_population = df_raw_population\
.withColumn('age_group', regexp_replace(split(df_raw_population['indic_de,geo\\time'], ',')[0], 'PC_', ''))\
.withColumn('country_code', split(df_raw_population['indic_de,geo\\time'], ',')[1])

df_raw_population = df_raw_population.select(col("country_code"),col("age_group"),col("2019 ").alias("percentage_2019"))
df_raw_population.createOrReplaceTempView("raw_population")

In [0]:
df_raw_population.show(5)

+------------+---------+---------------+
|country_code|age_group|percentage_2019|
+------------+---------+---------------+
|          AD|    Y0_14|          13.9 |
|          AL|    Y0_14|          17.2 |
|          AM|    Y0_14|          20.2 |
|          AT|    Y0_14|          14.4 |
|          AZ|    Y0_14|          22.4 |
+------------+---------+---------------+
only showing top 5 rows



In [0]:
df_raw_population.where((col('country_code')=='AM') & (col('age_group')=='Y0_14')).show()

df_raw_population.where((col('country_code')=='AM') & (col('age_group')=='Y0_14')).withColumn('percentage_2019',col('percentage_2019').cast('decimal')).groupBy('country_code').sum('percentage_2019').show()

+------------+---------+---------------+
|country_code|age_group|percentage_2019|
+------------+---------+---------------+
|          AM|    Y0_14|          20.2 |
|          AM|    Y0_14|          20.2 |
|          AM|    Y0_14|          20.2 |
|          AM|    Y0_14|          20.2 |
|          AM|    Y0_14|          20.2 |
|          AM|    Y0_14|          20.2 |
+------------+---------+---------------+

+------------+--------------------+
|country_code|sum(percentage_2019)|
+------------+--------------------+
|          AM|                 120|
+------------+--------------------+



In [0]:
df_raw_population_pivot = spark.sql("SELECT country_code, age_group, cast(regexp_replace(percentage_2019, '[a-z]', '') AS decimal(4,2)) AS percentage_2019 FROM raw_population WHERE length(country_code) = 2").groupBy("country_code").pivot("age_group").sum("percentage_2019").orderBy("country_code")
df_raw_population_pivot.show(10)

df_raw_population_pivot.createOrReplaceTempView("raw_population_pivot")

+------------+------+------+------+------+------+-------+
|country_code| Y0_14|Y15_24|Y25_49|Y50_64|Y65_79|Y80_MAX|
+------------+------+------+------+------+------+-------+
|          AD| 83.40| 63.60|236.40|135.00| 61.20|  20.40|
|          AL|103.20| 93.00|198.00|121.20| 68.40|  16.20|
|          AM|121.20| 70.80|221.40|114.60| 54.00|  18.00|
|          AT| 86.40| 65.40|204.00|130.20| 82.80|  30.00|
|          AZ|134.40| 84.60|234.60|105.60| 31.80|   9.00|
|          BE|101.40| 68.40|196.20|120.60| 79.80|  33.60|
|          BG| 86.40| 53.40|210.00|122.40| 99.00|  28.80|
|          BY|101.40| 59.40|219.60|127.80| 67.80|  23.40|
|          CH| 90.00| 63.60|210.00|125.40| 79.80|  31.20|
|          CY| 96.60| 76.80|222.60|107.40| 75.00|  22.20|
+------------+------+------+------+------+------+-------+
only showing top 10 rows



In [0]:
df_dim_country = spark.read.csv("/FileStore/newadlsgen2acc/lookup/dim_country", sep=r',', header=True)
df_dim_country.createOrReplaceTempView("dim_country")

In [0]:
df_processed_population = spark.sql("""SELECT c.country,
       c.country_code_2_digit,
       c.country_code_3_digit,
       c.population,
       p.Y0_14  AS age_group_0_14,
       p.Y15_24 AS age_group_15_24,
       p.Y25_49 AS age_group_25_49,
       p.Y50_64 AS age_group_50_64, 
       p.Y65_79 AS age_group_65_79,
       p.Y80_MAX AS age_group_80_max
  FROM raw_population_pivot p
  JOIN dim_country c ON p.country_code = country_code_2_digit
 ORDER BY country""")

In [0]:
df_processed_population.write.format("com.databricks.spark.csv").option("header","true").option("delimiter", ",").mode("overwrite").save("/mnt/newadlsgen2acc/processed/population")

In [0]:
spark.read.option("header","true").csv("/mnt/newadlsgen2acc/processed/population").show(5)

+-------+--------------------+--------------------+----------+--------------+---------------+---------------+---------------+---------------+----------------+
|country|country_code_2_digit|country_code_3_digit|population|age_group_0_14|age_group_15_24|age_group_25_49|age_group_50_64|age_group_65_79|age_group_80_max|
+-------+--------------------+--------------------+----------+--------------+---------------+---------------+---------------+---------------+----------------+
|Albania|                  AL|                 ALB|   2862427|        103.20|          93.00|         198.00|         121.20|          68.40|           16.20|
|Albania|                  AL|                 ALB|   2862427|        103.20|          93.00|         198.00|         121.20|          68.40|           16.20|
|Albania|                  AL|                 ALB|   2862427|        103.20|          93.00|         198.00|         121.20|          68.40|           16.20|
|Albania|                  AL|                