Documentation: https://spark.apache.org/docs/latest/api/python/index.html

### Explore the COVID data

In [0]:
# List the files that ship with the pre-loaded COVID-19 dataset
covid_dataset_files = dbutils.fs.ls('dbfs:/databricks-datasets/COVID/covid-19-data/')
display(covid_dataset_files)

In [0]:
# Read the dataset's README as text and show it
covid_readme = spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/README.md')
covid_readme.display()

Open `us-states.csv` and explore the schema

In [0]:
# Load the state-level CSV: the first row is treated as the header
# and column types are inferred from the data
states = (spark.read
          .options(header="true", InferSchema="true")
          .csv('dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv'))
states.display()

In [0]:
# Print the column names and the types inferred for the `states` DataFrame
states.printSchema()

Explore the `us-counties.csv` and answer the following questions:
1. What's the time span of the data (first and last date)?
2. Aggregate the table by state:
  - Which state has the most confirmed cases and confirmed deaths?
  - Make a plot.

In [0]:
# Load the live county-level CSV: the first row is treated as the header
# and column types are inferred from the data
counties = (spark.read
            .options(header="true", InferSchema="true")
            .csv('dbfs:/databricks-datasets/COVID/covid-19-data/live/us-counties.csv'))
counties.display()

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths
2021-03-12,Autauga,Alabama,1001.0,6409,95,5523.0,85.0,886.0,10.0
2021-03-12,Baldwin,Alabama,1003.0,20072,294,14228.0,220.0,5844.0,74.0
2021-03-12,Barbour,Alabama,1005.0,2175,52,1217.0,35.0,958.0,17.0
2021-03-12,Bibb,Alabama,1007.0,2475,58,2009.0,34.0,466.0,24.0
2021-03-12,Blount,Alabama,1009.0,6282,129,4835.0,109.0,1447.0,20.0
2021-03-12,Bullock,Alabama,1011.0,1183,39,1057.0,29.0,126.0,10.0
2021-03-12,Butler,Alabama,1013.0,2037,66,1858.0,60.0,179.0,6.0
2021-03-12,Calhoun,Alabama,1015.0,14034,299,10551.0,240.0,3483.0,59.0
2021-03-12,Chambers,Alabama,1017.0,3439,112,1711.0,72.0,1728.0,40.0
2021-03-12,Cherokee,Alabama,1019.0,1787,42,1152.0,32.0,635.0,10.0


In [0]:
# Print the column names and the types inferred for the `counties` DataFrame
counties.printSchema()

In [0]:
# Convert `date` from string to date type
# Import style 1: import the whole module pyspark.sql.functions as F
# and then call individual functions through it (F.function)
import pyspark.sql.functions as F

# Note: this rebinds `counties` to the converted DataFrame
counties = counties.withColumn('date', F.to_date(F.col('date'), 'yyyy-MM-dd'))
counties.display()

In [0]:
# Convert `date` from string to date type
# Import style 2: import individual functions from pyspark.sql.functions as you go
from pyspark.sql.functions import to_date

# Note: this rebinds `counties` to the converted DataFrame
counties = counties.withColumn('date', to_date('date', 'yyyy-MM-dd'))
counties.display()

In [0]:
# First date in the data.
# The aggregate is called through the module alias (F.min) instead of
# `from pyspark.sql.functions import min`: importing it by name would replace
# Python's built-in min() for every later cell in the notebook.
import pyspark.sql.functions as F

min_date = counties.select(F.min("date"))
min_date.display()

In [0]:
# Last date in the data.
# The aggregate is called through the module alias (F.max) instead of
# `from pyspark.sql.functions import max`: importing it by name would replace
# Python's built-in max() for every later cell in the notebook.
import pyspark.sql.functions as F

max_date = counties.select(F.max("date"))
max_date.display()

In [0]:
# Aggregate confirmed cases and confirmed deaths per state.
# F.sum is used instead of `from pyspark.sql.functions import sum` so that
# Python's built-in sum() is not replaced for the rest of the notebook.
import pyspark.sql.functions as F

df = (counties
      .groupby('state')
      .agg(F.sum('confirmed_cases').alias('confirmed_cases_total'),
           F.sum('confirmed_deaths').alias('confirmed_deaths_total'))
      )
df.show()

In [0]:
# Which state has the max confirmed cases?
# Sort by the total in descending order and take the top row.
# Insert .select('state') before .first() to return only the state name.
df.sort(df['confirmed_cases_total'].desc()).first()

In [0]:
# Which country has the max confirmed deaths ?
df.orderBy('confirmed_deaths_total', ascending=False).select('state').first()

In [0]:
# Do we have the data for all the states?
from pyspark.sql.functions import col

# Distinct, non-null state names present in the aggregated table
states_with_data = df.select("state").filter(col("state").isNotNull()).distinct()
states_with_data.display()  # use states_with_data.count() instead to get just the number

