In [0]:
%run "../include/configuration"

In [0]:
from pyspark.sql.types import *

In [0]:
# Explicit schema for hospital_admission.json, so Spark uses these column
# names/types instead of inferring them from the file.
# NOTE(review): Spark file sources usually relax user-supplied schemas to
# nullable on read, so the nullable=False flags below are probably not
# enforced — confirm before relying on them.
hospital_schema = StructType(
    fields=[
        StructField("country", StringType(), nullable=False),
        StructField("date", DateType(), nullable=False),
        StructField("indicator", StringType(), nullable=True),
        StructField("source", StringType(), nullable=True),
        StructField("value", DoubleType(), nullable=True),
        StructField("year_week", StringType(), nullable=True),
    ]
)

In [0]:
# Load the raw hospital-admission JSON using the explicit schema above.
# "multiline" lets Spark parse JSON records that span several lines instead of
# expecting one JSON object per line.
hospital_admission_df = (
    spark.read
    .schema(hospital_schema)
    .option("multiline", True)
    .json(f"{raw_covid_folder_path}/hospital_admission.json")
)

In [0]:
# Country and date dimension lookups, stored as CSV files with a header row.
# No schema/inferSchema is supplied, so every column (population, date, ...)
# is read as a string.
lookup_country_df = (
    spark.read
    .option("header", True)
    .csv(f"{raw_covid_folder_path}/lookup/country_lookup.csv")
)
lookup_date_df = (
    spark.read
    .option("header", True)
    .csv(f"{raw_covid_folder_path}/lookup/dim_date.csv")
)

In [0]:
# Enrich each admission row with country attributes from the lookup.
# This is an inner join (the default), so admissions whose country has no
# lookup entry are dropped.
country_joined_hospital_df = (
    hospital_admission_df
    .join(
        lookup_country_df,
        hospital_admission_df["country"] == lookup_country_df["country"],
    )
    .select(
        hospital_admission_df["country"],
        lookup_country_df["country_code_2_digit"],
        lookup_country_df["population"],
        lookup_country_df["continent"],
        hospital_admission_df["date"],
        hospital_admission_df["indicator"],
        hospital_admission_df["value"],
        hospital_admission_df["year_week"],
        hospital_admission_df["source"],
    )
)

In [0]:
# Split the enriched rows by reporting cadence, based on the indicator text
# (substring match on "Daily" / "Weekly").
daily_hospital_df = country_joined_hospital_df.where(
    country_joined_hospital_df.indicator.contains("Daily")
)
weekly_hospital_df = country_joined_hospital_df.where(
    country_joined_hospital_df.indicator.contains("Weekly")
)

In [0]:
from pyspark.sql.functions import *

In [0]:
# Pivot the daily rows so each daily indicator becomes its own column, summing
# `value` within each (date, country, country_code_2_digit, population,
# continent, source) group.
# The pivot values are listed explicitly for two reasons:
#   - Spark skips the extra job it would otherwise run to discover the
#     distinct indicator values.
#   - Both columns selected by the next cell always exist (null-filled if an
#     indicator is absent from the data).
# `.sum("value")` is GroupedData's own aggregate, so this cell does not depend
# on `from pyspark.sql.functions import *` shadowing Python's built-in sum.
DAILY_HOSPITAL_INDICATORS = ["Daily ICU occupancy", "Daily hospital occupancy"]
pivot_daily_hospital_df = (
    daily_hospital_df
    .groupBy("date", "country", "country_code_2_digit", "population", "continent", "source")
    .pivot("indicator", DAILY_HOSPITAL_INDICATORS)
    .sum("value")
)

In [0]:
# Final daily shape: keep the identifying columns, rename the pivoted
# indicator columns to snake_case, and sort newest date first.
final_daily_hospital_df = (
    pivot_daily_hospital_df
    .select(
        col("date"),
        col("country"),
        col("country_code_2_digit"),
        col("population"),
        col("continent"),
        col("Daily ICU occupancy").alias("daily_icu_occupancy"),
        col("Daily hospital occupancy").alias("daily_hospital_occupancy"),
        col("source"),
    )
    .orderBy(col("date").desc())
)

In [0]:
# Build a lookup from week label (the "yyyy-Www" form produced by the regex,
# e.g. "2020-W05") to the first and last date of that week. It is joined
# against the weekly hospital rows' year_week below.
# dim_date's year_week is presumably 6 digits (yyyyww) — confirm; only values
# with six consecutive digits are rewritten by the regex.
# This is a single idempotent chain built from lookup_date_df. Previously the
# aggregation cell reassigned year_week_lookup_df to its own aggregate, so
# re-running that cell failed because the "date" column was gone.
# NOTE(review): lookup_date_df is read without a schema, so "date" is a string
# and min/max compare lexicographically. That is only chronological if dates
# are zero-padded yyyy-MM-dd — confirm against dim_date.csv.
# min/max here resolve to pyspark.sql.functions via the earlier star import,
# not to Python's builtins.
year_week_lookup_df = (
    lookup_date_df
    .withColumn("year_week_1", regexp_replace(col("year_week"), "(\\d{4})(\\d{2})", "$1-W$2"))
    .groupBy("year_week_1")
    .agg(
        min(col("date")).alias("week_first_date"),
        max(col("date")).alias("week_last_date"),
    )
)

