## Summary of Notebook 

This notebook: 
- Extracts the **Weekly .xlsx** file that is downloaded into our blob storage from MOH's SAS dashboard (*sas-extracts* container in Azure Data Factory (ADF))
- Saves the raw file in the Unity Catalog's (UC) temporal environment

**Below is the location of the file in ADF's blob storage and the link to the original ETL in ADF:**

#### Step 1 - Run the `helper_utils` notebook, which provides the configuration values and the function used to create the transformed table saved in the public environment of the UC.

In [0]:
%run "/Shared/Common Utilities/helper_utils"

In [0]:
# pprint (the standard-library "pretty printer") is imported so that data
# structures can be printed in a readable, indented layout.

import pprint

# Load the shared configuration values. get_config() is not defined in this
# notebook; it is presumably provided by the helper_utils notebook run in the
# previous cell (verify there). Later cells read the keys
# "cct_db_temp_schema" and "cct_db_schema" from the returned object.

config = get_config()

#### Step 2 - Set the file location. This is the file that we are extracting from ADF blob storage.

The format to set the file location is as follows:
`wasbs://[name of container]@[location of blob storage]/[name of file]`

In [0]:
# wasbs URI of the weekly workbook: container "sas-extracts" on the storage
# host that follows the "@", then the blob name.
# NOTE(review): the blob name contains a space before ".xlsx"; confirm this
# matches the name of the stored file exactly.
file_location = "wasbs://sas-extracts@cctdash0000prd0504blob.blob.core.windows.net/Weekly .xlsx"

#### Step 3 - Read the data, create DataFrame & Testing

In this step, a dataframe containing the contents of the file is created. This dataframe will be used to create the tables in UC.
- `header` and `inferSchema` are both set to `True`:
  - `header`: the first row of the file contains the column headers
  - `inferSchema`: the schema of the file (column names and data types) is inferred from the data

A temporary table is also created for testing purposes to make sure the rows match with the original file saved in MSSMS.

In [0]:
# Pretty-printer instance, kept available for ad-hoc inspection of nested
# structures in later cells.
pp = pprint.PrettyPrinter(indent=4)

# Read the Excel workbook with the spark-excel data source: the first row
# supplies the column names and the column types are inferred from the data.
moh_hosp_weekly_icu_df = (
    spark.read.format("com.crealytics.spark.excel")
    .options(header=True, inferSchema=True)
    .load(file_location)
)

# Register the DataFrame as a temporary view so it can be queried with SQL.
moh_hosp_weekly_icu_df.createOrReplaceTempView("moh_hosp_weekly_icu_df")

# The query must name the view exactly as registered above. It previously
# selected from "moh_hosp_weekly_icu", a view that is never created, so
# running it would have failed with "table or view not found".
query = "SELECT * from moh_hosp_weekly_icu_df"

# Optional checks / persistence, left disabled:
# display(spark.sql(query))
# create_table_as_query_and_assign_perms("weeklyICU_mohsas_df", query)  # helper from /Shared/Common Utilities/helper_utils

In [0]:
# Print the column names, data types and nullability of the loaded DataFrame.
moh_hosp_weekly_icu_df.printSchema()

root
 |-- Week: string (nullable = true)
 |-- Available baseline beds (avg): double (nullable = true)
 |-- Adult baseline occupancy: double (nullable = true)
 |-- Vented adult baseline occupancy: double (nullable = true)
 |-- Adult baseline beds (avg): double (nullable = true)
 |-- Vented adult baseline beds (avg): double (nullable = true)
 |-- Adult baseline patients (avg): double (nullable = true)
 |-- Vented adult baseline patients (avg): double (nullable = true)



In [0]:
# Rename the columns with make_col_names_safe (not defined in this notebook;
# presumably provided by helper_utils, verify there). Judging by the schema
# printed before this cell and the table displayed after it, a header such as
# "Available baseline beds (avg)" becomes "available_baseline_beds_avg".
# The DataFrame variable is rebound to the renamed result.

moh_hosp_weekly_icu_df=make_col_names_safe(moh_hosp_weekly_icu_df)

In [0]:
# Fully qualified name of the target table, inside the temporary schema taken
# from the shared configuration.
moh_hosp_icu_weekly_table_source = config["cct_db_temp_schema"] + ".temp_test"
print(moh_hosp_icu_weekly_table_source)

