In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder \
    .appName('transform_testing') \
    .master('local[*]') \
    .getOrCreate()


In [9]:
df_testing = spark.read \
    .options(header=True, sep=',', inferSchema=True) \
    .csv('../new_ds/testing.csv')
df_testing.printSchema()
df_testing.show()

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year_week: string (nullable = true)
 |-- level: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_name: string (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- tests_done: integer (nullable = true)
 |-- population: double (nullable = true)
 |-- testing_rate: double (nullable = true)
 |-- positivity_rate: double (nullable = true)
 |-- testing_data_source: string (nullable = true)

+-------+------------+---------+--------+------+-----------+---------+----------+----------+----------------+-----------------+-------------------+
|country|country_code|year_week|   level|region|region_name|new_cases|tests_done|population|    testing_rate|  positivity_rate|testing_data_source|
+-------+------------+---------+--------+------+-----------+---------+----------+----------+----------------+-----------------+-------------------+
|Austria|          AT| 2020-W15|nationa

In [12]:
df_testing = df_testing.withColumn('population', f.col('population').cast(LongType()))

In [26]:
df_testing.where(f.col('new_cases') == '') \
.show()

+-------+------------+---------+-----+------+-----------+---------+----------+----------+------------+---------------+-------------------+
|country|country_code|year_week|level|region|region_name|new_cases|tests_done|population|testing_rate|positivity_rate|testing_data_source|
+-------+------------+---------+-----+------+-----------+---------+----------+----------+------------+---------------+-------------------+
+-------+------------+---------+-----+------+-----------+---------+----------+----------+------------+---------------+-------------------+



In [6]:
df_dim_date = spark.read \
    .options(header=True, sep=',', inferSchema=True) \
    .csv('../dim_date/dim_date.csv')
df_dim_date = df_dim_date.withColumn('date', f.expr("cast(date as DATE)"))
df_dim_date.printSchema()
df_dim_date.show()

root
 |-- date_key: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- week_of_month: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- year_month: integer (nullable = true)
 |-- year_week: integer (nullable = true)

+--------+----------+----+-----+---+---------+-----------+-------------+------------+----------+----------+---------+
|date_key|      date|year|month|day| day_name|day_of_year|week_of_month|week_of_year|month_name|year_month|year_week|
+--------+----------+----+-----+---+---------+-----------+-------------+------------+----------+----------+---------+
|20200101|2020-01-01|2020|    1|  1|Wednesday|          1|            1|           1|   January|    202001|   202001|
|20200102|2020-01-02|2020|    1|  

In [7]:
df_dim_country = spark.read \
    .options(header=True, sep=',', inferSchema=True) \
    .csv('../dim_country/country_lookup.csv')
df_dim_country.printSchema()
df_dim_country.show()

root
 |-- country: string (nullable = true)
 |-- country_code_2_digit: string (nullable = true)
 |-- country_code_3_digit: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- population: integer (nullable = true)

+--------------------+--------------------+--------------------+---------+----------+
|             country|country_code_2_digit|country_code_3_digit|continent|population|
+--------------------+--------------------+--------------------+---------+----------+
|               Aruba|                  AW|                 ABW|  America|    106766|
|         Afghanistan|                  AF|                 AFG|     Asia|  38928341|
|              Angola|                  AO|                 AGO|   Africa|  32866268|
|            Anguilla|                  AI|                 AIA|  America|     15002|
|             Albania|                  AL|                 ALB|   Europe|   2862427|
|             Andorra|                  AD|                 AND|   Europe|     

In [13]:
df_testing.createOrReplaceTempView('testing')
df_dim_date.createOrReplaceTempView('dim_date')
df_dim_country.createOrReplaceTempView('dim_country')

In [14]:
df_proccessed_testing = spark.sql("""
SELECT t.country  ,
      c.country_code_2_digit   ,
      c.country_code_3_digit   ,
      t.year_week   ,
      MIN(d.date) AS week_start_date, 
      MAX(d.date) AS week_end_date ,
      t.new_cases   ,
      t.tests_done  ,
      t.population  , 
      t.testing_rate   ,
      t.positivity_rate   ,
      t.testing_data_source  
FROM testing t JOIN dim_date d ON t.year_week = concat(concat(d.year, '-W'), LPAD(d.week_of_year, 2, '0'))
    JOIN dim_country c ON t.country_code = c.country_code_2_digit
GROUP BY t.country  ,
      c.country_code_2_digit   ,
      c.country_code_3_digit   ,
      t.year_week   ,
      t.new_cases   ,
      t.tests_done  ,
      t.population  , 
      t.testing_rate   ,
      t.positivity_rate   ,
      t.testing_data_source  ;
""")
df_proccessed_testing.show()

+-------+--------------------+--------------------+---------+---------------+-------------+---------+----------+----------+----------------+-----------------+-------------------+
|country|country_code_2_digit|country_code_3_digit|year_week|week_start_date|week_end_date|new_cases|tests_done|population|    testing_rate|  positivity_rate|testing_data_source|
+-------+--------------------+--------------------+---------+---------------+-------------+---------+----------+----------+----------------+-----------------+-------------------+
|Austria|                  AT|                 AUT| 2022-W17|     2022-04-17|   2022-04-23|    40387|   1403772|   8932664|15715.0431271119| 2.87703416224287|              TESSy|
|Austria|                  AT|                 AUT| 2022-W26|     2022-06-19|   2022-06-25|     2917|     12502|    394297|3170.70634572416|  23.332266837306|              TESSy|
|Belgium|                  BE|                 BEL| 2021-W41|     2021-10-03|   2021-10-09|    22871|    

In [49]:
# Check null and nan
cols = list(df_proccessed_testing.columns)
cols.remove('week_start_date')
cols.remove('week_end_date')
print(cols)

['country', 'country_code_2_digit', 'country_code_3_digit', 'year_week', 'new_cases', 'tests_done', 'population', 'testing_rate', 'positivity_rate', 'testing_data_source']


In [55]:
df_proccessed_testing.printSchema()

df_proccessed_testing.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in cols]).show()

df_proccessed_testing.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in cols]).show()

root
 |-- country: string (nullable = true)
 |-- country_code_2_digit: string (nullable = true)
 |-- country_code_3_digit: string (nullable = true)
 |-- year_week: string (nullable = true)
 |-- week_start_date: date (nullable = true)
 |-- week_end_date: date (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- tests_done: integer (nullable = true)
 |-- population: long (nullable = true)
 |-- testing_rate: double (nullable = true)
 |-- positivity_rate: double (nullable = true)
 |-- testing_data_source: string (nullable = true)

+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+
|country|country_code_2_digit|country_code_3_digit|year_week|new_cases|tests_done|population|testing_rate|positivity_rate|testing_data_source|
+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+
|      0|                 

In [57]:
df_proccessed_testing = df_proccessed_testing.fillna(0, ['new_cases', 'positivity_rate'])

In [58]:
df_proccessed_testing.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in cols]).show()

df_proccessed_testing.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in cols]).show()

+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+
|country|country_code_2_digit|country_code_3_digit|year_week|new_cases|tests_done|population|testing_rate|positivity_rate|testing_data_source|
+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+
|      0|                   0|                   0|        0|        0|         0|         0|           0|              0|                  0|
+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+

+-------+--------------------+--------------------+---------+---------+----------+----------+------------+---------------+-------------------+
|country|country_code_2_digit|country_code_3_digit|year_week|new_cases|tests_done|population|testing_rate|positivity_rate|testing_data_source