In [134]:
%run spark_utilities.ipynb

following variables have been created:
db_name
jdbcURI
driver


In [131]:
%run custom_methods.ipynb

custom methods:
normalize_column_name
download_files


# *Import Datasets*

In [94]:
file_admission = "datasets/hospital_admissions.csv"
file_lookup = "datasets/country_lookup.csv"

if os.path.isfile(file_admission):
    df_hospital_admissions = spark.read.csv(file_admission, header=True, inferSchema=True)
else:
    print("File 'hospital_admission.csv' not found")

if os.path.isfile(file_lookup):
    df_country_lookup = spark.read.csv(file_lookup, header=True, inferSchema=True)
else:
    print("File 'country_lookup.csv' not found")

In [68]:
# df_hospital_admissions.show()

In [69]:
# df_hospital_admissions.dtypes

# *Drop column - hospital admission*

In [95]:
df_hospital_admissions = df_hospital_admissions.drop('url')

In [71]:
# df_hospital_admissions.show()

# Show Country Lookup

In [72]:
# df_country_lookup.show()

In [73]:
# df_country_lookup.dtypes

# *Join hospital admission and country lookup*

In [96]:
condition = [df_hospital_admissions.country == df_country_lookup.country]

fields = [
    df_hospital_admissions['country']
    ,'country_code_2_digit'
    ,'country_code_3_digit'
    ,'indicator'
    ,'reported_date'
    ,'reported_year_week'
    ,'population'
    ,'value'
    ,'source'
]

df_hospital_admissions = (
    df_hospital_admissions
    .join(df_country_lookup, on=condition, how='left')
    .withColumnRenamed('date', 'reported_date')
    .withColumnRenamed('year_week', 'reported_year_week')
    .select(fields)
)

In [80]:
df_hospital_admissions.show()

+-------+--------------------+--------------------+--------------------+-------------+------------------+----------+------+---------------+
|country|country_code_2_digit|country_code_3_digit|           indicator|reported_date|reported_year_week|population| value|         source|
+-------+--------------------+--------------------+--------------------+-------------+------------------+----------+------+---------------+
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-02|          2020-W14|   8858775|1057.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-08|          2020-W15|   8858775|1096.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-15|          2020-W16|   8858775|1001.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-16|          2020-W16|   8858775| 967.0|   Surveillance|
|Austria|           

In [76]:
df_hospital_admissions.count()

8977

# *Split dataset between daily and weekly*

In [15]:
df_hospital_admissions.select('indicator').distinct().collect()

[Row(indicator='Daily hospital occupancy'),
 Row(indicator='Daily ICU occupancy'),
 Row(indicator='Weekly new hospital admissions per 100k'),
 Row(indicator='Weekly new ICU admissions per 100k')]

In [97]:
df_daily_hospital_admissions = (
    df_hospital_admissions
    .where((col('indicator') == 'Daily hospital occupancy') | (col('indicator') == 'Daily ICU occupancy'))
)

df_weekly_hospital_admissions = (
    df_hospital_admissions
    .where((col('indicator') == 'Weekly new hospital admissions per 100k') | (col('indicator') == 'Weekly new ICU admissions per 100k'))
)

In [14]:
# df_daily_hospital_admissions.show()

+-------+--------------------+--------------------+--------------------+-------------+------------------+----------+------+---------------+
|country|country_code_2_digit|country_code_3_digit|           indicator|reported_date|reported_year_week|population| value|         source|
+-------+--------------------+--------------------+--------------------+-------------+------------------+----------+------+---------------+
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-02|          2020-W14|   8858775|1057.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-08|          2020-W15|   8858775|1096.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-15|          2020-W16|   8858775|1001.0|   Surveillance|
|Austria|                  AT|                 AUT|Daily hospital oc...|   2020-04-16|          2020-W16|   8858775| 967.0|   Surveillance|
|Austria|           

In [78]:
df_daily_hospital_admissions.count()

8110

In [65]:
fields = [
    'country'
    ,'indicator'
    ,'reported_year_week'
    ,'value'
]

df_weekly_hospital_admissions.select(fields).orderBy('country', col('reported_year_week').desc()).show()

