In [0]:
%fs
ls "mnt/bronze"

path,name,size,modificationTime
dbfs:/mnt/ecdc/cases_deaths.csv,cases_deaths.csv,14445098,1731360146000
dbfs:/mnt/ecdc/country_response.csv,country_response.csv,47308,1731360145000
dbfs:/mnt/ecdc/hospital_admissions.csv,hospital_admissions.csv,1058540,1731360146000
dbfs:/mnt/ecdc/testing.csv,testing.csv,85887,1731360145000


In [0]:
hospital_admissions = spark.read.csv("/mnt/bronze/hospital_admissions.csv", header=True, inferSchema = True)

In [0]:
hospital_admissions.show()

+-------+--------------------+----------+---------+------+---------------+--------------------+
|country|           indicator|      date|year_week| value|         source|                 url|
+-------+--------------------+----------+---------+------+---------------+--------------------+
|Austria|Daily hospital oc...|2020-04-02| 2020-W14|1057.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-08| 2020-W15|1096.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-15| 2020-W16|1001.0|   Surveillance|https://info.gesu...|
|Austria|Daily hospital oc...|2020-04-16| 2020-W16| 967.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-17| 2020-W16| 909.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-19| 2020-W16| 817.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-21| 2020-W17| 756.0|   Surveillance|https://www.sozia...|
|Austria|Daily hospital oc...|2020-04-22

In [0]:
columns_needed = ['country','indicator','date','year_week','value']
hospital_admissions_df = hospital_admissions.select(columns_needed)
hospital_admissions_df.show()

+-------+--------------------+----------+---------+------+
|country|           indicator|      date|year_week| value|
+-------+--------------------+----------+---------+------+
|Austria|Daily hospital oc...|2020-04-02| 2020-W14|1057.0|
|Austria|Daily hospital oc...|2020-04-08| 2020-W15|1096.0|
|Austria|Daily hospital oc...|2020-04-15| 2020-W16|1001.0|
|Austria|Daily hospital oc...|2020-04-16| 2020-W16| 967.0|
|Austria|Daily hospital oc...|2020-04-17| 2020-W16| 909.0|
|Austria|Daily hospital oc...|2020-04-19| 2020-W16| 817.0|
|Austria|Daily hospital oc...|2020-04-21| 2020-W17| 756.0|
|Austria|Daily hospital oc...|2020-04-22| 2020-W17| 700.0|
|Austria|Daily hospital oc...|2020-04-23| 2020-W17| 677.0|
|Austria|Daily hospital oc...|2020-04-24| 2020-W17| 651.0|
|Austria|Daily hospital oc...|2020-04-27| 2020-W18| 579.0|
|Austria|Daily hospital oc...|2020-04-28| 2020-W18| 561.0|
|Austria|Daily hospital oc...|2020-04-29| 2020-W18| 517.0|
|Austria|Daily hospital oc...|2020-04-30| 2020-W18| 500.

Going to format the dates to make it easier to use

In [0]:
from pyspark.sql.functions import col

# Create a new week column with the last 2 characters of the year_week column
hospital_admissions_df = hospital_admissions_df.withColumn("week", col("year_week").substr(7, 2))


hospital_admissions_df.show()

# Get rid of year_week column
hospital_admissions_df  = hospital_admissions_df.drop('year_week')

+-------+--------------------+----------+---------+------+----+
|country|           indicator|      date|year_week| value|week|
+-------+--------------------+----------+---------+------+----+
|Austria|Daily hospital oc...|2020-04-02| 2020-W14|1057.0|  14|
|Austria|Daily hospital oc...|2020-04-08| 2020-W15|1096.0|  15|
|Austria|Daily hospital oc...|2020-04-15| 2020-W16|1001.0|  16|
|Austria|Daily hospital oc...|2020-04-16| 2020-W16| 967.0|  16|
|Austria|Daily hospital oc...|2020-04-17| 2020-W16| 909.0|  16|
|Austria|Daily hospital oc...|2020-04-19| 2020-W16| 817.0|  16|
|Austria|Daily hospital oc...|2020-04-21| 2020-W17| 756.0|  17|
|Austria|Daily hospital oc...|2020-04-22| 2020-W17| 700.0|  17|
|Austria|Daily hospital oc...|2020-04-23| 2020-W17| 677.0|  17|
|Austria|Daily hospital oc...|2020-04-24| 2020-W17| 651.0|  17|
|Austria|Daily hospital oc...|2020-04-27| 2020-W18| 579.0|  18|
|Austria|Daily hospital oc...|2020-04-28| 2020-W18| 561.0|  18|
|Austria|Daily hospital oc...|2020-04-29

