In [1]:
dbutils.fs.mount(
source = "wasbs://covidcontainer@covidstgaccount.blob.core.windows.net",
mount_point = "/mnt/covidmnt",
extra_configs = {"fs.azure.account.key.covidstgaccount.blob.core.windows.net":dbutils.secrets.get(scope = "dbscope", key = "dbkeyvault")})

In [2]:
df = spark.read.text("/mnt/covidmnt/07-25-2020.csv")

In [3]:
df.show()


In [4]:
%fs
head /mnt/covidmnt/07-25-2020.csv

In [5]:
data_with_inferSchema = spark.read.format("csv") \
                .option("header", "true") \
                .load("/mnt/covidmnt/07-25-2020.csv")


In [6]:
data_with_inferSchema.createOrReplaceTempView("covid_19")

In [7]:
%sql 
select * from covid_19 where Country_Region = "India"

FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key,Incidence_Rate,Case-Fatality_Ratio
,,Andaman and Nicobar Islands,India,2020-07-26 04:35:00,11.225999,92.968178,290,0,176,114,"Andaman and Nicobar Islands, India",69.53836119663531,0.0
,,Andhra Pradesh,India,2020-07-26 04:35:00,15.9129,79.74,88671,985,43255,44431,"Andhra Pradesh, India",164.49984883140846,1.1108479660768458
,,Arunachal Pradesh,India,2020-07-26 04:35:00,27.768456,96.384277,1126,3,428,695,"Arunachal Pradesh, India",71.6988292587258,0.2664298401420959
,,Assam,India,2020-07-26 04:35:00,26.357149,92.830441,31086,77,23055,7954,"Assam, India",87.30296276531166,0.2476999292285916
,,Bihar,India,2020-07-26 04:35:00,25.679658,85.60484,36604,234,24053,12317,"Bihar, India",29.330145596400428,0.639274396240848
,,Chandigarh,India,2020-07-26 04:35:00,30.733839,76.76827800000002,852,13,555,284,"Chandigarh, India",73.54508909573205,1.5258215962441315
,,Chhattisgarh,India,2020-07-26 04:35:00,21.264705,82.035366,7087,39,4683,2365,"Chhattisgarh, India",24.07577247236577,0.5503033723719486
,,Dadra and Nagar Haveli and Daman and Diu,India,2020-07-26 04:35:00,20.194742,73.080901,860,2,530,328,"Dadra and Nagar Haveli and Daman and Diu, India",139.6729703568482,0.2325581395348837
,,Delhi,India,2020-07-26 04:35:00,28.646519,77.10898,129531,3806,113068,12657,"Delhi, India",692.2748114710754,2.938292763894357
,,Goa,India,2020-07-26 04:35:00,15.359682,74.057396,4686,33,3047,1606,"Goa, India",295.4137115839244,0.704225352112676


In [8]:
from pyspark.sql.types import *

