In [0]:
# Load the raw population-by-age extract: tab-separated, first line is the header.
df_population = (
    spark.read
    .options(header=True, sep="\t")
    .csv("/mnt/bluetabstorage/raw/population/population_by_age_tsv")
)

In [0]:
# Preview the first 10 rows of the raw frame to inspect how the TSV was parsed.
display(df_population.limit(10))

"indic_de,geo\time",2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019
"PC_Y0_14,AD",14.6,14.5,14.5,15.5,15.5,15.5,:,:,:,:,:,13.9
"PC_Y0_14,AL",24.1,23.3,22.5,21.6,20.7,20.1,19.6,19.0,18.5,18.2,17.7,17.2
"PC_Y0_14,AM",19.0,18.6,18.3,:,:,:,:,19.4,19.6,20.0,20.2,20.2
"PC_Y0_14,AT",15.4,15.1,14.9,14.7,14.6,14.4,14.3,14.3,14.3,14.4,14.4,14.4
"PC_Y0_14,AZ",23.2,22.6,22.6,22.3,22.2,22.3,22.4,22.4,22.5,22.6,22.6,22.4
"PC_Y0_14,BE",16.9,16.9,16.9,17.0 b,17.0,17.0,17.0,17.0,17.0,17.0,17.0,16.9
"PC_Y0_14,BG",13.1,13.1,13.2,13.2,13.4,13.6,13.7,13.9,14.0,14.1,14.2,14.4
"PC_Y0_14,BY",14.7,14.6,:,14.9,15.1,15.4,15.7,16.0,16.3,16.6,16.8,16.9
"PC_Y0_14,CH",15.5,15.3,15.2,15.1 b,15.0,14.9,14.9,14.9,14.9,14.9,15.0,15.0
"PC_Y0_14,CY",18.2,17.7,17.2,16.8,16.5,16.4,16.3,16.4,16.4,16.3,16.2,16.1


In [0]:
from pyspark.sql.functions import *

In [0]:
# Print the column names exactly as Spark parsed them; later cells must reference
# these names verbatim (including any stray whitespace in the headers).
print(df_population.columns)

['indic_de,geo\\time', '2008 ', '2009 ', '2010 ', '2011 ', '2012 ', '2013 ', '2014 ', '2015 ', '2016 ', '2017 ', '2018 ', '2019 ']


In [0]:
# Year columns that are not needed; the parsed headers carry a trailing space,
# so the generated names are '2008 ' ... '2018 '.
unused_year_columns = [f"{year} " for year in range(2008, 2019)]

# The first column packs "<indicator>,<country code>" into a single string;
# split it once and reuse the pieces.
key_parts = split(df_population["indic_de,geo\\time"], ",")

df_population = (
    df_population
    .drop(*unused_year_columns)
    # Indicator looks like "PC_Y0_14"; removing the "PC_Y" prefix leaves the age band.
    .withColumn("age_group", regexp_replace(key_parts[0], "PC_Y", ""))
    .withColumn("country_code", key_parts[1])
    .drop('indic_de,geo\\time')
)

# Keep only the fields used downstream and give the 2019 column a clean name.
df = df_population.select(
    "country_code",
    "age_group",
    col("2019 ").alias("percentage_2019"),
)


In [0]:
# Spot-check the reshaped frame: show the first 10 rows when sorted by
# country_code in descending order.
display(df.orderBy(desc("country_code")).limit(10))

country_code,age_group,percentage_2019
XK,0_14,24.4
XK,15_24,17.8
XK,25_49,35.2
XK,50_64,14.1
XK,65_79,6.8
XK,80_MAX,1.8
UK,80_MAX,5.0
UK,65_79,13.4
UK,50_64,19.1
UK,15_24,11.8


In [0]:
# Look at up to 20 distinct raw 2019 values to see what non-numeric content
# is present before converting the column to a number.
display(df.select("percentage_2019").distinct().limit(20))

percentage_2019
:
18.0 p
16.1
15.5 p
16.5
16.4
17.2
13.9
14.3
14.8


In [0]:
# Convert the raw 2019 text values into a numeric column.
#
# Raw values may carry a letter flag after the number (e.g. "18.0 p").
# Removing the letters alone leaves the separating space behind ("18.0 "),
# so the value is also trimmed before the cast; this keeps the conversion from
# depending on how Spark's string-to-decimal cast treats surrounding whitespace.
# Values that are still not numeric after cleaning (e.g. ":") become null.
cleaned_percentage = trim(regexp_replace(col("percentage_2019"), '[A-Za-z]', ''))

df = df.select(
    "country_code",
    "age_group",
    cleaned_percentage.cast("decimal(4,2)").alias("percentage_2019"),
)

