In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
%run "../setup/functions"

In [0]:
%run "../setup/cosmosdb"

# Fetching Data from Silver Layer

In [0]:
# Load the New York dataset from the silver layer (Delta format).
# The folder name is the state name with its spaces stripped out.
# NOTE(review): `silver_folder_path` is not defined in this notebook --
# presumably it comes from one of the %run setup notebooks; confirm.
ny_state = 'new york'
df_ny = (
    spark.read
    .format("delta")
    .load(f"{silver_folder_path}{ny_state.replace(' ', '')}")
)

In [0]:
# Load the California dataset from the silver layer (Delta format).
# The folder name is the state name with its spaces stripped out.
# NOTE(review): `silver_folder_path` is not defined in this notebook --
# presumably it comes from one of the %run setup notebooks; confirm.
california_state = 'california'
df_la = (
    spark.read
    .format("delta")
    .load(f"{silver_folder_path}{california_state.replace(' ', '')}")
)

## Grouping by month

In [0]:
# Total confirmed cases and deaths per (CHAVE_DATA, MUNICIPIO) for NY,
# sorted by CHAVE_DATA ascending.
# `sum` and `asc` are the pyspark.sql.functions versions pulled in by the
# wildcard import, not the Python builtins.
df_ny_grouped = (
    df_ny
    .groupBy('CHAVE_DATA', 'MUNICIPIO')
    .agg(
        sum('CONFIRMADOS_NY').alias('US_NY_CASOS_CONFIRMADOS'),
        sum('MORTES_NY').alias('US_NY_NUMERO_MORTES'),
    )
    .sort(asc('CHAVE_DATA'))
)

In [0]:
# Total confirmed cases and deaths per (CHAVE_DATA, MUNICIPIO) for LA,
# sorted by CHAVE_DATA ascending.
# `sum` and `asc` are the pyspark.sql.functions versions pulled in by the
# wildcard import, not the Python builtins.
df_la_grouped = (
    df_la
    .groupBy('CHAVE_DATA', 'MUNICIPIO')
    .agg(
        sum('CONFIRMADOS_LA').alias('US_LA_CASOS_CONFIRMADOS'),
        sum('MORTES_LA').alias('US_LA_NUMERO_MORTES'),
    )
    .sort(asc('CHAVE_DATA'))
)

## Moving Average

In [0]:
# Moving-average window: within each MUNICIPIO, ordered by CHAVE_DATA,
# the frame is the current row plus the three rows before it
# (so at most four rows contribute to each average).
w = (
    Window
    .partitionBy('MUNICIPIO')
    .orderBy('CHAVE_DATA')
    .rowsBetween(-3, Window.currentRow)
)

In [0]:
# Add the LA moving averages (confirmed cases and deaths) computed over
# window `w`, then drop MUNICIPIO, which was only needed for partitioning.
df_la_mav = (
    df_la_grouped
    .withColumn(
        'USA_LA_MM_CASOS_CONFIRMADOS',
        avg(col('US_LA_CASOS_CONFIRMADOS')).over(w),
    )
    .withColumn(
        'USA_LA_MM_NUMERO_MORTES',
        avg(col('US_LA_NUMERO_MORTES')).over(w),
    )
    .drop("MUNICIPIO")
)

#.withColumn('list', collect_list(col('US_LA_CASOS_CONFIRMADOS')).over(w))\

In [0]:
# Preview the LA totals together with their moving averages.
display(df_la_mav)

CHAVE_DATA,US_LA_CASOS_CONFIRMADOS,US_LA_NUMERO_MORTES,USA_LA_MM_CASOS_CONFIRMADOS,list,USA_LA_MM_NUMERO_MORTES
202004,8067,448,8067.0,List(8067),448.0
202005,31781,1251,19924.0,"List(8067, 31781)",849.5
202006,48849,1009,29565.666666666668,"List(8067, 31781, 48849)",902.6666666666666
202007,84691,1253,43347.0,"List(8067, 31781, 48849, 84691)",990.25
202008,53227,1160,54637.0,"List(31781, 48849, 84691, 53227)",1168.25
202009,28531,792,53824.5,"List(48849, 84691, 53227, 28531)",1053.5
202010,37319,495,50942.0,"List(84691, 53227, 28531, 37319)",925.0
202011,93830,585,53226.75,"List(53227, 28531, 37319, 93830)",758.0
202012,370071,2703,132437.75,"List(28531, 37319, 93830, 370071)",1143.75
202101,345429,6411,211662.25,"List(37319, 93830, 370071, 345429)",2548.5


In [0]:
# Add the NY moving averages (confirmed cases and deaths) computed over
# window `w`, then drop MUNICIPIO, which was only needed for partitioning.
df_ny_mav = (
    df_ny_grouped
    .withColumn(
        'USA_NY_MM_CASOS_CONFIRMADOS',
        avg(col('US_NY_CASOS_CONFIRMADOS')).over(w),
    )
    .withColumn(
        'USA_NY_MM_NUMERO_MORTES',
        avg(col('US_NY_NUMERO_MORTES')).over(w),
    )
    .drop("MUNICIPIO")
)