explicit_schema = StructType((StructField("FIPS",StringType(),True),
StructField("Admin2",StringType(),True),
StructField("Province_State",StringType(),True),
StructField("Country_Region",StringType(),True),
StructField("Last_Update", TimestampType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long_",DoubleType(),True),
StructField("Confirmed",IntegerType(),True),
StructField("Deaths",IntegerType(),True),
StructField("Recovered",IntegerType(),True),
StructField("Active",IntegerType(),True),
StructField("Combined_Key",StringType(),True)))

In [9]:
data_with_explicitSchema = spark.read.format("csv") \
                .option("header", "true") \
                .schema(explicit_schema) \
                .load("/mnt/covidmnt/07-25-2020.csv")

data_with_explicitSchema.createOrReplaceTempView("covid_19_25_July")

In [10]:
%sql 
select * from covid_19_25_July where Country_Region = "India"

FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key
,,Andaman and Nicobar Islands,India,2020-07-26T04:35:00.000+0000,11.225999,92.968178,290,0,176,114,"Andaman and Nicobar Islands, India"
,,Andhra Pradesh,India,2020-07-26T04:35:00.000+0000,15.9129,79.74,88671,985,43255,44431,"Andhra Pradesh, India"
,,Arunachal Pradesh,India,2020-07-26T04:35:00.000+0000,27.768456,96.384277,1126,3,428,695,"Arunachal Pradesh, India"
,,Assam,India,2020-07-26T04:35:00.000+0000,26.357149,92.830441,31086,77,23055,7954,"Assam, India"
,,Bihar,India,2020-07-26T04:35:00.000+0000,25.679658,85.60484,36604,234,24053,12317,"Bihar, India"
,,Chandigarh,India,2020-07-26T04:35:00.000+0000,30.733839,76.76827800000002,852,13,555,284,"Chandigarh, India"
,,Chhattisgarh,India,2020-07-26T04:35:00.000+0000,21.264705,82.035366,7087,39,4683,2365,"Chhattisgarh, India"
,,Dadra and Nagar Haveli and Daman and Diu,India,2020-07-26T04:35:00.000+0000,20.194742,73.080901,860,2,530,328,"Dadra and Nagar Haveli and Daman and Diu, India"
,,Delhi,India,2020-07-26T04:35:00.000+0000,28.646519,77.10898,129531,3806,113068,12657,"Delhi, India"
,,Goa,India,2020-07-26T04:35:00.000+0000,15.359682,74.057396,4686,33,3047,1606,"Goa, India"


Let's create a bronze zone, usually this will be the raw zone

In [12]:
data_with_explicitSchema.write.format("delta") \
        .mode("overwrite") \
        .save("/temporary/covid_19_25_July-delta-bronze")

In [13]:
display(dbutils.fs.ls("dbfs:/temporary"))

path,name,size
dbfs:/temporary/covid_19_25_July-delta-bronze/,covid_19_25_July-delta-bronze/,0
dbfs:/temporary/covid_19_25_July-delta-gold/,covid_19_25_July-delta-gold/,0
dbfs:/temporary/covid_19_25_July-delta-silver/,covid_19_25_July-delta-silver/,0
dbfs:/temporary/covid_19_25_July-delta-silver-parquet/,covid_19_25_July-delta-silver-parquet/,0


In [14]:
%sql
SELECT * FROM delta.`/temporary/covid_19_25_July-delta-bronze` where Country_Region='India'

FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key
,,Andaman and Nicobar Islands,India,2020-07-26T04:35:00.000+0000,11.225999,92.968178,290,0,176,114,"Andaman and Nicobar Islands, India"
,,Andhra Pradesh,India,2020-07-26T04:35:00.000+0000,15.9129,79.74,88671,985,43255,44431,"Andhra Pradesh, India"
,,Arunachal Pradesh,India,2020-07-26T04:35:00.000+0000,27.768456,96.384277,1126,3,428,695,"Arunachal Pradesh, India"
,,Assam,India,2020-07-26T04:35:00.000+0000,26.357149,92.830441,31086,77,23055,7954,"Assam, India"
,,Bihar,India,2020-07-26T04:35:00.000+0000,25.679658,85.60484,36604,234,24053,12317,"Bihar, India"
,,Chandigarh,India,2020-07-26T04:35:00.000+0000,30.733839,76.76827800000002,852,13,555,284,"Chandigarh, India"
,,Chhattisgarh,India,2020-07-26T04:35:00.000+0000,21.264705,82.035366,7087,39,4683,2365,"Chhattisgarh, India"
,,Dadra and Nagar Haveli and Daman and Diu,India,2020-07-26T04:35:00.000+0000,20.194742,73.080901,860,2,530,328,"Dadra and Nagar Haveli and Daman and Diu, India"
,,Delhi,India,2020-07-26T04:35:00.000+0000,28.646519,77.10898,129531,3806,113068,12657,"Delhi, India"
,,Goa,India,2020-07-26T04:35:00.000+0000,15.359682,74.057396,4686,33,3047,1606,"Goa, India"


Let's create the database and a bronze table

In [16]:
%sql
DROP DATABASE IF EXISTS db_delta;
CREATE DATABASE db_delta;
DROP TABLE IF EXISTS db_delta.covid_19_25_July_delta_bronze;
CREATE TABLE db_delta.covid_19_25_July_delta_bronze USING DELTA LOCATION "/temporary/covid_19_25_July-delta-bronze"

Let's load and look into the country lookup table provided by databricks community

In [18]:
display(dbutils.fs.ls("databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/UID_ISO_FIPS_LookUp_Table.csv"))

path,name,size
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/UID_ISO_FIPS_LookUp_Table.csv,UID_ISO_FIPS_LookUp_Table.csv,403028


In [19]:
lookup_data = spark.read.format("csv") \
                         .option("header", "true")  \
                         .option("inferSchema", "true")  \
                         .load("/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/UID_ISO_FIPS_LookUp_Table.csv")


lookup_data.createOrReplaceTempView("lookup_data")

In [20]:
%sql
select * from lookup_data

UID,iso2,iso3,code3,FIPS,Admin2,Province_State,Country_Region,Lat,Long_,Combined_Key,Population
4,AF,AFG,4.0,,,,Afghanistan,33.93911,67.709953,Afghanistan,38928341.0
8,AL,ALB,8.0,,,,Albania,41.1533,20.1683,Albania,2877800.0
12,DZ,DZA,12.0,,,,Algeria,28.0339,1.6596,Algeria,43851043.0
20,AD,AND,20.0,,,,Andorra,42.5063,1.5218,Andorra,77265.0
24,AO,AGO,24.0,,,,Angola,-11.2027,17.8739,Angola,32866268.0
28,AG,ATG,28.0,,,,Antigua and Barbuda,17.0608,-61.7964,Antigua and Barbuda,97928.0
32,AR,ARG,32.0,,,,Argentina,-38.4161,-63.6167,Argentina,45195777.0
51,AM,ARM,51.0,,,,Armenia,40.0691,45.0382,Armenia,2963234.0
40,AT,AUT,40.0,,,,Austria,47.5162,14.5501,Austria,9006400.0
31,AZ,AZE,31.0,,,,Azerbaijan,40.1431,47.5769,Azerbaijan,10139175.0


To create the silver table, let's join the look up table and bronze table. Usually, this is the zone where refinement of data will take place

In [22]:
silver_df = spark.sql("SELECT lookupdata.iso3, lookupdata.population, lookupdata.Country_Region, bronzedata.Last_Update, bronzedata.Confirmed, bronzedata.Recovered, bronzedata.Active, bronzedata.Deaths, bronzedata.Combined_Key from lookup_data lookupdata \
                         JOIN db_delta.covid_19_25_July_delta_bronze bronzedata \
                         WHERE lookupdata.Country_Region = bronzedata.Country_Region ")

In [23]:
silver_df.show()


In [24]:
silver_df.write.partitionBy("iso3").format("delta").mode("overwrite").save("/temporary/covid_19_25_July-delta-silver")


In [25]:
%sql
DROP TABLE IF EXISTS db_delta.covid_19_25_July_delta_silver;
CREATE TABLE db_delta.covid_19_25_July_delta_silver USING DELTA LOCATION "/temporary/covid_19_25_July-delta-silver"


In [26]:
%sql
SELECT * FROM db_delta.covid_19_25_July_delta_silver where country_region='India'

iso3,population,Country_Region,Last_Update,Confirmed,Recovered,Active,Deaths,Combined_Key
IND,1380004385,India,2020-07-19T03:35:04.000+0000,163,0,163,0,"Unknown, India"
IND,1380004385,India,2020-07-26T03:35:08.000+0000,52466,40334,11677,455,"Telangana, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,56377,35654,19391,1332,"West Bengal, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,5961,3495,2403,63,"Uttarakhand, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,63742,39903,22452,1387,"Uttar Pradesh, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,3862,2209,1642,11,"Tripura, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,206737,151055,52273,3409,"Tamil Nadu, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,499,142,357,0,"Sikkim, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,35298,25306,9379,613,"Rajasthan, India"
IND,1380004385,India,2020-07-26T04:35:00.000+0000,12684,8297,4096,291,"Punjab, India"


Flexibiltiy - Delete and Update are possible in delta table unlike parquet

In [28]:
silver_df.write.partitionBy("iso3").format("parquet").mode("overwrite").save("/temporary/covid_19_25_July-delta-silver-parquet")

In [29]:
%sql
DELETE FROM  parquet.`/temporary/covid_19_25_July-delta-silver-parquet` WHERE Last_Update = '2020-03-23T23:19:21.000+0000' and iso3='IND'

In [30]:
%sql
DELETE FROM  delta.`/temporary/covid_19_25_July-delta-silver` WHERE Last_Update = '2020-03-23T23:19:21.000+0000' and iso3='IND'

Merge operation is also possible and easy to achieve using delta lake. Follows the same syntax as normal merge statements.

//.....syntax of MERGE statement....//

//you can use any other name in place of target
MERGE target_table_name AS TARGET  

//you can use any other name in place of source 
USING source_table_name AS SOURCE   
ON condition (for matching source and target table)
WHEN MATCHED (another condition for updation)

 //now use update statement syntax accordingly
THEN UPDATE                       
WHEN NOT MATCHED BY TARGET 

//now use insert statement syntax accordingly
THEN INSERT                        
WHEN NOT MATCHED BY SOURCE 
THEN DELETE;

In [32]:
from pyspark.sql.functions import col
schema_modified_df = silver_df
schema_modified_df = schema_modified_df.withColumn("Test_Column",col("Confirmed")*-1)
display(schema_modified_df)

iso3,population,Country_Region,Last_Update,Confirmed,Recovered,Active,Deaths,Combined_Key,Test_Column
AFG,38928341.0,Afghanistan,2020-07-26T04:35:00.000+0000,36036,24793,9995.0,1248,Afghanistan,-36036
ALB,2877800.0,Albania,2020-07-26T04:35:00.000+0000,4637,2637,1866.0,134,Albania,-4637
DZA,43851043.0,Algeria,2020-07-26T04:35:00.000+0000,26764,18076,7542.0,1146,Algeria,-26764
AND,77265.0,Andorra,2020-07-26T04:35:00.000+0000,897,803,42.0,52,Andorra,-897
AGO,32866268.0,Angola,2020-07-26T04:35:00.000+0000,916,242,635.0,39,Angola,-916
ATG,97928.0,Antigua and Barbuda,2020-07-26T04:35:00.000+0000,82,60,19.0,3,Antigua and Barbuda,-82
ARG,45195777.0,Argentina,2020-07-26T04:35:00.000+0000,158334,68022,87419.0,2893,Argentina,-158334
ARM,2963234.0,Armenia,2020-07-26T04:35:00.000+0000,36996,26243,10053.0,700,Armenia,-36996
AUT,9006400.0,Austria,2020-07-26T04:35:00.000+0000,20338,18124,1502.0,712,Austria,-20338
AZE,10139175.0,Azerbaijan,2020-07-26T04:35:00.000+0000,29633,22082,7143.0,408,Azerbaijan,-29633


In [33]:
schema_modified_df.write.format("delta").mode("append").save("/temporary/covid_19_25_July-delta-silver")

We tried to change the schema above and it failed while writing; but With mergeSchema, we can easily evolve the existing schema
schema_modified_df.write.option("mergeSchema","true").format("delta").mode("append").save("/temporary/covid_19_25_July-delta-silver")

In [35]:
silver_df.write.partitionBy("iso3").format("delta").mode("overwrite").save("/temporary/covid_19_25_July-delta-silver")


In [36]:
%sql
DESCRIBE HISTORY db_delta.covid_19_25_July_delta_silver

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
7,2020-08-01T05:38:09.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,6.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
6,2020-08-01T05:30:05.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,5.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
5,2020-07-31T15:14:17.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3259824485436026),0727-080718-flue384,4.0,WriteSerializable,True,"Map(numFiles -> 219, numOutputBytes -> 47431931, numOutputRows -> 11069105, numParts -> 219)"
4,2020-07-29T05:22:37.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,3.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
3,2020-07-29T05:03:00.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,2.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
2,2020-07-28T13:58:29.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,1.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
1,2020-07-28T13:52:44.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,0.0,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"
0,2020-07-28T13:34:44.000+0000,7217409156299318,arujit.das@ecolab.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""iso3""])",,List(3259824485436026),0727-080718-flue384,,WriteSerializable,False,"Map(numFiles -> 219, numOutputBytes -> 37587453, numOutputRows -> 11069105, numParts -> 219)"