+-------+--------------------+------------------+-----------------+
|country|           indicator|reported_year_week|            value|
+-------+--------------------+------------------+-----------------+
|Belgium|Weekly new hospit...|          2020-W43| 28.3793340135877|
|Belgium|Weekly new hospit...|          2020-W42| 15.4510677342511|
|Belgium|Weekly new hospit...|          2020-W41| 7.68188678313047|
|Belgium|Weekly new hospit...|          2020-W40| 4.72261448826544|
|Belgium|Weekly new hospit...|          2020-W39| 3.91077872595733|
|Belgium|Weekly new hospit...|          2020-W38| 2.76722512528677|
|Belgium|Weekly new hospit...|          2020-W37| 1.85936577818954|
|Belgium|Weekly new hospit...|          2020-W36| 1.03880059908242|
|Belgium|Weekly new hospit...|          2020-W35|0.899129930298226|
|Belgium|Weekly new hospit...|          2020-W34| 1.48400085583202|
|Belgium|Weekly new hospit...|          2020-W33|  2.1299776989589|
|Belgium|Weekly new hospit...|          2020-W32

In [17]:
df_weekly_hospital_admissions.count()

867

# *Import dim date datasets*

In [98]:
file_date = "datasets/dim_date.csv"

if os.path.isfile(file_date):
    df_dim_date = spark.read.csv("datasets/dim_date.csv", header=True, inferSchema=True)
else:
    print("File 'dim_date.csv' not found")

In [79]:
# df_dim_date.show()

# *Dim date aggregation*

In [99]:
fields = [
    'ecdc_year_week'
    ,'start_week_date'
    ,'end_week_date'
]

aggregated_dim_date = (
    df_dim_date
    .withColumn('ecdc_year_week', concat('year', lit('-W'), lpad('week_of_year', 2, '0')))
    .groupBy('ecdc_year_week')
    .agg(
        min('date').alias('start_week_date')
        ,max('date').alias('end_week_date')
    )
    .select(fields)
)

In [22]:
aggregated_dim_date.orderBy(col('ecdc_year_week').desc()).show()

+--------------+---------------+-------------+
|ecdc_year_week|start_week_date|end_week_date|
+--------------+---------------+-------------+
|      2020-W32|     2020-08-02|   2020-08-08|
|      2022-W39|     2022-09-18|   2022-09-24|
|      2020-W47|     2020-11-15|   2020-11-21|
|      2020-W10|     2020-03-01|   2020-03-07|
|      2020-W13|     2020-03-22|   2020-03-28|
|      2021-W53|     2021-12-26|   2021-12-31|
|      2020-W12|     2020-03-15|   2020-03-21|
|      2021-W42|     2021-10-10|   2021-10-16|
|      2021-W45|     2021-10-31|   2021-11-06|
|      2020-W44|     2020-10-25|   2020-10-31|
|      2022-W38|     2022-09-11|   2022-09-17|
|      2021-W22|     2021-05-23|   2021-05-29|
|      2020-W08|     2020-02-16|   2020-02-22|
|      2020-W34|     2020-08-16|   2020-08-22|
|      2022-W17|     2022-04-17|   2022-04-23|
|      2021-W51|     2021-12-12|   2021-12-18|
|      2021-W43|     2021-10-17|   2021-10-23|
|      2020-W26|     2020-06-21|   2020-06-27|
|      2020-W

# *Join weekly hospital admission with aggregate dim date*

In [100]:
condition = [df_weekly_hospital_admissions.reported_year_week == aggregated_dim_date.ecdc_year_week]

fields = [
    'country'
    ,'country_code_2_digit'
    ,'country_code_3_digit'
    ,'indicator'
    ,'reported_date'
    ,'population'
    ,'value'
    ,'source'
    ,'reported_year_week' # ecdc_year_week
    ,'start_week_date'
    ,'end_week_date'
]

df_weekly_hospital_admissions = (
    df_weekly_hospital_admissions
    .join(aggregated_dim_date, on=condition, how='inner') # left
    .select(fields)
)

In [84]:
fields = [
    'country'
    ,'country_code_2_digit'
    ,'indicator'
    # ,'reported_date'
    ,'population'
    ,'value'
    ,'reported_year_week'
    ,'start_week_date'
    ,'end_week_date'
]

df_weekly_hospital_admissions.orderBy("country", col('reported_year_week').desc()).select(fields).show()

+-------+--------------------+--------------------+----------+-----------------+------------------+---------------+-------------+
|country|country_code_2_digit|           indicator|population|            value|reported_year_week|start_week_date|end_week_date|
+-------+--------------------+--------------------+----------+-----------------+------------------+---------------+-------------+
|Belgium|                  BE|Weekly new hospit...|  11455519| 28.3793340135877|          2020-W43|     2020-10-18|   2020-10-24|
|Belgium|                  BE|Weekly new hospit...|  11455519| 15.4510677342511|          2020-W42|     2020-10-11|   2020-10-17|
|Belgium|                  BE|Weekly new hospit...|  11455519| 7.68188678313047|          2020-W41|     2020-10-04|   2020-10-10|
|Belgium|                  BE|Weekly new hospit...|  11455519| 4.72261448826544|          2020-W40|     2020-09-27|   2020-10-03|
|Belgium|                  BE|Weekly new hospit...|  11455519| 3.91077872595733|          

