#Notebook shortcuts

### shift+enter = Run cell and move to the next one

### ctrl+alt+P = insert cell above
### ctrl+alt+N = insert cell below

### ctrl+alt+up = move cell up
### ctrl+alt+down = move cell down


for more shortcuts go to https://docs.microsoft.com/en-us/azure/databricks/notebooks/notebooks-use

# Connect to our Data Lake

###Mount Points should have already been created, so that the data persists even if the cluster doesnt. This way we can easily get to the data everytime the cluster restarts.

In [4]:
#dbutils.fs.unmount("/mnt/coviddata")

In [5]:
#dbutils.fs.ls("/mnt/")

In [7]:
#dbutils.fs.mkdirs("/mnt/coviddata/inputs")

In [8]:
#dbutils.fs.mkdirs("/mnt/coviddata/outputs")

In [9]:
#dbutils.fs.mkdirs("/mnt/coviddata/outputs")

The below cells only work the first time, when the cluster is setup.

In [11]:
#dbutils.fs.mount(
#source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net",
#mount_point = "/mnt/<mount-name>",
#extra_configs = {"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net":"<access-key>"})



#dbutils.fs.mount(
#source = "wasbs://outputs@stcovidhackoutput.blob.core.windows.net",
#mount_point = "/mnt/coviddata/outputs",
#extra_configs = {"fs.azure.account.key.stcovidhackoutput.blob.core.windows.net":"exnopN56JLbbkuZxx5VLX6sJqH7pop7fWaXEgYgMkt5OY2EtqqppFako7t3wOca7oYUbThKVwmMX4wpv4bwafA=="})



In [12]:
#dbutils.fs.mount(
#source = "wasbs://inputs@stcovidhackoutput.blob.core.windows.net",
#mount_point = "/mnt/coviddata/inputs",
#extra_configs = {"fs.azure.account.key.stcovidhackoutput.blob.core.windows.net":"exnopN56JLbbkuZxx5VLX6sJqH7pop7fWaXEgYgMkt5OY2EtqqppFako7t3wOca7oYUbThKVwmMX4wpv4bwafA=="})

# Start here if the Lake has been previously mounted to this cluster

Check file structure is setup

In [15]:
display(dbutils.fs.ls("/mnt/coviddata/"))
        

path,name,size
dbfs:/mnt/coviddata/inputs/,inputs/,0
dbfs:/mnt/coviddata/outputs/,outputs/,0


Check all the required files are visible

In [17]:
display(dbutils.fs.ls("/mnt/coviddata/inputs/"))

path,name,size
dbfs:/mnt/coviddata/inputs/04-01-2020.csv,04-01-2020.csv,253568
dbfs:/mnt/coviddata/inputs/04-12-2020.csv,04-12-2020.csv,305660
dbfs:/mnt/coviddata/inputs/DoctorCountLatest.csv,DoctorCountLatest.csv,276136
dbfs:/mnt/coviddata/inputs/UID_ISO_FIPS_LookUp_Table.csv,UID_ISO_FIPS_LookUp_Table.csv,352008
dbfs:/mnt/coviddata/inputs/latestcovidcount.csv,latestcovidcount.csv,311180


#Wrangle the Doctor Data

In [19]:
filepath2="/mnt/coviddata/inputs/DoctorCountLatest.csv"


###Infer the schema and load the data to a spark data frame. 
### Cache the data for faster operations

In [21]:
doctorraw = spark.read.format('csv').options(header='false', inferSchema='true').load(filepath2)
doctorraw.cache()

### Check if schema was inferred correctly

In [23]:
doctorraw.printSchema()

### Display the data in a nice readable format

In [25]:
display(doctorraw)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19
22092804,HWF_0001,COUNTRY,AGO,YEAR,1997,,,,,,,,,0.592,0.592,,,,2020-02-12 09:20:25.0000000 +00:00
22092805,HWF_0001,COUNTRY,AGO,YEAR,2004,,,,,,,,,0.621,0.621,,,,2020-02-12 09:20:25.0000000 +00:00
22092806,HWF_0001,COUNTRY,AGO,YEAR,2009,,,,,,,,,1.313,1.313,,,,2020-02-12 09:20:25.0000000 +00:00
22092807,HWF_0001,COUNTRY,AGO,YEAR,2017,,,,,,,,,2.146,2.146,,,,2020-02-12 09:20:25.0000000 +00:00
22092808,HWF_0001,COUNTRY,BDI,YEAR,2004,,,,,,,,,0.28,0.28,,,,2020-02-12 09:20:25.0000000 +00:00
22092809,HWF_0001,COUNTRY,BDI,YEAR,2010,,,,,,,,,0.482,0.482,,,,2020-02-12 09:20:25.0000000 +00:00
22092810,HWF_0001,COUNTRY,BDI,YEAR,2011,,,,,,,,,0.399,0.399,,,,2020-02-12 09:20:25.0000000 +00:00
22092811,HWF_0001,COUNTRY,BDI,YEAR,2012,,,,,,,,,0.539,0.539,,,,2020-02-12 09:20:25.0000000 +00:00
22092812,HWF_0001,COUNTRY,BDI,YEAR,2013,,,,,,,,,0.581,0.581,,,,2020-02-12 09:20:25.0000000 +00:00
22092813,HWF_0001,COUNTRY,BDI,YEAR,2014,,,,,,,,,0.619,0.619,,,,2020-02-12 09:20:25.0000000 +00:00