In [0]:
unique_indicator = hospital_admissions_df.select('indicator').distinct()
unique_indicator.show()

+--------------------+
|           indicator|
+--------------------+
|Daily hospital oc...|
| Daily ICU occupancy|
|Weekly new hospit...|
|Weekly new ICU ad...|
+--------------------+



In [0]:
filtered_data = hospital_admissions_df.filter((hospital_admissions_df['country'] == 'Austria') & (hospital_admissions_df['week'] == '17'))
filtered_data.show()

+-------+--------------------+----------+-----+----+----+
|country|           indicator|      date|value|year|week|
+-------+--------------------+----------+-----+----+----+
|Austria|Daily hospital oc...|2020-04-21|756.0|2020|  17|
|Austria|Daily hospital oc...|2020-04-22|700.0|2020|  17|
|Austria|Daily hospital oc...|2020-04-23|677.0|2020|  17|
|Austria|Daily hospital oc...|2020-04-24|651.0|2020|  17|
|Austria| Daily ICU occupancy|2020-04-21|196.0|2020|  17|
|Austria| Daily ICU occupancy|2020-04-22|176.0|2020|  17|
|Austria| Daily ICU occupancy|2020-04-23|169.0|2020|  17|
|Austria| Daily ICU occupancy|2020-04-24|156.0|2020|  17|
+-------+--------------------+----------+-----+----+----+



I think making a Daily table and Weekly table would make sense. So summing up the data for each week and having a weekly table which I will do in a seperate notebook so I can run it in ADF weekly

In [0]:
from pyspark.sql import functions as F
# Get the week start date
week_start_df = hospital_admissions_df.groupBy("week").agg(
    F.min("date").alias("week_start_date")  # Get the earliest date for each week
)

# Show the resulting DataFrame with week start dates
week_start_df.show()

+----+---------------+
|week|week_start_date|
+----+---------------+
|  07|     2020-02-10|
|  15|     2020-04-06|
|  11|     2020-03-09|
|  29|     2020-07-13|
|  42|     2020-10-12|
|  30|     2020-07-20|
|  34|     2020-08-17|
|  22|     2020-05-25|
|  28|     2020-07-06|
|  16|     2020-04-13|
|  35|     2020-08-24|
|  43|     2020-10-19|
|  31|     2020-07-27|
|  18|     2020-04-27|
|  27|     2020-06-29|
|  17|     2020-04-20|
|  26|     2020-06-22|
|  09|     2020-02-24|
|  05|     2020-02-02|
|  19|     2020-05-04|
+----+---------------+
only showing top 20 rows



In [0]:
# Join to the main dataframe to add week start
hospital_admissions_df = hospital_admissions_df.join(
    week_start_df, on="week", how="left"  
)

hospital_admissions_df.show()

+----+-------+--------------------+----------+------+---------------+
|week|country|           indicator|      date| value|week_start_date|
+----+-------+--------------------+----------+------+---------------+
|  14|Austria|Daily hospital oc...|2020-04-02|1057.0|     2020-03-30|
|  15|Austria|Daily hospital oc...|2020-04-08|1096.0|     2020-04-06|
|  16|Austria|Daily hospital oc...|2020-04-15|1001.0|     2020-04-13|
|  16|Austria|Daily hospital oc...|2020-04-16| 967.0|     2020-04-13|
|  16|Austria|Daily hospital oc...|2020-04-17| 909.0|     2020-04-13|
|  16|Austria|Daily hospital oc...|2020-04-19| 817.0|     2020-04-13|
|  17|Austria|Daily hospital oc...|2020-04-21| 756.0|     2020-04-20|
|  17|Austria|Daily hospital oc...|2020-04-22| 700.0|     2020-04-20|
|  17|Austria|Daily hospital oc...|2020-04-23| 677.0|     2020-04-20|
|  17|Austria|Daily hospital oc...|2020-04-24| 651.0|     2020-04-20|
|  18|Austria|Daily hospital oc...|2020-04-27| 579.0|     2020-04-27|
|  18|Austria|Daily 

In [0]:
# Now make a new DataFrame for the weekly counts
weekly_hospital_admissions = hospital_admissions_df.groupBy(['indicator','country','week','week_start_date']).sum()
weekly_hospital_admissions.show()