# *Pivot weekly hospital admission*

In [27]:
# IS REPORTED_DATE ALWAYS NULL?
df_weekly_hospital_admissions.select('reported_date').distinct().collect()

[Row(reported_date=None)]

In [101]:
# WE CAN DROP IT
df_weekly_hospital_admissions.drop('reported_date')

DataFrame[country: string, country_code_2_digit: string, country_code_3_digit: string, indicator: string, population: int, value: double, source: string, reported_year_week: string, start_week_date: date, end_week_date: date]

In [29]:
df_weekly_hospital_admissions.select('indicator').distinct().collect()

[Row(indicator='Weekly new hospital admissions per 100k'),
 Row(indicator='Weekly new ICU admissions per 100k')]

In [103]:
# PIVOT OF INDICATOR

group_by_fields = [
    'country'
    ,'country_code_2_digit'
    ,'country_code_3_digit'
    ,'population'
    ,'source'
    ,'reported_year_week' # ecdc_year_week
    ,'start_week_date'
    ,'end_week_date'
]

df_weekly_hospital_admissions = (
    df_weekly_hospital_admissions
    .groupBy(group_by_fields)
    .pivot('indicator')
    .agg({'value':'sum'})
    .withColumnRenamed('Weekly new ICU admissions per 100k', 'weekly_icu_admission')
    .withColumnRenamed('Weekly new hospital admissions per 100k', 'weekly_hospital_admission')
    # .orderBy(
    #     col("reported_year_week").desc()
    #     ,col("contry").asc()
    # )
    .fillna(0)
)

# *Order by reported_year_week and country*

In [105]:
df_weekly_hospital_admissions = (
    df_weekly_hospital_admissions
    .orderBy(
        col("country").asc()
        ,col("reported_year_week").desc()        
    )
)

In [109]:
fields = [
    'country'
    ,'weekly_icu_admission'
    ,'weekly_hospital_admission'
    ,'reported_year_week'
    ,'start_week_date'
    ,'end_week_date'
]

df_weekly_hospital_admissions.select(fields).show()

+-------+--------------------+-------------------------+------------------+---------------+-------------+
|country|weekly_icu_admission|weekly_hospital_admission|reported_year_week|start_week_date|end_week_date|
+-------+--------------------+-------------------------+------------------+---------------+-------------+
|Belgium|                 0.0|         28.3793340135877|          2020-W43|     2020-10-18|   2020-10-24|
|Belgium|                 0.0|         15.4510677342511|          2020-W42|     2020-10-11|   2020-10-17|
|Belgium|                 0.0|         7.68188678313047|          2020-W41|     2020-10-04|   2020-10-10|
|Belgium|                 0.0|         4.72261448826544|          2020-W40|     2020-09-27|   2020-10-03|
|Belgium|                 0.0|         3.91077872595733|          2020-W39|     2020-09-20|   2020-09-26|
|Belgium|                 0.0|         2.76722512528677|          2020-W38|     2020-09-13|   2020-09-19|
|Belgium|                 0.0|         1.85936

# *Rename weekly columns and sink on sql server*

In [121]:
df_weekly_hospital_admissions.columns

['country',
 'country_code_2_digit',
 'country_code_3_digit',
 'population',
 'source',
 'reported_year_week',
 'start_week_date',
 'end_week_date',
 'weekly_icu_admission',
 'weekly_hospital_admission']

In [122]:
df_weekly_hospital_admissions = (
    df_weekly_hospital_admissions
    .withColumnRenamed('weekly_icu_admission', 'new_icu_occupancy_count')
    .withColumnRenamed('weekly_hospital_admission', 'new_hospital_occupancy_count')
)

In [125]:
df_weekly_hospital_admissions.columns

['country',
 'country_code_2_digit',
 'country_code_3_digit',
 'population',
 'source',
 'reported_year_week',
 'start_week_date',
 'end_week_date',
 'new_icu_occupancy_count',
 'new_hospital_occupancy_count']

In [135]:
(
    df_weekly_hospital_admissions
    .write
    .mode("overwrite")
    .format("jdbc")
    .options(
        driver=driver
        ,user=usr
        ,password=pwd
        ,url=jdbcURI
        ,dbtable="dbo.WeeklyHospitalAdmissions"
    )
    .save()
)