### focus on the columns we want to work with

In [27]:
display(doctorraw.select("_c3","_c5","_c15"))

_c3,_c5,_c15
AGO,1997,0.592
AGO,2004,0.621
AGO,2009,1.313
AGO,2017,2.146
BDI,2004,0.28
BDI,2010,0.482
BDI,2011,0.399
BDI,2012,0.539
BDI,2013,0.581
BDI,2014,0.619


In [28]:
doctorraw.select("_c3","_c5","_c15").show()

### filter the data so that there is only the doctor per 10k count for the most recent year for each country in the list. We only need the latest year.

In [30]:
doctorlatest=doctorraw.groupBy("_c3").max("_c5","_c15")

### just confirm that Australia exists in the data set we're pulling

In [32]:
doctorlatest.filter("_c3= 'AUS'").show()

In [33]:
display(doctorlatest)

_c3,max(_c5),max(_c15)
NIU,2008,23.529
HTI,2018,2.384
BRB,2017,24.843
LVA,2017,46.934
POL,2017,24.188
ZMB,2018,11.867
JAM,2017,13.061
BRA,2018,21.652
ARM,2017,44.023
MOZ,2018,0.838


### rename the columns

In [35]:
doctorlatest=doctorlatest.withColumnRenamed("_c3",'COUNTRY').withColumnRenamed("max(_c5)",'YEAR').withColumnRenamed("max(_c15)",'DoctorsPer10k')

In [36]:
doctorlatest.printSchema()

In [37]:
doctorlatest.printSchema()

### save it to csv on our data lake

In [39]:
doctorlatest.write.mode('overwrite').option("header","true").csv('/mnt/coviddata/outputs/DoctorCountLatestYear')

###Load the country code data

In [41]:
filepath3="/mnt/coviddata/inputs/UID_ISO_FIPS_LookUp_Table.csv"

In [42]:
countrycodes = spark.read.format('csv').options(header='true', inferSchema='true').load(filepath3)

In [43]:
countrycodes.printSchema()

In [44]:
countrycodes.show()

### just want country region and iso3

In [46]:
countrycodeiso3=countrycodes.select("iso3","Country_Region").distinct()

In [47]:
countrycodeiso3.filter("iso3='AUS'").show()

In [48]:
countrycodeiso3.write.mode('overwrite').csv('/mnt/coviddata/outputs/CountryCodesISO3')

#Wrangle the COVID Data

### load the covid data and summarize by country
### join the summarized data with the count of doctors and country codes

In [50]:
# Creating widgets for leveraging parameters, and printing the parameters

dbutils.widgets.text("input", "","")
y = dbutils.widgets.get("input")
print ("Param -\'input':")
print (y)

dbutils.widgets.text("fileDate", "","")
z = dbutils.widgets.get("fileDate")
print ("Param -\'fileDate':")
print (z)

dbutils.widgets.text("name", "","")
u = dbutils.widgets.get("name")
print ("Param -\'name':")
print (u)

dbutils.widgets.text("name2", "","")
v = dbutils.widgets.get("name2")
print ("Param -\'name2':")
print (v)

In [51]:
filepath = "/mnt/coviddata/inputs/latestcovidcount.csv"

### infer the schema and load the data into a spark dataframe

In [53]:
covidraw = spark.read.format('csv').options(header='true', inferSchema='true').load(filepath)


In [54]:
display(covidraw)

FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key
45001.0,Abbeville,South Carolina,US,2020-04-14T23:33:31.000+0000,34.22333378,-82.46170658,9,0,0,9,"Abbeville, South Carolina, US"
22001.0,Acadia,Louisiana,US,2020-04-14T23:33:31.000+0000,30.2950649,-92.41419698,104,5,0,99,"Acadia, Louisiana, US"
51001.0,Accomack,Virginia,US,2020-04-14T23:33:31.000+0000,37.76707161,-75.63234615,15,0,0,15,"Accomack, Virginia, US"
16001.0,Ada,Idaho,US,2020-04-14T23:33:31.000+0000,43.4526575,-116.24155159999998,538,9,0,529,"Ada, Idaho, US"
19001.0,Adair,Iowa,US,2020-04-14T23:33:31.000+0000,41.33075609,-94.47105874,1,0,0,1,"Adair, Iowa, US"
21001.0,Adair,Kentucky,US,2020-04-14T23:33:31.000+0000,37.10459774,-85.28129668,43,1,0,42,"Adair, Kentucky, US"
29001.0,Adair,Missouri,US,2020-04-14T23:33:31.000+0000,40.19058551,-92.60078167,12,0,0,12,"Adair, Missouri, US"
40001.0,Adair,Oklahoma,US,2020-04-14T23:33:31.000+0000,35.88494195,-94.65859267,28,2,0,26,"Adair, Oklahoma, US"
8001.0,Adams,Colorado,US,2020-04-14T23:33:31.000+0000,39.87432092,-104.3362578,726,27,0,699,"Adams, Colorado, US"
16003.0,Adams,Idaho,US,2020-04-14T23:33:31.000+0000,44.89333571,-116.4545247,1,0,0,1,"Adams, Idaho, US"