+--------------------+--------------+----+------------------+
|           indicator|       country|week|        sum(value)|
+--------------------+--------------+----+------------------+
|Daily hospital oc...|       Denmark|  34|             133.0|
| Daily ICU occupancy|       Denmark|  17|             528.0|
|Weekly new hospit...|       Germany|  07|0.0409543752239617|
| Daily ICU occupancy|       Ireland|  26|              82.0|
| Daily ICU occupancy|   Netherlands|  41|            1790.0|
|Weekly new hospit...|      Portugal|  32| 0.788197127517743|
| Daily ICU occupancy|       Romania|  31|            2711.0|
|Daily hospital oc...|      Slovenia|  32|             158.0|
|Weekly new hospit...|      Slovenia|  17| 0.624727282513211|
|Daily hospital oc...|United Kingdom|  35|            5537.0|
| Daily ICU occupancy|       Austria|  19|             636.0|
| Daily ICU occupancy|       Belgium|  25|             407.0|
|Weekly new hospit...|        Cyprus|  24|  0.11416841439481|
|Weekly 

In [0]:
# Test with one country and week
filtered_weekly = weekly_hospital_admissions.filter((weekly_hospital_admissions['country'] == 'Austria') & (weekly_hospital_admissions['week'] == '17'))
filtered_weekly.show()

+--------------------+-------+----+----------+
|           indicator|country|week|sum(value)|
+--------------------+-------+----+----------+
| Daily ICU occupancy|Austria|  17|     697.0|
|Daily hospital oc...|Austria|  17|    2784.0|
+--------------------+-------+----+----------+



I want to pivot the data to make ICU occupany and hospital occupancy a column and get rid of indicator

In [0]:
# Pivot the DataFrame based on the 'indicator' column
pivoted_df = filtered_weekly.groupBy("country", "week") \
    .pivot("indicator") \
    .agg({"sum(value)": "first"})

pivoted_df.show()

+-------+----+-------------------+------------------------+
|country|week|Daily ICU occupancy|Daily hospital occupancy|
+-------+----+-------------------+------------------------+
|Austria|  17|              697.0|                  2784.0|
+-------+----+-------------------+------------------------+



In [0]:
weekly_hospital_admissions = weekly_hospital_admissions.groupBy("country", "week").pivot("indicator").agg({"sum(value)": "first"})

weekly_hospital_admissions.show()

+--------------+----+-------------------+------------------------+----------------------------------+---------------------------------------+
|       country|week|Daily ICU occupancy|Daily hospital occupancy|Weekly new ICU admissions per 100k|Weekly new hospital admissions per 100k|
+--------------+----+-------------------+------------------------+----------------------------------+---------------------------------------+
|       Belgium|  19|             3732.0|                 18029.0|                              NULL|                       5.44715608258343|
|United Kingdom|  24|               NULL|                 34026.0|                              NULL|                       4.50882252782386|
|   Netherlands|  21|             1807.0|                    NULL|                 0.133085193097646|                       0.34139245185918|
|        France|  18|            28344.0|                185788.0|                  1.08337377456212|                                   NULL|
|     

In [0]:
# Only grab columns we need
weekly_hospital_admissions = weekly_hospital_admissions.select("country", "week", "Daily ICU occupancy","Daily hospital occupancy")

#Rename columns as well
weekly_hospital_admissions = weekly_hospital_admissions.withColumnRenamed("Daily ICU occupancy`", "weekly_ICU_occupancy")
weekly_hospital_admissions = weekly_hospital_admissions.withColumnRenamed("Daily hospital occupancy`", "weekly_hospital_occupancy")

weekly_hospital_admissions.show()

+--------------+----+-------------------+------------------------+
|       country|week|Daily ICU occupancy|Daily hospital occupancy|
+--------------+----+-------------------+------------------------+
|       Belgium|  19|             3732.0|                 18029.0|
|United Kingdom|  24|               NULL|                 34026.0|
|   Netherlands|  21|             1807.0|                    NULL|
|        France|  18|            28344.0|                185788.0|
|       Ireland|  30|               42.0|                    84.0|
|      Bulgaria|  20|              350.0|                  2449.0|
|      Slovenia|  13|              120.0|                   596.0|
|         Spain|  36|               NULL|                    NULL|
|       Ireland|  34|               45.0|                   135.0|
|        Norway|  22|               NULL|                   235.0|
|       Germany|  25|               NULL|                    NULL|
|       Belgium|  43|             4113.0|                 2595

In [0]:
# Save the transformed data into our silver container
output_path = "abfss://ecdc@adlsmzubac125.dfs.core.windows.net/silver/weeklyhospitaladmissions.csv"

# Save Dataframe as csv in data lake 
weekly_hospital_admissions.write.csv(output_path)