# *Pivot daily hospital admission*

In [27]:
df_daily_hospital_admissions.columns

['country',
 'country_code_2_digit',
 'country_code_3_digit',
 'indicator',
 'reported_date',
 'reported_year_week',
 'population',
 'value',
 'source']

In [28]:
df_daily_hospital_admissions.select("indicator").distinct().collect()

[Row(indicator='Daily hospital occupancy'),
 Row(indicator='Daily ICU occupancy')]

In [110]:
group_by_fields = [
    'country',
     'country_code_2_digit',
     'country_code_3_digit',
     # 'indicator',
     'reported_date',
     'population',
     # 'value',
    'source'
]

df_daily_hospital_admissions = (
    df_daily_hospital_admissions
    .groupby(group_by_fields)
    .pivot("indicator")
    .agg({"value": "sum"}) # at this point the indicator's values are cols
    .withColumnRenamed("Daily hospital occupancy", "daily_hospital_occupancy")
    .withColumnRenamed("Daily ICU occupancy", "daily_icu_occupancy")
    .fillna(0)
)

In [111]:
fields = [
    "country"
    ,"population"
    ,"daily_hospital_occupancy"
    ,"daily_icu_occupancy"
    ,"reported_date"
]

df_daily_hospital_admissions.select(fields).orderBy("country", col("reported_date").asc()).show()

+-------+----------+------------------------+-------------------+-------------+
|country|population|daily_hospital_occupancy|daily_icu_occupancy|reported_date|
+-------+----------+------------------------+-------------------+-------------+
|Austria|   8858775|                  1057.0|                0.0|   2020-04-02|
|Austria|   8858775|                  1096.0|                0.0|   2020-04-08|
|Austria|   8858775|                  1001.0|                0.0|   2020-04-15|
|Austria|   8858775|                   967.0|                0.0|   2020-04-16|
|Austria|   8858775|                   909.0|                0.0|   2020-04-17|
|Austria|   8858775|                   817.0|                0.0|   2020-04-19|
|Austria|   8858775|                   756.0|              196.0|   2020-04-21|
|Austria|   8858775|                   700.0|              176.0|   2020-04-22|
|Austria|   8858775|                   677.0|              169.0|   2020-04-23|
|Austria|   8858775|                   6

# *Order by reported_date and country*

In [117]:
df_daily_hospital_admissions = (
    df_daily_hospital_admissions
    .orderBy(
        col("country").asc()
        ,col("reported_date").asc()
    )
)

In [118]:
df_daily_hospital_admissions.show()

+-------+--------------------+--------------------+-------------+----------+---------------+-------------------+------------------------+
|country|country_code_2_digit|country_code_3_digit|reported_date|population|         source|daily_icu_occupancy|daily_hospital_occupancy|
+-------+--------------------+--------------------+-------------+----------+---------------+-------------------+------------------------+
|Austria|                  AT|                 AUT|   2020-04-02|   8858775|   Surveillance|                0.0|                  1057.0|
|Austria|                  AT|                 AUT|   2020-04-08|   8858775|   Surveillance|                0.0|                  1096.0|
|Austria|                  AT|                 AUT|   2020-04-15|   8858775|   Surveillance|                0.0|                  1001.0|
|Austria|                  AT|                 AUT|   2020-04-16|   8858775|   Surveillance|                0.0|                   967.0|
|Austria|                  AT|    

# *Rename daily columns and sink to sql server*

In [126]:
df_daily_hospital_admissions.columns

['country',
 'country_code_2_digit',
 'country_code_3_digit',
 'reported_date',
 'population',
 'source',
 'daily_icu_occupancy',
 'daily_hospital_occupancy']

In [127]:
df_daily_hospital_admissions = (
    df_daily_hospital_admissions
    .withColumnRenamed('daily_icu_occupancy', 'icu_occupancy_count')
    .withColumnRenamed('daily_icu_occupancy', 'hospital_occupancy_count')
)

In [128]:
df_daily_hospital_admissions.columns

['country',
 'country_code_2_digit',
 'country_code_3_digit',
 'reported_date',
 'population',
 'source',
 'icu_occupancy_count',
 'daily_hospital_occupancy']

In [137]:
(
    df_daily_hospital_admissions
    .write
    .mode('overwrite')
    .format('jdbc')
    .options(
        driver=driver
        ,user=usr
        ,password=pwd
        ,url=jdbcURI
        ,dbtable="dbo.DailyHospitalAdmissions"
    )
    .save()
)