In [37]:
%sql
SELECT * FROM db_delta.covid_19_25_July_delta_silver VERSION AS OF 0

iso3,population,Country_Region,Last_Update,Confirmed,Recovered,Active,Deaths,Combined_Key
PRI,2933408,US,2020-07-06T19:33:59.000+0000,0,0,0.0,0,"Wheeler, Oregon, US"
PRI,2933408,US,2020-07-10T02:34:22.000+0000,1,0,1.0,0,"Lewis, Idaho, US"
PRI,2933408,US,2020-07-17T18:35:12.000+0000,1,0,1.0,0,"Dillingham, Alaska, US"
PRI,2933408,US,2020-07-24T01:34:55.000+0000,0,0,0.0,0,"Out of OK, Oklahoma, US"
PRI,2933408,US,2020-07-24T16:35:03.000+0000,2668,0,2647.0,21,"DeSoto, Mississippi, US"
PRI,2933408,US,2020-07-26T04:35:00.000+0000,352,0,345.0,7,"Virgin Islands, US"
PRI,2933408,US,2020-07-26T04:35:00.000+0000,0,1279414,-1247442.0,0,"Recovered, US"
PRI,2933408,US,2020-07-26T04:35:00.000+0000,40,0,38.0,2,"Northern Mariana Islands, US"
PRI,2933408,US,2020-07-26T04:35:00.000+0000,337,0,332.0,5,"Guam, US"
PRI,2933408,US,2020-07-26T04:35:00.000+0000,103,0,100.0,3,"Grand Princess, US"