In [55]:
covidlatest=covidraw.select("Country_Region","Confirmed","Deaths","Recovered").groupby("Country_Region").sum("Confirmed","Deaths","Recovered")

In [56]:
display(covidlatest)

Country_Region,sum(Confirmed),sum(Deaths),sum(Recovered)
Chad,23,0,2
Paraguay,159,7,22
Russia,21102,170,1694
Yemen,1,0,0
Senegal,299,2,183
Cabo Verde,11,1,1
Sweden,11445,1033,381
Guyana,47,6,8
Burma,63,4,2
Eritrea,34,0,0


In [57]:
#covidlatest.write.mode('overwrite').csv('/mnt/coviddata/outputs/CovidLatest')

In [58]:
doctorlatest.show()

In [59]:
countrycodeiso3.show()

In [60]:
from pyspark.sql.functions import col
doctoriso3=doctorlatest.join(countrycodeiso3,col("COUNTRY")==col("iso3"))

In [61]:
doctoriso3.select("COUNTRY","YEAR","DoctorsPer10k","Country_Region").filter("COUNTRY = 'AUS'").show()

In [62]:
coviddoctors = covidlatest.join(doctoriso3, doctoriso3.Country_Region == covidlatest.Country_Region)

In [63]:
coviddoctors.show()

In [64]:
coviddoctorselect = coviddoctors.select(covidlatest.Country_Region,"sum(Confirmed)","sum(Deaths)","sum(Recovered)","COUNTRY","YEAR","DoctorsPer10k")

In [65]:
coviddoctorfinal=coviddoctorselect\
.withColumnRenamed('sum(Confirmed)','Confirmed')\
.withColumnRenamed('sum(Deaths)','Deaths')\
.withColumnRenamed('sum(Recovered)','Recovered')\
.withColumnRenamed('COUNTRY','Iso3')\
.withColumnRenamed('YEAR','YearOfDoctorCount')

In [66]:
coviddoctorfinal.printSchema()

In [67]:
coviddoctorfinal.write.mode("overwrite").option("header", "true").csv("/mnt/coviddata/outputs/CovidDoctorCombined")

In [68]:
#dbutils.fs.mkdirs("/mnt/coviddata/outputs/final")

### for the data factory to correctly copy these shards to Synapse, we need to remove any unneeded files from the output directory. Since trying to find files that begin with "_" seem to throw java, we will look for all the .csv files and move them to a clean directory and then point the factory to that as the source

In [70]:
%scala

val fileprefix= "/mnt/coviddata/outputs/final/"
val partition_path = dbutils.fs.ls("/mnt/coviddata/outputs/CovidDoctorCombined")
     .filter(file=>file.name.endsWith("csv"))//(0).path

partition_path.foreach { file => dbutils.fs.cp(file.path,fileprefix+file.name)}

//partition_path.show()

//partition_path.toDF().foreach { file => dbutils.fs.cp(file(0).toString,)}//.toString, true)}


//dbutils.fs.cp(partition_path,fileprefix+".tab")

//dbutils.fs.rm(fileprefix+".tmp",recurse=true)

In [71]:
display(dbutils.fs.ls('/mnt/coviddata/outputs/final/'))

path,name,size
dbfs:/mnt/coviddata/outputs/final/part-00000-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2522-1-c000.csv,part-00000-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2522-1-c000.csv,119
dbfs:/mnt/coviddata/outputs/final/part-00002-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2524-1-c000.csv,part-00002-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2524-1-c000.csv,108
dbfs:/mnt/coviddata/outputs/final/part-00003-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2525-1-c000.csv,part-00003-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2525-1-c000.csv,107
dbfs:/mnt/coviddata/outputs/final/part-00004-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2526-1-c000.csv,part-00004-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2526-1-c000.csv,118
dbfs:/mnt/coviddata/outputs/final/part-00009-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2531-1-c000.csv,part-00009-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2531-1-c000.csv,179
dbfs:/mnt/coviddata/outputs/final/part-00010-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2532-1-c000.csv,part-00010-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2532-1-c000.csv,116
dbfs:/mnt/coviddata/outputs/final/part-00011-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2533-1-c000.csv,part-00011-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2533-1-c000.csv,164
dbfs:/mnt/coviddata/outputs/final/part-00012-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2534-1-c000.csv,part-00012-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2534-1-c000.csv,109
dbfs:/mnt/coviddata/outputs/final/part-00016-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2538-1-c000.csv,part-00016-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2538-1-c000.csv,117
dbfs:/mnt/coviddata/outputs/final/part-00017-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2539-1-c000.csv,part-00017-tid-3994025247378675150-6fe86fb5-a136-4da8-a6a3-58febb661978-2539-1-c000.csv,106
