Documentation:

- https://spark.apache.org/docs/latest/api/python/index.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html

### Explore the COVID data

In [None]:
# List the contents of the pre-loaded COVID dataset folder
covid_dir_listing = dbutils.fs.ls('dbfs:/databricks-datasets/COVID/covid-19-data/')
display(covid_dir_listing)

In [None]:
# Load the README as a text DataFrame and render it
display(spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/README.md'))

Open `us-states.csv` and explore the schema

In [None]:
# Read us-states.csv: the first line is treated as the header and the
# column types are inferred from the data.
states = (
    spark.read
    .format('csv')
    .options(header="true", InferSchema="true")
    .load('dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv')
)

# Render the dataframe
display(states)

In [None]:
# Render the dataframe again; build the plot from the rendered output
display(states)

In [None]:
# Print the schema of `states` as a tree of column names and types.
# No explicit schema was passed on the read above, so the types shown here
# are the ones inferred from the CSV data.
states.printSchema()

Explore the `us-counties.csv` and answer the following questions:
1. What's the time span of the data (first and last date)?
2. Aggregate the table by state:
  - Which state has the most confirmed cases and confirmed deaths?
  - Make a plot.

In [None]:
# Load live/us-counties.csv with a header row and inferred column types
counties = spark.read.load(
    'dbfs:/databricks-datasets/COVID/covid-19-data/live/us-counties.csv',
    format='csv',
    header="true",
    InferSchema="true",
)
display(counties)

In [None]:
# Print the schema of `counties` as a tree of column names and types.
# Question to answer from the output: which type was assigned to the `date` column?
counties.printSchema()

In [None]:
# Handy method: describe()
# It computes basic summary statistics for the DataFrame's columns.
counties.describe().display()

In [None]:
# Handy built-in: len()
# Applied to the list of column names, it gives the number of columns
column_names = counties.columns
len(column_names)

In [None]:
# Convert `date` from string to date type.
# Import style 1: import the whole module pyspark.sql.functions as F and
# then call individual functions through it (F.<function>).
import pyspark.sql.functions as F

parsed_date = F.to_date(F.col('date'), 'yyyy-MM-dd')
counties = counties.withColumn('date', parsed_date)
#counties.display()

In [None]:
# Convert `date` from string to date type.
# Import style 2: import individual functions from pyspark.sql.functions
# as they are needed.
from pyspark.sql.functions import to_date, col

counties = counties.withColumn(
    'date',
    to_date(col('date'), format='yyyy-MM-dd'),
)
display(counties)

In [None]:
# First day in the dataset (use col())
from pyspark.sql.functions import min

min_date = counties.select(min(col("date")))
min_date.display()

In [None]:
# First day in the dataset (without col())
min_date = counties.select(min("date"))
min_date.display()

In [None]:
# Last day in the dataset
# Spark's aggregate is called as F.max rather than imported by name:
# `from pyspark.sql.functions import max` would replace Python's built-in
# max() for every cell that runs afterwards.
import pyspark.sql.functions as F

max_date = counties.select(F.max("date"))
max_date.display()

In [None]:
# Aggregate confirmed cases and confirmed deaths per state
# Spark's aggregate is called as F.sum rather than imported by name:
# `from pyspark.sql.functions import sum` would replace Python's built-in
# sum() for every cell that runs afterwards.
import pyspark.sql.functions as F

df = (counties
       .groupby('state')
       .agg(F.sum('confirmed_cases'), F.sum('confirmed_deaths'))
       # The aggregated columns get the default names 'sum(<column>)';
       # rename them to something easier to reference later.
       .withColumnRenamed('sum(confirmed_cases)', 'confirmed_cases_total')
       .withColumnRenamed('sum(confirmed_deaths)', 'confirmed_deaths_total')
      )

df.display()

In [None]:
# Aggregate confirmed cases and confirmed deaths per state - shorter alternative
# The output columns are named directly with alias() instead of being renamed afterwards.
# Spark's aggregate is called as F.sum rather than imported by name, so that
# Python's built-in sum() is not shadowed in later cells.
import pyspark.sql.functions as F

df = (counties
       .groupby('state')
       .agg(
           F.sum('confirmed_cases').alias('confirmed_cases_total'),
           F.sum('confirmed_deaths').alias('confirmed_deaths_total'),
       )
      )

df.display()

In [None]:
# Which state has the most confirmed cases?
# Sort by the total in descending order, take the top row and read its `state` value.
# Alternatives:
#   df.orderBy('confirmed_cases_total', ascending=False).select('state').first()
#   display(df.orderBy('confirmed_cases_total', ascending=False))
top_cases_row = df.sort(df['confirmed_cases_total'].desc()).first()
top_cases_row['state']

In [None]:
# Which state has the most confirmed deaths?
# Sort by the total in descending order, keep only `state`, and take the top row.
deaths_ranked = df.sort(df['confirmed_deaths_total'].desc())
deaths_ranked.select('state').first()

In [None]:
# Do we have the data for all the states?
# List the states whose aggregated death total is NULL.
from pyspark.sql.functions import col

(df
  # Filter BEFORE projecting: `confirmed_deaths_total` is not part of
  # select("state"), so the filter must run while the column is still present.
  .filter(col("confirmed_deaths_total").isNull())
  .select("state")
  # Optional follow-ups to try:
  #.filter(col("state").isNotNull())
  #.distinct()
  #.count()
  .display()
)

In [None]:
# How many counties are there in each state?
# Count the rows with a non-null county per state, largest count first.
county_rows = counties.select("county", "state").filter(col("county").isNotNull())
counties_per_state = county_rows.groupBy("state").count()
display(counties_per_state.sort(col("count").desc()))

Get familiar with the mask use study by reading the README.md

In [None]:
# Load the mask-use README as a text DataFrame and render it
display(spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/README.md'))

In [None]:
# Build the `masks` dataframe from
# dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv
# (header row present, column types inferred).
masks = (
    spark.read
    .format('csv')
    .options(**{"header": "true", "InferSchema": "true"})
    .load('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv')
)
display(masks)

In [None]:
# Collapse the mask-wearing frequency columns into two groups (masks_groups):
#   almost_never  = NEVER + RARELY
#   almost_always = FREQUENTLY + ALWAYS
# The five original frequency columns (including SOMETIMES) are dropped afterwards.
masks_groups = masks.withColumn('almost_never', masks['NEVER'] + masks['RARELY'])
masks_groups = masks_groups.withColumn('almost_always', masks['FREQUENTLY'] + masks['ALWAYS'])
masks_groups = masks_groups.drop('NEVER', 'RARELY', 'SOMETIMES', 'FREQUENTLY', 'ALWAYS')
display(masks_groups)

Questions:
1. Join the tables `masks_groups` and `counties`.
2. Do you find a correlation between wearing a mask and number of cases/deaths?
3. Plot

In [None]:
# Inner-join counties with masks_groups on the county FIPS code
# (counties.fips matches masks_groups.COUNTYFP).
# Both key columns are kept; append .drop('COUNTYFP') to remove the duplicate key.
fips_match = counties['fips'] == masks_groups['COUNTYFP']
mask_use = counties.join(masks_groups, on=fips_match, how='inner')
display(mask_use)

In [None]:
# What happened during the join?
# It is good practice to verify: compare the row counts of both inputs
# with the row count of the joined result.
n_counties = counties.count()
n_masks_groups = masks_groups.count()
n_mask_use = mask_use.count()
print('counties:', n_counties, ', masks_groups:', n_masks_groups, ', mask_use:', n_mask_use)

In [None]:
# Restrict the joined data to a single state (Arkansas)
is_arkansas = mask_use['state'] == "Arkansas"
masks_arkansas = mask_use.where(is_arkansas)
display(masks_arkansas)

In [None]:
# How would you visualize the frequency groups?
# Keep only the county, the case/death counts and the two frequency groups.
masks_arkansas_select = masks_arkansas.select(
    ['county', 'confirmed_cases', 'confirmed_deaths', 'almost_never', 'almost_always']
)
display(masks_arkansas_select)

In [None]:
# Same column selection as in the previous cell, rendered once more
# (e.g. to configure a second visualization on the output).
masks_arkansas_select = masks_arkansas.select(
    'county',
    'confirmed_cases',
    'confirmed_deaths',
    'almost_never',
    'almost_always',
)
display(masks_arkansas_select)

In [None]:
# Save as a Parquet file.
# mode("overwrite") replaces the output of a previous run. With the default
# save mode the write raises an error when the target path already exists,
# so re-running this cell would fail.
mask_use.write.mode("overwrite").parquet("output/01-02_mask_use.parquet")

In [None]:
# How many partitions does Spark use for this file?
# dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv
# Re-read the CSV and ask the underlying RDD for its partition count.
masks = (
    spark.read
    .options(header="true", InferSchema="true")
    .format('csv')
    .load('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv')
)
masks.rdd.getNumPartitions()

In [None]:
# An example of a partitioned dataset: list the files that make it up
amazon_listing = dbutils.fs.ls('dbfs:/databricks-datasets/amazon/data20K')
display(amazon_listing)

In [None]:
# Get the number of partitions of the amazon dataset.
# The `header` and `InferSchema` options were removed: they are CSV reader
# options, and Parquet files store their schema in the file itself, so they
# had no effect here and only suggested otherwise.
amazon = (spark.read.format('parquet')
            .load('dbfs:/databricks-datasets/amazon/data20K'))
amazon.rdd.getNumPartitions()

Redo at least one exercise in SQL. (First you need to register the dataframes as tables.)

In [None]:
# Register the dataframes as temporary views so that the %sql cells
# can query them by name.
for view_name, frame in [
    ("counties", counties),
    ("mask_use", mask_use),
    ("masks_groups", masks_groups),
]:
    frame.createOrReplaceTempView(view_name)

In [None]:
%sql

-- Sanity check: the mask_use view exists and returns rows
SELECT mu.*
FROM mask_use AS mu

In [None]:
%sql

-- Select data for only one state.
-- The string literal uses single quotes (standard SQL); double quotes are
-- read as identifiers when double-quoted identifiers are enabled.
SELECT *
FROM mask_use
WHERE state = 'Arkansas'

In [None]:
%sql

-- Join counties and masks_groups on the county FIPS code
-- (the SQL counterpart of the DataFrame join that produced mask_use)

SELECT
  c.county,
  c.state,
  c.confirmed_cases,
  c.confirmed_deaths,
  m.almost_never,
  m.almost_always
FROM counties AS c
INNER JOIN masks_groups AS m
  ON c.fips = m.COUNTYFP