In [0]:
# Re-inspect the distinct values after cleaning and casting.
display(df.select("percentage_2019").distinct())

percentage_2019
22.5
20.4
19.5
13.6
32.8
2.7
16.9
20.3
31.2
7.2


In [0]:
# Print the schema to check the column types after the cast.
df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- percentage_2019: decimal(4,2) (nullable = true)



In [0]:
# Reshape from long to wide: one row per country_code, one column per distinct
# age_group value, each cell holding the summed 2019 percentage for that pair.
df = (
    df.groupBy("country_code")
    .pivot("age_group")
    .sum("percentage_2019")
    .orderBy("country_code")
)

In [0]:
# Load the country lookup table: comma-separated, first line is the header.
df_country = (
    spark.read
    .options(header=True, sep=r',')
    .csv("/mnt/bluetabstorage/lookup/country_lookup/country_lookup.csv")
)

In [0]:
# Print the lookup table's schema to see which columns are available for the join.
df_country.printSchema()

root
 |-- country: string (nullable = true)
 |-- country_code_2_digit: string (nullable = true)
 |-- country_code_3_digit: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- population: string (nullable = true)



In [0]:
# Attach country metadata by matching the population country code against the
# lookup's two-letter code. This is an inner join (Spark's default).
# NOTE(review): an inner join silently drops population rows whose code has no
# match in the lookup - confirm every expected code is present there.
df = df.join(
    df_country,
    on=df["country_code"] == df_country["country_code_2_digit"],
    how="inner",
)
display(df.limit(10))

country_code,0_14,15_24,25_49,50_64,65_79,80_MAX,country,country_code_2_digit,country_code_3_digit,continent,population
LT,15.1,10.5,32.7,21.9,14.0,5.8,Lithuania,LT,LTU,Europe,2794184
AZ,22.4,14.1,39.1,17.6,5.3,1.5,Azerbaijan,AZ,AZE,Europe,10139175
FI,16.0,11.2,31.2,19.9,16.3,5.5,Finland,FI,FIN,Europe,5517919
UA,15.4,9.6,37.6,20.6,12.5,4.3,Ukraine,UA,UKR,Europe,43733759
RO,15.7,10.6,35.6,19.7,13.9,4.7,Romania,RO,ROU,Europe,19414458
NL,15.9,12.3,31.8,20.9,14.6,4.6,Netherlands,NL,NLD,Europe,17282163
AM,20.2,11.8,36.9,19.1,9.0,3.0,Armenia,AM,ARM,Europe,2963234
PL,15.4,10.3,37.0,19.7,13.3,4.4,Poland,PL,POL,Europe,37972812
MK,16.4,12.4,37.5,19.7,11.5,2.5,North Macedonia,MK,MKD,Europe,2077132
EE,16.4,9.5,34.9,19.4,14.1,5.6,Estonia,EE,EST,Europe,1324820


In [0]:
# Arrange the output: country identifiers and population first, followed by
# the per-age-group percentage columns; the duplicate country_code join key
# and the continent column are left out.
df = df.select(
    "country",
    "country_code_2_digit",
    "country_code_3_digit",
    "population",
    "0_14",
    "15_24",
    "25_49",
    "50_64",
    "65_79",
    "80_MAX",
)
display(df.limit(10))

country,country_code_2_digit,country_code_3_digit,population,0_14,15_24,25_49,50_64,65_79,80_MAX
Lithuania,LT,LTU,2794184,15.1,10.5,32.7,21.9,14.0,5.8
Azerbaijan,AZ,AZE,10139175,22.4,14.1,39.1,17.6,5.3,1.5
Finland,FI,FIN,5517919,16.0,11.2,31.2,19.9,16.3,5.5
Ukraine,UA,UKR,43733759,15.4,9.6,37.6,20.6,12.5,4.3
Romania,RO,ROU,19414458,15.7,10.6,35.6,19.7,13.9,4.7
Netherlands,NL,NLD,17282163,15.9,12.3,31.8,20.9,14.6,4.6
Armenia,AM,ARM,2963234,20.2,11.8,36.9,19.1,9.0,3.0
Poland,PL,POL,37972812,15.4,10.3,37.0,19.7,13.3,4.4
North Macedonia,MK,MKD,2077132,16.4,12.4,37.5,19.7,11.5,2.5
Estonia,EE,EST,1324820,16.4,9.5,34.9,19.4,14.1,5.6


In [0]:
# Persist the result as comma-delimited CSV with a header row, replacing any
# existing output at the target path.
(
    df.write
    .format("csv")
    .options(delimiter=",", header="true")
    .mode("overwrite")
    .save("/mnt/bluetabstorage/processed/population")
)