Delta lake also supports concurrent stream and batch writes to same table

spark.readStream.format("delta").load("/temporary/covid_19_25_July-delta-silver").limit(1000).writeStream.format("delta").outputMode("append").start("/temporary/covid_19_25_July-delta-silver")

Delta lake itself can be a source or sink 

spark.read.format("delta").load("/temporary/covid_19_25_July-delta-silver").limit(100).write.format("delta").mode("append").save("/temporary/covid_19_25_July-delta-silver")

Let's create the gold zone now with some aggregations

In [40]:
gold_df = spark.sql("SELECT Country_Region, Combined_Key, sum(Deaths) as Total_Deaths, sum(Confirmed) as Total_Confirmed FROM db_delta.covid_19_25_July_delta_silver group by Country_Region, Combined_Key")

display(gold_df)


Country_Region,Combined_Key,Total_Deaths,Total_Confirmed
US,"Surry, North Carolina, US",13616,2546192
US,"Schuylkill, Pennsylvania, US",166796,2869572
US,"San Benito, California, US",6808,1787100
US,"Roanoke City, Virginia, US",34040,2375992
US,"Richland, Wisconsin, US",13616,71484
US,"Rice, Kansas, US",0,71484
US,"Ransom, North Dakota, US",0,85100
US,"O'Brien, Iowa, US",3404,374440
US,"Nelson, Virginia, US",0,108928
US,"Menifee, Kentucky, US",0,74888