In [0]:
# Attach week start/end dates to each weekly hospital row by matching its
# year_week label against the rewritten dim_date label. This is an inner join,
# so weeks without a matching lookup row are dropped.
joined_weekly_hospital_df = weekly_hospital_df.join(
    year_week_lookup_df,
    on=weekly_hospital_df["year_week"] == year_week_lookup_df["year_week_1"],
)

In [0]:
# Pivot the weekly rows so each weekly indicator becomes its own column,
# summing `value` within each (country, country_code_2_digit, population,
# continent, year_week, week_first_date, week_last_date, source) group.
# The pivot values are listed explicitly for two reasons:
#   - Spark skips the extra distinct-values job.
#   - Both columns selected by the next cell always exist (null-filled if an
#     indicator is absent).
# `.sum("value")` is GroupedData's own aggregate, so this cell does not depend
# on the star import shadowing Python's built-in sum.
WEEKLY_HOSPITAL_INDICATORS = [
    "Weekly new ICU admissions per 100k",
    "Weekly new hospital admissions per 100k",
]
pivot_weekly_hospital_df = (
    joined_weekly_hospital_df
    .groupBy(
        "country",
        "country_code_2_digit",
        "population",
        "continent",
        "year_week",
        "week_first_date",
        "week_last_date",
        "source",
    )
    .pivot("indicator", WEEKLY_HOSPITAL_INDICATORS)
    .sum("value")
)



In [0]:
# Final weekly shape: week label and bounds, country attributes, the two
# pivoted indicators renamed to snake_case, and source; newest week first.
final_weekly_hospital_df = (
    pivot_weekly_hospital_df
    .select(
        col("year_week"),
        col("week_first_date"),
        col("week_last_date"),
        col("country"),
        col("country_code_2_digit"),
        col("population"),
        col("continent"),
        col("Weekly new ICU admissions per 100k").alias("weekly_new_icu_admissions_per_100k"),
        col("Weekly new hospital admissions per 100k").alias("weekly_new_hospital_admissions_per_100k"),
        col("source"),
    )
    .orderBy(col("year_week").desc())
)



In [0]:
# Persist the daily table as Parquet, one sub-directory per date value,
# replacing any previous output at this path.
(
    final_daily_hospital_df.write
    .format("parquet")
    .partitionBy("date")
    .mode("overwrite")
    .save(f"{processed_covid_folder_path}/daily_hospital_admissions")
)



In [0]:
# Persist the weekly table as Parquet, one sub-directory per year_week value,
# replacing any previous output at this path.
(
    final_weekly_hospital_df.write
    .format("parquet")
    .partitionBy("year_week")
    .mode("overwrite")
    .save(f"{processed_covid_folder_path}/weekly_hospital_admissions")
)



In [0]:
# Read back the processed daily Parquet output. The "date" partition column is
# recovered from the directory layout written by the partitioned write above.
# The CSV-only "header" option was removed: Parquet files carry their own
# schema and the option was silently ignored.
# NOTE(review): this rebinds daily_hospital_df, which earlier held the
# un-pivoted daily rows feeding the pivot cell. Re-running that pivot after
# this cell would pick up this frame instead. Consider a distinct name.
daily_hospital_df = spark.read.parquet(f"{processed_covid_folder_path}/daily_hospital_admissions")



In [0]:
# Read back the processed weekly Parquet output. The "year_week" partition
# column is recovered from the directory layout written by the partitioned
# write above.
# The CSV-only "header" option was removed: Parquet files carry their own
# schema and the option was silently ignored.
# NOTE(review): this rebinds weekly_hospital_df, which earlier held the
# un-pivoted weekly rows feeding the week-bounds join. Re-running that join
# after this cell would pick up this frame instead. Consider a distinct name.
weekly_hospital_df = spark.read.parquet(f"{processed_covid_folder_path}/weekly_hospital_admissions")



In [0]:
# Save the processed daily data as a Parquet-format table named
# daily_hospital_admissions.
# mode("overwrite") makes the cell re-runnable. saveAsTable defaults to
# "errorifexists", so a second notebook run failed because the table already
# existed. It also matches the overwrite mode used for the Parquet writes.
daily_hospital_df.write.format("parquet").mode("overwrite").saveAsTable("daily_hospital_admissions")



In [0]:
# Save the processed weekly data as a Parquet-format table named
# weekly_hospital_admissions.
# mode("overwrite") makes the cell re-runnable. saveAsTable defaults to
# "errorifexists", so a second notebook run failed because the table already
# existed. It also matches the overwrite mode used for the Parquet writes.
weekly_hospital_df.write.format("parquet").mode("overwrite").saveAsTable("weekly_hospital_admissions")