# Drop any earlier copy so the cell can be re-run without a "table already
# exists" error. The drop and the write below must target the same table:
# previously the drop used an f-string with no placeholder (a literal
# hive_metastore.default name) and the write used a literal, unqualified
# table name, so the configured name computed above was never used.
spark.sql(f"DROP TABLE IF EXISTS {moh_hosp_icu_weekly_table_source}")

# Preview the rows that are about to be written.
display(moh_hosp_weekly_icu_df)

# Persist the DataFrame as a managed table under the configured name.
moh_hosp_weekly_icu_df.write.saveAsTable(moh_hosp_icu_weekly_table_source)

development.temporal.temp_test


week,available_baseline_beds_avg,adult_baseline_occupancy,vented_adult_baseline_occupancy,adult_baseline_beds_avg,vented_adult_baseline_beds_avg,adult_baseline_patients_avg,vented_adult_baseline_patients_avg
2020 Week18 May09,603.0,0.7002982107355865,0.4161161052745586,2012.0,1319.0,1409.0,548.8571428571429
2020 Week19 May16,580.4285714285714,0.7115166145981255,0.4075598397054045,2012.0,1319.0,1431.571428571429,537.5714285714286
2020 Week20 May23,582.0,0.7107355864811133,0.3970540452723925,2012.0,1319.0,1430.0,523.7142857142857
2020 Week21 May30,534.8571428571429,0.7341664299914797,0.3985703454998375,2012.0,1319.0,1477.142857142857,525.7142857142857
2020 Week22 Jun06,541.1428571428571,0.7310423175234309,0.3977038882270118,2012.0,1319.0,1470.857142857143,524.5714285714286
2020 Week23 Jun13,530.7142857142857,0.7362255041181482,0.3946712877721217,2012.0,1319.0,1481.2857142857142,520.5714285714286
2020 Week24 Jun20,502.7142857142857,0.750142005112184,0.4047438535687209,2012.0,1319.0,1509.2857142857142,533.8571428571429
2020 Week25 Jun27,512.7142857142857,0.7451718261857427,0.3837322647026968,2012.0,1319.0,1499.2857142857142,506.1428571428572
2020 Week26 Jul04,532.2857142857143,0.735444476001136,0.347341059244016,2012.0,1319.0,1479.7142857142858,458.1428571428572
2020 Week27 Jul11,472.8571428571428,0.7649815393354161,0.3542727174266219,2012.0,1319.0,1539.142857142857,467.2857142857143


In [0]:
# Fully qualified name of the table "public_test" in the schema given by
# config["cct_db_schema"].
test_file = config["cct_db_schema"] + ".public_test"
print(test_file)

# Drop the table if it already exists, so that a later write would not collide
# with an existing table of the same name.
spark.sql(f"DROP TABLE  IF EXISTS {test_file}")
# NOTE(review): the write below is disabled, so this cell currently only drops
# the table and never recreates it.
# moh_hosp_weekly_icu_df.write.saveAsTable(test_file)
# NOTE(review): test_file is a string, so this displays the table name rather
# than the table's contents.
display(test_file)

development.public.public_test
'development.public.public_test'

In [0]:
import re
from pyspark.sql.functions import col, to_date

def second_dateFix(str):
    """Reorder a "<year><mon><dd><yyyy>" label into "<yyyy><mon><dd>".

    The input is lower-cased and stripped of spaces before it is inspected.
    When the normalised text starts with four digits, three letters and six
    digits (e.g. "2020 Jan02 2021" -> "2020jan022021"), the leading four
    digits are discarded and the remainder is re-emitted as trailing year,
    month abbreviation, day (e.g. "2021jan02"). Anything after those thirteen
    characters is ignored.

    Any other input is returned exactly as it was passed in (original casing
    and spacing preserved).
    """
    normalised = str.lower().replace(" ", '')

    # Guard clause: labels of any other shape pass through untouched.
    if re.match("[0-9]{4}[a-z]{3}[0-9]{6}", normalised) is None:
        return str

    # Strip the leading year, then pull month / day / trailing year apart.
    remainder = re.sub("^[0-9]{4}", '', normalised)
    month, day, year = re.findall('(^[a-z]{3})([0-9]{2})([0-9]{4})', remainder)[0]
    return year + month + day