In [0]:
# How many counties are in each state?
# Counts the rows with a non-null county per state, largest first.
# `col` comes from the import in the previous cell.
counties_per_state = (counties
                      .select("county", "state")
                      .filter(col("county").isNotNull())
                      .groupBy("state")
                      .count()
                      .sort("count", ascending=False))
counties_per_state.display()

Get familiar with the mask use study by reading the README.md

In [0]:
# Read the mask-use study README as text and show it
mask_use_readme = spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/README.md')
mask_use_readme.display()

In [0]:
# Load the mask-use-by-county CSV: the first row is treated as the header
# and column types are inferred from the data
masks = (spark.read
         .options(header="true", InferSchema="true")
         .csv('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv'))
masks.display()

In [0]:
# Collapse the mask-wearing frequencies into two groups (masks_groups):
#   almost_never  = NEVER + RARELY
#   almost_always = FREQUENTLY + ALWAYS
# The five original frequency columns (including SOMETIMES) are dropped afterwards.
frequency_columns = ['NEVER', 'RARELY', 'SOMETIMES', 'FREQUENTLY', 'ALWAYS']
masks_groups = (masks
                .withColumn('almost_never', masks['NEVER'] + masks['RARELY'])
                .withColumn('almost_always', masks['FREQUENTLY'] + masks['ALWAYS'])
                .drop(*frequency_columns))
masks_groups.display()

Questions:
1. Join the tables `masks_groups` and `counties`.
2. Do you find a correlation between wearing a mask and number of cases/deaths?
3. Plot

In [0]:
# Join masks_groups and counties (inner join on counties.fips == masks_groups.COUNTYFP),
# then drop the join key that came from masks_groups
fips_match = counties['fips'] == masks_groups['COUNTYFP']
mask_use = counties.join(masks_groups, on=fips_match, how='inner').drop('COUNTYFP')
mask_use.display()

In [0]:
# What happened during the join?
# It's a good practice to verify: compare the row counts of both inputs and of the result
n_counties = counties.count()
n_masks_groups = masks_groups.count()
n_mask_use = mask_use.count()
print('counties:', n_counties, ', masks_groups:', n_masks_groups, ', mask_use:', n_mask_use)

In [0]:
# Keep data for only one state
masks_arkansas = mask_use.where(mask_use['state'] == "Arkansas")
masks_arkansas.display()

In [0]:
# How would you visualize it?
# Keep only the county name, the confirmed cases/deaths and the two mask-use groups
masks_arkansas_select = masks_arkansas.select(
    'county', 'confirmed_cases', 'confirmed_deaths', 'almost_never', 'almost_always'
)
masks_arkansas_select.display()

In [0]:
# Same selection as the previous cell, displayed again.
# NOTE(review): presumably kept to render a second view (e.g. a plot) of the same data - confirm
selected_columns = ['county', 'confirmed_cases', 'confirmed_deaths', 'almost_never', 'almost_always']
masks_arkansas_select = masks_arkansas.select(*selected_columns)
masks_arkansas_select.display()

In [0]:
# Save as a Parquet file.
# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write raises an error as soon as the target path already exists.
# The path is spelled out with the dbfs:/ scheme so that it is exactly the
# location listed by the next cell.
mask_use.write.mode("overwrite").parquet("dbfs:/output/mask_use.parquet")

In [0]:
# Check where the Parquet output is and what it looks like
parquet_output_files = dbutils.fs.ls('dbfs:/output/mask_use.parquet')
display(parquet_output_files)

In [0]:
# An example of a partitioned dataset
partitioned_example_files = dbutils.fs.ls('dbfs:/databricks-datasets/amazon/data20K')
display(partitioned_example_files)

Redo at least one exercise in SQL. (First you need to register the DataFrames as tables.)

In [0]:
# Register each DataFrame as a temporary SQL view under its own name
frames_by_view_name = {
    "counties": counties,
    "mask_use": mask_use,
    "masks_groups": masks_groups,
}
for view_name, frame in frames_by_view_name.items():
    frame.createOrReplaceTempView(view_name)

In [0]:
%sql

-- Verify that the mask_use temporary view was created by selecting all of its rows
SELECT *
FROM mask_use

In [0]:
%sql

-- Select data for only one state.
-- The state name is written as a single-quoted string literal: double quotes
-- denote identifiers (column names) under ANSI/standard SQL quoting rules,
-- so "Arkansas" is only read as a string in Spark's default, non-ANSI mode.
SELECT *
FROM mask_use
WHERE state = 'Arkansas'

In [0]:
%sql

-- Join counties and masks_groups (inner join on counties.fips = masks_groups.COUNTYFP)

SELECT c.county, c.state, c.confirmed_cases, c.confirmed_deaths, m.almost_never, m.almost_always
FROM counties AS c
JOIN masks_groups AS m
  ON c.fips = m.COUNTYFP