In [0]:
# List what is stored under the raw folder before reading from it.
raw_folder_listing = dbutils.fs.ls('/mnt/covidreportingdatalakecc/raw')
display(raw_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/covidreportingdatalakecc/raw/ecdc/,ecdc/,0,1674237996000
dbfs:/mnt/covidreportingdatalakecc/raw/tps00010.tsv,tps00010.tsv,26070,1674794295000


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the population TSV: tab-delimited, first line is the header,
# and Spark is asked to infer the column types.
df_raw_pop = (
    spark.read
    .options(sep=r'\t', header=True, inferSchema=True)
    .csv('/mnt/covidreportingdatalakecc/raw/tps00010.tsv')
)

In [0]:
# Preview the population data exactly as it was read from the TSV file.
display(df_raw_pop)

In [0]:
# Inspect column names and types. In the recorded output every column is a string,
# and the year column names carry a trailing space (e.g. '2019 ').
df_raw_pop.dtypes

Out[6]: [('indic_de,geo\\time', 'string'),
 ('2008 ', 'string'),
 ('2009 ', 'string'),
 ('2010 ', 'string'),
 ('2011 ', 'string'),
 ('2012 ', 'string'),
 ('2013 ', 'string'),
 ('2014 ', 'string'),
 ('2015 ', 'string'),
 ('2016 ', 'string'),
 ('2017 ', 'string'),
 ('2018 ', 'string'),
 ('2019 ', 'string')]

In [0]:
# Separating columns: split the combined 'indic_de,geo\time' column on the comma.
# Part 0 (with 'PC_' removed) becomes age_group, part 1 becomes country_code.

key_parts = split('indic_de,geo\\time', ',')
df_raw_pop = (
    df_raw_pop
    .withColumn('age_group', regexp_replace(key_parts[0], 'PC_', ''))
    .withColumn('country_code', key_parts[1])
)

In [0]:
# Preview the frame after the age_group and country_code columns were added.
display(df_raw_pop)

In [0]:
# Keep only the columns needed downstream; the '2019 ' column (note the trailing
# space in its name) is exposed as percentage_2019.

df_raw_pop = df_raw_pop.select(
    "country_code",
    "age_group",
    col("2019 ").alias("percentage_2019"),
)

In [0]:
# Preview the three selected columns: country_code, age_group, percentage_2019.
display(df_raw_pop)

country_code,age_group,percentage_2019
AD,Y0_14,13.9
AL,Y0_14,17.2
AM,Y0_14,20.2
AT,Y0_14,14.4
AZ,Y0_14,22.4
BE,Y0_14,16.9
BG,Y0_14,14.4
BY,Y0_14,16.9
CH,Y0_14,15.0
CY,Y0_14,16.1


In [0]:
# Register the selected columns as the temp view "raw_pop" so they can be queried
# with spark.sql(); an existing view with the same name is replaced.
df_raw_pop.createOrReplaceTempView("raw_pop")

In [0]:
# Pivot by age group: one row per country, one column per age band.
# The "raw_pop" temp view must already be registered, because the query reads from it.

# The six age bands that the downstream join selects by name. Passing them to pivot()
# explicitly spares Spark the extra job it would otherwise run to discover the distinct
# values, and guarantees these columns exist regardless of the data. Any other
# age_group value in the input is left out of the pivot.
AGE_GROUPS = ["Y0_14", "Y15_24", "Y25_49", "Y50_64", "Y65_79", "Y80_MAX"]

# Strip lowercase letters from the percentage (presumably Eurostat footnote flags --
# TODO confirm), cast it to decimal(4,2), and keep only rows whose country_code is
# exactly two characters long.
pop_2019_query = (
    "SELECT country_code, age_group, "
    "cast(regexp_replace(percentage_2019, '[a-z]', '') AS decimal(4,2)) AS percentage_2019 "
    "FROM raw_pop WHERE length(country_code) = 2"
)

df_raw_pop_pivot = (
    spark.sql(pop_2019_query)
    .groupBy("country_code")
    .pivot("age_group", AGE_GROUPS)
    .sum("percentage_2019")
    .orderBy("country_code")
)

In [0]:
# Register the pivoted frame as the temp view "raw_pop_pivot" for use in spark.sql().
df_raw_pop_pivot.createOrReplaceTempView("raw_pop_pivot")

In [0]:
# Load the country lookup: comma-delimited CSV whose first line is the header.
# No schema inference is requested here.

df_dim_country = (
    spark.read
    .options(sep=r',', header=True)
    .csv("/mnt/covidreportingdatalakecc/lookup/dim_country")
)

In [0]:
# Register the country lookup as the temp view "dim_country" for use in spark.sql().
df_dim_country.createOrReplaceTempView("dim_country")

In [0]:
# Preview the country lookup (country name, 2- and 3-letter codes, continent, population).
display(df_dim_country)

country,country_code_2_digit,country_code_3_digit,continent,population
Aruba,AW,ABW,America,106766
Afghanistan,AF,AFG,Asia,38928341
Angola,AO,AGO,Africa,32866268
Anguilla,AI,AIA,America,15002
Albania,AL,ALB,Europe,2862427
Andorra,AD,AND,Europe,76177
United Arab Emirates,AE,ARE,Asia,9890400
Argentina,AR,ARG,America,45195777
Armenia,AM,ARM,Europe,2963234
Antigua and Barbuda,AG,ATG,America,97928


In [0]:
# Join the pivoted population percentages with the country lookup.
# Reads the "raw_pop_pivot" (p) and "dim_country" (c) temp views. The inner join on the
# 2-letter country code keeps only countries that appear in both views. The pivoted Y*
# columns are renamed to age_group_* and the rows are ordered by country name.

df_processed_pop = spark.sql("""SELECT c.country,
       c.country_code_2_digit,
       c.country_code_3_digit,
       c.population,
       p.Y0_14  AS age_group_0_14,
       p.Y15_24 AS age_group_15_24,
       p.Y25_49 AS age_group_25_49,
       p.Y50_64 AS age_group_50_64, 
       p.Y65_79 AS age_group_65_79,
       p.Y80_MAX AS age_group_80_max
  FROM raw_pop_pivot p
  JOIN dim_country c ON p.country_code = c.country_code_2_digit
 ORDER BY c.country""")

In [0]:
# Preview the joined result before persisting it.
display(df_processed_pop)

# Write the result as CSV with a header row, replacing any previous output at this path.
# This uses Spark's built-in CSV writer rather than the legacy "com.databricks.spark.csv"
# package name, which dates from the Spark 1.x external spark-csv package.
df_processed_pop.write.mode("overwrite").csv(
    "/mnt/covidreportingdatalakecc/processed/population",
    header=True,
    sep=",",
)

country,country_code_2_digit,country_code_3_digit,population,age_group_0_14,age_group_15_24,age_group_25_49,age_group_50_64,age_group_65_79,age_group_80_max
Albania,AL,ALB,2862427,17.2,15.5,33.0,20.2,11.4,2.7
Andorra,AD,AND,76177,13.9,10.6,39.4,22.5,10.2,3.4
Armenia,AM,ARM,2963234,20.2,11.8,36.9,19.1,9.0,3.0
Austria,AT,AUT,8858775,14.4,10.9,34.0,21.7,13.8,5.0
Azerbaijan,AZ,AZE,10139175,22.4,14.1,39.1,17.6,5.3,1.5
Belarus,BY,BLR,9449321,16.9,9.9,36.6,21.3,11.3,3.9
Belgium,BE,BEL,11455519,16.9,11.4,32.7,20.1,13.3,5.6
Bulgaria,BG,BGR,7000039,14.4,8.9,35.0,20.4,16.5,4.8
Croatia,HR,HRV,4076246,14.4,10.9,32.5,21.6,15.2,5.3
Cyprus,CY,CYP,875899,16.1,12.8,37.1,17.9,12.5,3.7


In [0]:
# Show the processed population frame again after the write.
display(df_processed_pop)

country,country_code_2_digit,country_code_3_digit,population,age_group_0_14,age_group_15_24,age_group_25_49,age_group_50_64,age_group_65_79,age_group_80_max
Albania,AL,ALB,2862427,17.2,15.5,33.0,20.2,11.4,2.7
Andorra,AD,AND,76177,13.9,10.6,39.4,22.5,10.2,3.4
Armenia,AM,ARM,2963234,20.2,11.8,36.9,19.1,9.0,3.0
Austria,AT,AUT,8858775,14.4,10.9,34.0,21.7,13.8,5.0
Azerbaijan,AZ,AZE,10139175,22.4,14.1,39.1,17.6,5.3,1.5
Belarus,BY,BLR,9449321,16.9,9.9,36.6,21.3,11.3,3.9
Belgium,BE,BEL,11455519,16.9,11.4,32.7,20.1,13.3,5.6
Bulgaria,BG,BGR,7000039,14.4,8.9,35.0,20.4,16.5,4.8
Croatia,HR,HRV,4076246,14.4,10.9,32.5,21.6,15.2,5.3
Cyprus,CY,CYP,875899,16.1,12.8,37.1,17.9,12.5,3.7


In [0]:
# Spot check: print the processed row(s) whose country is France.
df_processed_pop.filter(col("country") == "France").show()

+-------+--------------------+--------------------+----------+--------------+---------------+---------------+---------------+---------------+----------------+
|country|country_code_2_digit|country_code_3_digit|population|age_group_0_14|age_group_15_24|age_group_25_49|age_group_50_64|age_group_65_79|age_group_80_max|
+-------+--------------------+--------------------+----------+--------------+---------------+---------------+---------------+---------------+----------------+
| France|                  FR|                 FRA|  67012883|         18.00|          11.70|          31.00|          19.20|          14.00|            6.10|
+-------+--------------------+--------------------+----------+--------------+---------------+---------------+---------------+---------------+----------------+