In [41]:
gold_df.write.partitionBy("Country_Region").format("delta").mode("overwrite").save("/temporary/covid_19_25_July-delta-gold")

In [42]:
%sql
DROP TABLE IF EXISTS db_delta.covid_19_25_July_delta_gold;
CREATE TABLE db_delta.covid_19_25_July_delta_gold USING DELTA LOCATION "/temporary/covid_19_25_July-delta-gold"

In [43]:
%sql
select * from db_delta.covid_19_25_July_delta_gold

Country_Region,Combined_Key,Total_Deaths,Total_Confirmed
US,"DeSoto, Mississippi, US",71484,9081872
US,"Washita, Oklahoma, US",0,57868
US,"Washington, North Carolina, US",10212,333592
US,"Troup, Georgia, US",177008,7042876
US,"San Joaquin, California, US",374440,34468904
US,"Pinal, Arizona, US",415288,25114712
US,"Otsego, New York, US",17020,350612
US,"New London, Connecticut, US",347208,4636248
US,"Montgomery, Alabama, US",459540,19076016
US,"Mineral, West Virginia, US",10212,309764


In [44]:
%sql
select * from db_delta.covid_19_25_July_delta_gold where Country_Region = 'India'

Country_Region,Combined_Key,Total_Deaths,Total_Confirmed
India,"Dadra and Nagar Haveli and Daman and Diu, India",74,31820
India,"Andaman and Nicobar Islands, India",0,10730
India,"Arunachal Pradesh, India",111,41662
India,"Himachal Pradesh, India",407,75813
India,"Madhya Pradesh, India",29563,996262
India,"Uttar Pradesh, India",51319,2358454
India,"Chhattisgarh, India",1443,262219
India,"Maharashtra, India",495393,13555616
India,"Uttarakhand, India",2331,220557
India,"Puducherry, India",1406,98198


In [45]:
%sql
MERGE INTO db_delta.covid_19_25_July_delta_silver target
USING db_delta.covid_19_25_July_delta_silver  TIMESTAMP AS OF "2020-07-29T06:05:48.000+0000"
ON source.PrimaryKey = target.PrimaryKey  
WHEN MATCHED THEN UPDATE SET *

In [46]:
%sql
INSERT INTO db_delta.covid_19_25_July_delta_silver
SELECT * FROM db_delta.covid_19_25_July_delta_silver VERSION AS OF 0
