### Transform testing.csv file

##### Reading testing.csv

In [0]:
# Load the testing CSV (first row is the header, column types are inferred)
# and rename `year_week` to `reported_year_week`.
testing_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/covid19reportingadfadls/raw/ecdc/testing/testing.csv')
    .withColumnRenamed('year_week', 'reported_year_week')
)

In [0]:
# display(testing_df.limit(5))

country,country_code,reported_year_week,new_cases,tests_done,population,testing_rate,positivity_rate,testing_data_source
Austria,AT,2020-W15,2041,12339,8858775,139.285623576623,16.5410487073507,Manual webscraping
Austria,AT,2020-W16,855,58488,8858775,660.226724349586,1.46183832581042,Manual webscraping
Austria,AT,2020-W17,472,33443,8858775,377.512692217603,1.41135663666537,Manual webscraping
Austria,AT,2020-W18,336,26598,8858775,300.244672655079,1.2632528761561,Country website
Austria,AT,2020-W19,307,42153,8858775,475.833283947273,0.728299290679193,Country website


##### Reading lookup.csv

In [0]:
# Load the country lookup CSV (header row, inferred types). `country` and
# `population` get a `lookup_` prefix so they do not share a name with the
# `country` / `population` columns of the testing data once the two are joined.
lookup_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/covid19reportingadfadls/lookup/country_lookup/country_lookup.csv')
    .withColumnRenamed('country', 'lookup_country')
    .withColumnRenamed('population', 'lookup_population')
)

In [0]:
# display(lookup_df.limit(5))

lookup_country,country_code_2_digit,country_code_3_digit,continent,lookup_population
Aruba,AW,ABW,America,106766
Afghanistan,AF,AFG,Asia,38928341
Angola,AO,AGO,Africa,32866268
Anguilla,AI,AIA,America,15002
Albania,AL,ALB,Europe,2862427


##### Reading dim_date.csv

In [0]:
from pyspark.sql.functions import concat, lpad, lit, min, max

In [0]:
# Collapse the date dimension to one row per week.
#   ecdc_year_week  : '<year>-W<week_of_year left-padded to 2 digits with 0>',
#                     the same shape as the testing data's week label (e.g. 2020-W15)
#   week_start_date : smallest `date` in that week
#   week_end_date   : largest `date` in that week
# NOTE: `min` / `max` below are the pyspark.sql.functions aggregates imported
# in the cell above, not Python's builtins.
dim_date_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/covid19reportingadfadls/lookup/dim_date/dim_date.csv')
    .withColumn(
        'ecdc_year_week',
        concat('year', lit('-W'), lpad('week_of_year', 2, '0')),
    )
    .groupBy('ecdc_year_week')
    .agg(
        min('date').alias('week_start_date'),
        max('date').alias('week_end_date'),
    )
)

In [0]:
# display(dim_date_df.limit(5))

ecdc_year_week,week_start_date,week_end_date
2020-W32,2020-08-02,2020-08-08
2022-W39,2022-09-18,2022-09-24
2020-W47,2020-11-15,2020-11-21
2020-W10,2020-03-01,2020-03-07
2020-W13,2020-03-22,2020-03-28


##### Joining all dataframes and selecting required fields

In [0]:
# Enrich every testing row with its lookup record, matched on country name.
# A left join keeps testing rows even when the lookup has no matching country.
# The testing data's own `country_code` column is then dropped.
testing_lookup_df = (
    testing_df
    .join(
        lookup_df,
        on=testing_df.country == lookup_df.lookup_country,
        how='left',
    )
    .drop('country_code')
)

In [0]:
# Attach the start/end dates of each reporting week and keep only the output
# columns, in their final order. The inner join keeps only rows whose
# `reported_year_week` has a matching `ecdc_year_week` in dim_date_df.
final_df = (
    testing_lookup_df
    .join(
        dim_date_df,
        on=testing_lookup_df.reported_year_week == dim_date_df.ecdc_year_week,
        how='inner',
    )
    .select(
        'country',
        'country_code_2_digit',
        'country_code_3_digit',
        'reported_year_week',
        dim_date_df.week_start_date.alias('reported_week_start_date'),
        dim_date_df.week_end_date.alias('reported_week_end_date'),
        'new_cases',
        'tests_done',
        'population',
        'testing_rate',
        'positivity_rate',
        'testing_data_source',
    )
)

##### Write output to the processed container

In [0]:
# Persist the result as the parquet-backed table `covid19_processed.testing`,
# using 'overwrite' mode so an existing table's contents are replaced.
final_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .saveAsTable('covid19_processed.testing')

In [0]:
%sql
-- Spot-check: show five rows of the table written by the previous cell.
SELECT *
FROM covid19_processed.testing
LIMIT 5;

country,country_code_2_digit,country_code_3_digit,reported_year_week,reported_week_start_date,reported_week_end_date,new_cases,tests_done,population,testing_rate,positivity_rate,testing_data_source
Austria,AT,AUT,2020-W15,2020-04-05,2020-04-11,2041,12339,8858775,139.285623576623,16.5410487073507,Manual webscraping
Austria,AT,AUT,2020-W16,2020-04-12,2020-04-18,855,58488,8858775,660.226724349586,1.46183832581042,Manual webscraping
Austria,AT,AUT,2020-W17,2020-04-19,2020-04-25,472,33443,8858775,377.512692217603,1.41135663666537,Manual webscraping
Austria,AT,AUT,2020-W18,2020-04-26,2020-05-02,336,26598,8858775,300.244672655079,1.2632528761561,Country website
Austria,AT,AUT,2020-W19,2020-05-03,2020-05-09,307,42153,8858775,475.833283947273,0.728299290679193,Country website


In [0]:
# End the notebook run, passing the string 'Success' as its exit value.
# NOTE(review): presumably read by a calling pipeline/notebook to detect
# completion — confirm against the caller.
dbutils.notebook.exit('Success')