Prerequisite: copy the Parquet file from https://pandemicdatalake.blob.core.windows.net/public/curated/covid-19/ecdc_cases/latest/ecdc_cases.parquet into your storage account.

# Set up the path to the location where the Parquet file is stored

In [1]:
storagePath = "abfss://churning@churninge2edemo.dfs.core.windows.net/synapse/tables/covid"

StatementMeta(smallPool, 10, 1, Finished, Available)
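
The `abfss://` URI above follows the Azure Data Lake Storage Gen2 layout `abfss://<container>@<account>.dfs.core.windows.net/<path>`. As a minimal sketch, a small helper can assemble it from its parts so the container and account are easy to swap. The name `build_abfss_path` is hypothetical and not part of any SDK.

```python
def build_abfss_path(container: str, account: str, path: str = "") -> str:
    """Assemble an ADLS Gen2 URI (hypothetical helper, for illustration only)."""
    base = f"abfss://{container}@{account}.dfs.core.windows.net"
    # Strip leading/trailing slashes so the result never contains '//' after the host.
    cleaned = path.strip("/")
    return f"{base}/{cleaned}" if cleaned else base


# Rebuild the path used in this notebook from its container, account and folder.
storagePath = build_abfss_path("churning", "churninge2edemo", "synapse/tables/covid")
print(storagePath)
```

The result is the same string as the hard-coded `storagePath`, so the remaining cells work unchanged.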



In [2]:
dfcovidParquet = spark.read.load(f"{storagePath}/ecdc_cases.parquet", format="parquet")

StatementMeta(smallPool, 10, 2, Finished, Available)



# Convert the parquet file to delta

In [3]:
dfcovidParquet.write.format("delta").mode("overwrite").save(f"{storagePath}/delta")

StatementMeta(smallPool, 10, 3, Finished, Available)



In [4]:
dfCovidDelta =  spark.read.load(f"{storagePath}/delta", format="delta")

StatementMeta(smallPool, 10, 4, Finished, Available)



In [5]:
dfCovidDelta.show()

StatementMeta(smallPool, 10, 5, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|           load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|2020-12-14| 14|   12|2020|  746|     6|              Afghanistan|    AF|                   AFG|         null|         Asia|2021-06-22 00:05:...|         AF|
|2020-12-13| 13|   12|2020|  298|     9|              Afghanistan|    AF|                   AFG|         null|         Asia|2021-06-22 00:05:...|         AF|
|2020-12-12| 12|   12|2020|  113|    11|              Afghanistan|    AF|                   AFG|         null|         Asia|2021-06-22 00:05:...|         AF|
|2020-12-11| 11|   12|2020|   63|    10|            

# We will modify data to simulate an update/merge scenario
The load date was initially the 22nd of June (column load_date).
France had 0 Covid cases on the 15th of July, but we will modify that value to simulate an update.

In [6]:
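from pyspark.sql.functions import lit, to_timestamp

# Keep only the France record for 2020-07-15, then overwrite cases and load_date.
# The timestamp pattern uses three fraction digits (SSS) to match the ".000" in the literal.
dfCovidDeltaFrance = (
    dfCovidDelta
    .where("iso_country = 'FR' and date_rep = '2020-07-15'")
    .withColumn("cases", lit(12))
    .withColumn("load_date", to_timestamp(lit("2021-06-23 00:00:00.000"), "yyyy-MM-dd HH:mm:ss.SSS"))
)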

StatementMeta(smallPool, 10, 6, Finished, Available)



In [7]:
dfCovidDeltaFrance.write.format("delta").mode("overwrite").save(f"{storagePath}/deltaMerge")

StatementMeta(smallPool, 10, 7, Finished, Available)



# Data that will be used to merge our delta lake
Cases have been set to 12 and load_date has been set to the 23rd of June for France

In [8]:
dfCovidDeltaFrance.show()

StatementMeta(smallPool, 10, 8, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|          load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|2020-07-15| 15|    7|2020|   12|     0|                   France|    FR|                   FRA|         null|       Europe|2021-06-23 00:00:00|         FR|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+

Show the records that differ between the current Delta Lake data and the incoming record

In [9]:
# Inner join on date and country, keeping only pairs whose case counts differ
dfDiff = dfCovidDelta.join(
    dfCovidDeltaFrance,
    (dfCovidDelta.date_rep == dfCovidDeltaFrance.date_rep)
    & (dfCovidDelta.iso_country == dfCovidDeltaFrance.iso_country)
    & (dfCovidDelta.cases != dfCovidDeltaFrance.cases),
)

StatementMeta(smallPool, 10, 9, Finished, Available)



In [10]:
dfDiff.show()

StatementMeta(smallPool, 10, 10, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|           load_date|iso_country|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|          load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|2020-07-15| 15|    7|2020|    0|     0|                   Fr
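
The join condition reads as: same date_rep, same iso_country, different cases. The plain-Python sketch below mimics that logic on two small lists of dictionaries, which may help when reasoning about what dfDiff should contain. The rows are hand-written samples based on values shown in this notebook, not data read from the Delta table, and `diff_records` is a hypothetical helper name.

```python
# Sample rows; values copied from the outputs shown in this notebook.
current = [
    {"date_rep": "2020-07-15", "iso_country": "FR", "cases": 0},
    {"date_rep": "2020-07-15", "iso_country": "AF", "cases": 285},
]
incoming = [
    {"date_rep": "2020-07-15", "iso_country": "FR", "cases": 12},
]


def diff_records(current, incoming):
    """Return (old, new) pairs that share a key but disagree on 'cases'."""
    by_key = {(r["date_rep"], r["iso_country"]): r for r in current}
    pairs = []
    for new in incoming:
        old = by_key.get((new["date_rep"], new["iso_country"]))
        if old is not None and old["cases"] != new["cases"]:
            pairs.append((old, new))
    return pairs


diffs = diff_records(current, incoming)
for old, new in diffs:
    print(old["iso_country"], old["date_rep"], old["cases"], "->", new["cases"])
```

Only the France row is reported, because the Afghanistan row has no incoming counterpart.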

In [11]:
from delta.tables import DeltaTable

deltaCovidTable = DeltaTable.forPath(spark,f"{storagePath}/delta")

StatementMeta(smallPool, 10, 11, Finished, Available)



# Create a Delta table from the former storage location
Before the merge, France had 0 cases on the 15th of July

In [12]:
deltaCovidTable.toDF().where("iso_country = 'FR' and date_rep = '2020-07-15'").show()

StatementMeta(smallPool, 10, 12, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|           load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|2020-07-15| 15|    7|2020|    0|     0|                   France|    FR|                   FRA|         null|       Europe|2021-06-22 00:05:...|         FR|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+

# Merge operation
We merge on the columns date_rep and iso_country, and for matching rows we overwrite cases and load_date with the values from the updated record

In [13]:



# Match on date and country; update only cases and load_date for matched rows
(
    deltaCovidTable.alias("deltaLake")
    .merge(
        dfCovidDeltaFrance.alias("dataUpdate"),
        condition="deltaLake.date_rep = dataUpdate.date_rep and deltaLake.iso_country = dataUpdate.iso_country",
    )
    .whenMatchedUpdate(set={"cases": "dataUpdate.cases", "load_date": "dataUpdate.load_date"})
    .execute()
)


StatementMeta(smallPool, 10, 13, Finished, Available)
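
To make the effect of a merge that has only a whenMatchedUpdate clause more concrete, here is a plain-Python sketch with no Spark or Delta involved. The function `merge_when_matched_update` and the sample rows are illustrative assumptions, and the load_date values are shortened to dates. Target rows whose key matches an incoming row get the listed columns overwritten. Incoming rows without a match are ignored, since no insert clause is specified.

```python
def merge_when_matched_update(target, updates, keys, set_columns):
    """Mimic, in memory, a merge that only has a whenMatchedUpdate clause."""
    lookup = {tuple(u[k] for k in keys): u for u in updates}
    merged = []
    for row in target:
        match = lookup.get(tuple(row[k] for k in keys))
        if match is None:
            merged.append(dict(row))  # unmatched target rows stay untouched
        else:
            # Overwrite only the columns named in set_columns.
            merged.append({**row, **{c: match[c] for c in set_columns}})
    return merged


target = [
    {"date_rep": "2020-07-15", "iso_country": "FR", "cases": 0, "load_date": "2021-06-22"},
    {"date_rep": "2020-07-15", "iso_country": "AL", "cases": 96, "load_date": "2021-06-22"},
]
updates = [
    {"date_rep": "2020-07-15", "iso_country": "FR", "cases": 12, "load_date": "2021-06-23"},
]

result = merge_when_matched_update(
    target, updates, keys=["date_rep", "iso_country"], set_columns=["cases", "load_date"]
)
print(result)
```

One difference worth noting: this sketch silently keeps the last incoming row when several share a key, whereas a Delta merge is expected to fail if multiple source rows match the same target row.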



After the merge, the Delta Lake record for France on the 15th of July has 12 cases and its load_date has been updated to the 23rd of June

In [14]:
deltaCovidTable.toDF().where("date_rep = '2020-07-15' and iso_country='FR'").show()

StatementMeta(smallPool, 10, 14, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|          load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+
|2020-07-15| 15|    7|2020|   12|     0|                   France|    FR|                   FRA|         null|       Europe|2021-06-23 00:00:00|         FR|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+-------------------+-----------+

Only the record for France on the 15th of July has been modified

In [15]:
deltaCovidTable.toDF().where("date_rep = '2020-07-15'").show()

StatementMeta(smallPool, 10, 15, Finished, Available)

+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|  date_rep|day|month|year|cases|deaths|countries_and_territories|geo_id|country_territory_code|pop_data_2018|continent_exp|           load_date|iso_country|
+----------+---+-----+----+-----+------+-------------------------+------+----------------------+-------------+-------------+--------------------+-----------+
|2020-07-15| 15|    7|2020|  285|    33|              Afghanistan|    AF|                   AFG|         null|         Asia|2021-06-22 00:05:...|         AF|
|2020-07-15| 15|    7|2020|   96|     2|                  Albania|    AL|                   ALB|         null|       Europe|2021-06-22 00:05:...|         AL|
|2020-07-15| 15|    7|2020|  527|    10|                  Algeria|    DZ|                   DZA|         null|       Africa|2021-06-22 00:05:...|         DZ|
|2020-07-15| 15|    7|2020|    3|     0|            