#.withColumn('list', collect_list(col('US_NY_CASOS_CONFIRMADOS')).over(w))\

## Writing Final Dataframe to ADLS2 (Gold)

In [0]:
# Combine the NY and LA results side by side on CHAVE_DATA.
# A full outer join keeps a CHAVE_DATA value even when only one of the
# two sides has a row for it.
results_final_df = df_ny_mav.join(
    df_la_mav,
    on='CHAVE_DATA',
    how="outer",
)

In [0]:
from delta.tables import DeltaTable

# Persist the final dataframe to the gold layer at
# `{gold_folder_path}final_covid_df`.
# NOTE(review): `gold_folder_path` is not defined in this notebook --
# presumably it comes from one of the %run setup notebooks; confirm.
if not DeltaTable.isDeltaTable(spark, f"{gold_folder_path}final_covid_df"):
    # No Delta table at the target path yet: create it, partitioned by
    # CHAVE_DATA.
    (
        results_final_df.write
        .mode("overwrite")
        .partitionBy("CHAVE_DATA")
        .format("delta")
        .save(f"{gold_folder_path}final_covid_df")
    )
else:
    # Table already exists: upsert keyed on CHAVE_DATA -- matching rows are
    # fully updated, unmatched source rows are inserted.
    deltaTable = DeltaTable.forPath(spark, f"{gold_folder_path}final_covid_df")
    (
        deltaTable.alias("tgt")
        .merge(
            results_final_df.alias("src"),
            "tgt.CHAVE_DATA = src.CHAVE_DATA",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Read the gold table back and show it ordered by CHAVE_DATA to verify
# the write.
display(
    spark.read
    .format("delta")
    .load(f"{gold_folder_path}final_covid_df")
    .orderBy('CHAVE_DATA')
)

CHAVE_DATA,US_NY_CASOS_CONFIRMADOS,US_NY_NUMERO_MORTES,USA_NY_MM_CASOS_CONFIRMADOS,USA_NY_MM_NUMERO_MORTES,US_LA_CASOS_CONFIRMADOS,US_LA_NUMERO_MORTES,USA_LA_MM_CASOS_CONFIRMADOS,USA_LA_MM_NUMERO_MORTES
202004,2768,338,2768.0,338.0,8067,448,8067.0,448.0
202005,4583,1019,3675.5,678.5,31781,1251,19924.0,849.5
202006,1717,142,3022.6666666666665,499.6666666666667,48849,1009,29565.666666666668,902.6666666666666
202007,2110,65,2794.5,391.0,84691,1253,43347.0,990.25
202008,1437,26,2461.75,313.0,53227,1160,54637.0,1168.25
202009,1517,8,1695.25,60.25,28531,792,53824.5,1053.5
202010,3075,20,2034.75,29.75,37319,495,50942.0,925.0
202011,9329,31,3839.5,21.25,93830,585,53226.75,758.0
202012,16147,109,7517.0,42.0,370071,2703,132437.75,1143.75
202101,24837,248,13347.0,102.0,345429,6411,211662.25,2548.5


## Writing to CosmosDB

In [0]:
# Write the final dataframe to Cosmos DB through the `cosmos.oltp` format,
# using CHAVE_DATA (renamed to `id`) as the item id.
# NOTE(review): `cfg` is not defined in this notebook -- presumably it is
# the connection config set up by the "../setup/cosmosdb" notebook; confirm.
(
    results_final_df
    .withColumnRenamed('CHAVE_DATA', 'id')
    .write
    .format("cosmos.oltp")
    .options(**cfg)
    .mode("append")
    .save()
)

In [0]:
# Read the container back from Cosmos DB (schema inference enabled) and
# show it ordered by `id` to verify the write.
# NOTE(review): `cfg` is not defined in this notebook -- presumably it is
# the connection config set up by the "../setup/cosmosdb" notebook; confirm.
display(
    spark.read
    .format("cosmos.oltp")
    .options(**cfg)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .load()
    .orderBy('id')
)

US_NY_NUMERO_MORTES,USA_LA_MM_CASOS_CONFIRMADOS,USA_NY_MM_CASOS_CONFIRMADOS,US_NY_CASOS_CONFIRMADOS,US_LA_NUMERO_MORTES,id,US_LA_CASOS_CONFIRMADOS,USA_LA_MM_NUMERO_MORTES,USA_NY_MM_NUMERO_MORTES
338,8067.0,2768.0,2768,448,202004,8067,448.0,338.0
1019,19924.0,3675.5,4583,1251,202005,31781,849.5,678.5
142,29565.666666666668,3022.6666666666665,1717,1009,202006,48849,902.6666666666666,499.6666666666667
65,43347.0,2794.5,2110,1253,202007,84691,990.25,391.0
26,54637.0,2461.75,1437,1160,202008,53227,1168.25,313.0
8,53824.5,1695.25,1517,792,202009,28531,1053.5,60.25
20,50942.0,2034.75,3075,495,202010,37319,925.0,29.75
31,53226.75,3839.5,9329,585,202011,93830,758.0,21.25
109,132437.75,7517.0,16147,2703,202012,370071,1143.75,42.0
248,211662.25,13347.0,24837,6411,202101,345429,2548.5,102.0