#lowered = str.lower().replace(" ", '')


def dateFix(str):
    """Normalise a week label and remove its "weekNN" token.

    The input is lower-cased and stripped of spaces, then every occurrence of
    "week" followed by exactly two digits is removed.

    Example: "2020 Week18 May09" -> "2020may09".

    Parameters:
        str: the label text to clean (the name shadows the built-in, but is
            kept so existing callers are unaffected).

    Returns:
        The normalised label with the week token removed.
    """
    # This normalisation step had been commented out, which left `lowered`
    # undefined and made every call fail with NameError.
    lowered = str.lower().replace(" ", '')

    return re.sub("week[0-9][0-9]", '', lowered)




In [0]:
# Derive a real date column from the free-text "week" labels
# (e.g. "2020 Week18 May09" -> 2020-05-09).

# Work on the week column only; `a` is kept unchanged for the side-by-side
# comparison at the end of the cell.
a = moh_hosp_weekly_icu_df.select("week")

# Spark column helpers used below (imported both by name and under the alias `f`).
from pyspark.sql.functions import when, regexp_replace, col, concat, to_date
import pyspark.sql.functions as f

# Step 1: remove the literal "Week", then replace every space with "-", so
# "2020 Week18 May09" becomes "2020-18-May09".
# NOTE(review): neither `when` has an `otherwise`, so a label that does not
# contain "Week" (or, afterwards, a space) becomes null here. Confirm that is
# intended.
b = a.withColumn('week', when(a.week.contains("Week"),regexp_replace(a.week,'Week','')))
b = b.withColumn('week', when(b.week.contains(" "),regexp_replace(b.week,' ','-')))
split_col = f.split(b['week'], '-')

# Step 2: name the pieces of the split label. Item 0 is the leading year,
# item 1 the week number, item 2 the month+day text (e.g. "May09") and item 3
# an optional trailing year. The sample rows shown in this notebook have no
# fourth piece.
b = b.withColumn('year', split_col.getItem(0))\
     .withColumn('week_num', split_col.getItem(1))\
     .withColumn('Month_date', split_col.getItem(2))\
     .withColumn('Year_latest', split_col.getItem(3))

# Step 3: split the month+day text into its month abbreviation and day number
# (for "May09" the displayed result corresponds to "May" and "09"); the
# single-argument concat() calls just wrap the substring. Then keep only the
# needed columns, falling back to the leading year when no trailing year is
# present.
b = b.withColumn('Month', concat(b.Month_date.substr(0, 3)))\
     .withColumn('Date', concat(b.Month_date.substr(4, 5)))
b = b.select('Date','Month','year',when(b.Year_latest.isNull(), b.year).otherwise(b.Year_latest).alias('Year_latest'))

# Step 4: assemble a "dd MMM yyyy" string such as "09 May 2020", overwriting
# the Date column with it.
b = b.withColumn('Date', f.concat(f.col('Date'), f.lit(" "), f.col('Month'), f.lit(" "), f.col('Year_latest')))
# NOTE(review): after the select above only Date, Month, year and Year_latest
# exist, so "week", "week_num" and "Month_date" in this list name columns that
# are no longer present.
b = b.drop("year","week","week_num","Month_date","Month","Year_latest")

# Step 5: parse the assembled string into a date and keep only the parsed column.
b = b.select(col("Date"), to_date(col("Date"), "dd MMM yyyy").alias("Transformed_Date")).drop("Date")
display(b)  # transformed dates

display(a)   # original week labels, for comparison


Transformed_Date
2020-05-09
2020-05-16
2020-05-23
2020-05-30
2020-06-06
2020-06-13
2020-06-20
2020-06-27
2020-07-04
2020-07-11


week
2020 Week18 May09
2020 Week19 May16
2020 Week20 May23
2020 Week21 May30
2020 Week22 Jun06
2020 Week23 Jun13
2020 Week24 Jun20
2020 Week25 Jun27
2020 Week26 Jul04
2020 Week27 Jul11


In [0]:
# Bare expression as the last line of the cell: the notebook shows the
# DataFrame's representation as the cell output.
moh_hosp_weekly_icu_df 