Documentation: https://spark.apache.org/docs/latest/api/python/index.html

### Explore the COVID data

In [3]:
# Check out pre-loaded dataset
# display(dbutils.fs.ls('dbfs:/'))
display(dbutils.fs.ls('dbfs:/databricks-datasets/COVID/covid-19-data/')) # size 0 in the displayed table means it is a folder

path,name,size
dbfs:/databricks-datasets/COVID/covid-19-data/.git/,.git/,0
dbfs:/databricks-datasets/COVID/covid-19-data/.github/,.github/,0
dbfs:/databricks-datasets/COVID/covid-19-data/.gitignore,.gitignore,10
dbfs:/databricks-datasets/COVID/covid-19-data/LICENSE,LICENSE,1289
dbfs:/databricks-datasets/COVID/covid-19-data/NEW-YORK-DEATHS-METHODOLOGY.md,NEW-YORK-DEATHS-METHODOLOGY.md,2771
dbfs:/databricks-datasets/COVID/covid-19-data/NYT-readme.md,NYT-readme.md,1748
dbfs:/databricks-datasets/COVID/covid-19-data/PROBABLE-CASES-NOTE.md,PROBABLE-CASES-NOTE.md,3162
dbfs:/databricks-datasets/COVID/covid-19-data/README.md,README.md,19391
dbfs:/databricks-datasets/COVID/covid-19-data/excess-deaths/,excess-deaths/,0
dbfs:/databricks-datasets/COVID/covid-19-data/live/,live/,0


In [4]:
spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/README.md').display()

value
# Coronavirus (Covid-19) Data in the United States
**NEW:** We are publishing the data behind our [survey of mask usage](https://www.nytimes.com/interactive/2020/07/17/upshot/coronavirus-face-mask-map.html) in the United States in order to provide researchers a way to understand the role of mask wearing in the course of the pandemic. See the data and documentation in the [mask-use/](mask-use/) directory.
**NEW:** We are publishing the data behind our [excess deaths tracker](https://www.nytimes.com/interactive/2020/04/21/world/coronavirus-missing-deaths.html) in order to provide researchers and the public with a better record of the true toll of the pandemic. This data is compiled from official national and municipal data for 24 countries. See the data and documentation in the [excess-deaths/](excess-deaths/) directory.
---
[ [U.S. Data](us.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv)) | [U.S. State-Level Data](us-states.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv)) | [U.S. County-Level Data](us-counties.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv)) ]
"The New York Times is releasing a series of data files with cumulative counts of coronavirus cases in the United States, at the state and county level, over time. We are compiling this time series data from state and local governments and health departments in an attempt to provide a complete record of the ongoing outbreak."
"Since late January, The Times has tracked cases of coronavirus in real time as they were identified after testing. Because of the widespread shortage of testing, however, the data is necessarily limited in the picture it presents of the outbreak."
"We have used this data to power our [maps](https://www.nytimes.com/interactive/2020/us/coronavirus-us-cases.html) and [reporting](https://www.nytimes.com/coronavirus) tracking the outbreak, and it is now being made available to the public in response to requests from researchers, scientists and government officials who would like access to the data to better understand the outbreak."
"The data begins with the first reported coronavirus case in Washington State on Jan. 21, 2020. We will publish regular updates to the data in this repository."
## Live and Historical Data


Open `us-states.csv` and explore the schema

In [6]:
states = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true") # we can define our own schema with correct types
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv'))
states.display()

date,state,fips,cases,deaths
2020-01-21,Washington,53,1,0
2020-01-22,Washington,53,1,0
2020-01-23,Washington,53,1,0
2020-01-24,Illinois,17,1,0
2020-01-24,Washington,53,1,0
2020-01-25,California,6,1,0
2020-01-25,Illinois,17,1,0
2020-01-25,Washington,53,1,0
2020-01-26,Arizona,4,1,0
2020-01-26,California,6,2,0


In [7]:
states.printSchema()

In [8]:
states.display()

date,state,fips,cases,deaths
2020-01-21,Washington,53,1,0
2020-01-22,Washington,53,1,0
2020-01-23,Washington,53,1,0
2020-01-24,Illinois,17,1,0
2020-01-24,Washington,53,1,0
2020-01-25,California,6,1,0
2020-01-25,Illinois,17,1,0
2020-01-25,Washington,53,1,0
2020-01-26,Arizona,4,1,0
2020-01-26,California,6,2,0


Explore the `us-counties.csv` and answer the following questions:
1. What's the time span of the data (first and last date)?
2. Agregate the table by state:
  - Which state has the most confirmed cases and confirmed deaths?
  - Make a plot.

In [10]:
counties = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true")
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/live/us-counties.csv'))
counties.display()

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths
2020-09-28,Autauga,Alabama,1001.0,1785,25,1601.0,24.0,184.0,1.0
2020-09-28,Baldwin,Alabama,1003.0,5588,50,5086.0,46.0,502.0,4.0
2020-09-28,Barbour,Alabama,1005.0,886,7,668.0,7.0,218.0,0.0
2020-09-28,Bibb,Alabama,1007.0,657,10,623.0,6.0,34.0,4.0
2020-09-28,Blount,Alabama,1009.0,1618,15,1256.0,15.0,362.0,0.0
2020-09-28,Bullock,Alabama,1011.0,607,14,582.0,13.0,25.0,1.0
2020-09-28,Butler,Alabama,1013.0,914,39,878.0,38.0,36.0,1.0
2020-09-28,Calhoun,Alabama,1015.0,3548,44,3183.0,36.0,365.0,8.0
2020-09-28,Chambers,Alabama,1017.0,1172,42,898.0,40.0,274.0,2.0
2020-09-28,Cherokee,Alabama,1019.0,614,13,464.0,12.0,150.0,1.0


In [11]:
counties.printSchema()

In [12]:
# Convert `date` from string to date type
import pyspark.sql.functions as F

states = states.withColumn('date',F.to_date(states.date, 'yyyy-MM-dd'))


In [13]:
states.printSchema()

In [14]:
# First day in the dataset
first_date = states.select(F.min('date')) # This is another syntax instead of using withColumn
first_date.show()

In [15]:
# Last day in the dataset 
first_date = states.select(F.max('date')) # This is another syntax instead of using withColumn
first_date.show()


In [16]:
# Aggregate confirmed cases and confirmed deaths per state
df = (counties.groupby('state')
              .agg(F.sum('confirmed_cases').alias('confirmed_cases_total'), F.sum('confirmed_deaths').alias('confirmed_deaths_total')))
df.show()

In [17]:
# Which county has the max confirmed cases?
df.orderBy('confirmed_cases_total', ascending=False).first()

In [18]:
# Which county has the max confirmed deaths?
df.orderBy('confirmed_deaths_total', ascending=False).first()

In [19]:
# Do we have the data for all the counties?
(df
.select("state")
.where(F.col("state").isNotNull()) # Function col returns a Column based on the given column name.
.distinct()
# .count()
.display()
)

state
Utah
Hawaii
Minnesota
Ohio
Northern Mariana Islands
Arkansas
Oregon
Texas
North Dakota
Pennsylvania


In [20]:
# How many counties is in each state?
( # brackets in order use chain
counties.select("county", "state")
        .where(F.col("county").isNotNull())
        .groupBy("state")
        .count()
        .orderBy("count", ascending=False)
        .display()
)

state,count
Texas,252
Georgia,160
Virginia,133
Kentucky,120
Missouri,117
Kansas,105
Illinois,103
North Carolina,100
Iowa,100
Tennessee,96


Get familiar with the mask use study by reading the README.md

In [22]:
spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/README.md').display()

value
# Mask-Wearing Survey Data
The New York Times is releasing estimates of [mask usage](https://www.nytimes.com/interactive/2020/07/17/upshot/coronavirus-face-mask-map.html) by county in the United States.
"This data comes from a large number of interviews conducted online by the global data and survey firm Dynata at the request of The New York Times. The firm asked a question about mask use to obtain 250,000 survey responses between July 2 and July 14, enough data to provide estimates more detailed than the state level. (Several states have imposed new mask requirements since the completion of these interviews.)"
"Specifically, each participant was asked: _How often do you wear a mask in public when you expect to be within six feet of another person?_"
"This survey was conducted a single time, and at this point we have no plans to update the data or conduct the survey again."
## Data
Data on the estimated prevalence of mask-wearing in counties in the United States can be found in the **[mask-use-by-county.csv](mask-use-by-county.csv)** file. ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/mask-use/mask-use-by-county.csv))
```
"COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS"
"01001,0.053,0.074,0.134,0.295,0.444"


In [23]:
# create dataframe masks by reading dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv
mask_use = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true")
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv'))
mask_use.display()

COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS
1001,0.053,0.074,0.134,0.295,0.444
1003,0.083,0.059,0.098,0.323,0.436
1005,0.067,0.121,0.12,0.201,0.491
1007,0.02,0.034,0.096,0.278,0.572
1009,0.053,0.114,0.18,0.194,0.459
1011,0.031,0.04,0.144,0.286,0.5
1013,0.102,0.053,0.257,0.137,0.451
1015,0.152,0.108,0.13,0.167,0.442
1017,0.117,0.037,0.15,0.136,0.56
1019,0.135,0.027,0.161,0.158,0.52


In [24]:
# Make two groups of frequency of wearing masks: almost_never (NEVER+RARELY) and almost_always (FREQUENTLY+ALWAYS): masks_groups
mask_groups = (mask_use.withColumn('almost_never', mask_use.NEVER + mask_use.RARELY)
                      .withColumn('almost_always', mask_use.FREQUENTLY + mask_use.ALWAYS)
                      .drop('NEVER', 'RARELY', 'FREQUENTLY', 'ALWAYS', 'SOMETIMES')
               )
mask_groups.display()

COUNTYFP,almost_never,almost_always
1001,0.127,0.739
1003,0.142,0.759
1005,0.188,0.692
1007,0.054,0.85
1009,0.167,0.653
1011,0.071,0.786
1013,0.155,0.5880000000000001
1015,0.26,0.609
1017,0.154,0.6960000000000001
1019,0.162,0.678


Questions:
1. Join the tables `masks` and `counties`.
2. Do you find a correlation between wearing a mask and number of cases/deaths?
3. Plot

In [26]:
# Join masks_groups and counties: mask_join
mask_join =  (counties.join(mask_groups, counties.fips == mask_groups.COUNTYFP)
                      .drop('COUNTYFP'))
mask_join.display()

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths,almost_never,almost_always
2020-09-28,Autauga,Alabama,1001,1785,25,1601.0,24.0,184.0,1.0,0.127,0.739
2020-09-28,Baldwin,Alabama,1003,5588,50,5086.0,46.0,502.0,4.0,0.142,0.759
2020-09-28,Barbour,Alabama,1005,886,7,668.0,7.0,218.0,0.0,0.188,0.692
2020-09-28,Bibb,Alabama,1007,657,10,623.0,6.0,34.0,4.0,0.054,0.85
2020-09-28,Blount,Alabama,1009,1618,15,1256.0,15.0,362.0,0.0,0.167,0.653
2020-09-28,Bullock,Alabama,1011,607,14,582.0,13.0,25.0,1.0,0.071,0.786
2020-09-28,Butler,Alabama,1013,914,39,878.0,38.0,36.0,1.0,0.155,0.5880000000000001
2020-09-28,Calhoun,Alabama,1015,3548,44,3183.0,36.0,365.0,8.0,0.26,0.609
2020-09-28,Chambers,Alabama,1017,1172,42,898.0,40.0,274.0,2.0,0.154,0.6960000000000001
2020-09-28,Cherokee,Alabama,1019,614,13,464.0,12.0,150.0,1.0,0.162,0.678


In [27]:
# What happened during the join? 
# It's a good practice to verify (count lines for counties, mask_groups and mask_join)
print('counties:', counties.distinct().count(), 'mask_groups:', mask_groups.distinct().count(), 'mask_join:', mask_join.distinct().count())

In [28]:
# Keep data for only one state
mask_California = (mask_join
                   .filter(mask_join.state == 'California'))

mask_California.display()

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths,almost_never,almost_always
2020-09-28,Alameda,California,6001,21261,406,21261,406,,,0.027,0.918
2020-09-28,Alpine,California,6003,2,0,2,0,,,0.11,0.802
2020-09-28,Amador,California,6005,290,16,290,16,,,0.0579999999999999,0.843
2020-09-28,Butte,California,6007,2833,44,2833,44,,,0.0579999999999999,0.83
2020-09-28,Calaveras,California,6009,317,14,317,14,,,0.064,0.8380000000000001
2020-09-28,Colusa,California,6011,532,6,532,6,,,0.0579999999999999,0.851
2020-09-28,Contra Costa,California,6013,16702,206,16702,206,,,0.034,0.927
2020-09-28,Del Norte,California,6015,139,1,139,1,,,0.145,0.7430000000000001
2020-09-28,El Dorado,California,6017,1130,4,1130,4,,,0.07,0.8580000000000001
2020-09-28,Fresno,California,6019,28441,382,28441,382,,,0.043,0.897


In [29]:
# How would you visualize it? 
mask_California_select = mask_California.select('county', 'confirmed_cases', 'confirmed_deaths', 'almost_never', 'almost_always')
mask_California_select.display()

county,confirmed_cases,confirmed_deaths,almost_never,almost_always
Alameda,21261,406,0.027,0.918
Alpine,2,0,0.11,0.802
Amador,290,16,0.0579999999999999,0.843
Butte,2833,44,0.0579999999999999,0.83
Calaveras,317,14,0.064,0.8380000000000001
Colusa,532,6,0.0579999999999999,0.851
Contra Costa,16702,206,0.034,0.927
Del Norte,139,1,0.145,0.7430000000000001
El Dorado,1130,4,0.07,0.8580000000000001
Fresno,28441,382,0.043,0.897


In [30]:
# Save mask_use as a Parquet file
mask_join.write.parquet("output/mask_join.parquet")

In [31]:
# Check where it is and how it looks like
display(dbutils.fs.ls('dbfs:/output/mask_join.parquet'))

path,name,size
dbfs:/output/mask_join.parquet/_SUCCESS,_SUCCESS,0
dbfs:/output/mask_join.parquet/_committed_6947267565314638171,_committed_6947267565314638171,124
dbfs:/output/mask_join.parquet/_started_6947267565314638171,_started_6947267565314638171,0
dbfs:/output/mask_join.parquet/part-00000-tid-6947267565314638171-9149e740-f359-4be0-a056-2749be1a301b-778-1-c000.snappy.parquet,part-00000-tid-6947267565314638171-9149e740-f359-4be0-a056-2749be1a301b-778-1-c000.snappy.parquet,82654


In [32]:
# An example of partitioned dataset
display(dbutils.fs.ls('dbfs:/databricks-datasets/amazon/data20K'))

path,name,size
dbfs:/databricks-datasets/amazon/data20K/_SUCCESS,_SUCCESS,0
dbfs:/databricks-datasets/amazon/data20K/_common_metadata,_common_metadata,324
dbfs:/databricks-datasets/amazon/data20K/_metadata,_metadata,16159
dbfs:/databricks-datasets/amazon/data20K/part-r-00000-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00000-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,403964
dbfs:/databricks-datasets/amazon/data20K/part-r-00001-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00001-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,388283
dbfs:/databricks-datasets/amazon/data20K/part-r-00002-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00002-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,389132
dbfs:/databricks-datasets/amazon/data20K/part-r-00003-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00003-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,380250
dbfs:/databricks-datasets/amazon/data20K/part-r-00004-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00004-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,377018
dbfs:/databricks-datasets/amazon/data20K/part-r-00005-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00005-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,364893
dbfs:/databricks-datasets/amazon/data20K/part-r-00006-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,part-r-00006-112e73de-1ab1-447b-b167-0919dd731adf.gz.parquet,395181


Re-do at least one excercise in SQL. (First you need to register the data as a table.)

In [34]:
# Create a temporary sql table
counties.createOrReplaceTempView("counties")
mask_join.createOrReplaceTempView("mask_join")
mask_groups.createOrReplaceTempView("mask_groups")

In [35]:
%sql

-- Verify that the table was created
SELECT *
FROM mask_join

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths,almost_never,almost_always
2020-09-28,Autauga,Alabama,1001,1785,25,1601.0,24.0,184.0,1.0,0.127,0.739
2020-09-28,Baldwin,Alabama,1003,5588,50,5086.0,46.0,502.0,4.0,0.142,0.759
2020-09-28,Barbour,Alabama,1005,886,7,668.0,7.0,218.0,0.0,0.188,0.692
2020-09-28,Bibb,Alabama,1007,657,10,623.0,6.0,34.0,4.0,0.054,0.85
2020-09-28,Blount,Alabama,1009,1618,15,1256.0,15.0,362.0,0.0,0.167,0.653
2020-09-28,Bullock,Alabama,1011,607,14,582.0,13.0,25.0,1.0,0.071,0.786
2020-09-28,Butler,Alabama,1013,914,39,878.0,38.0,36.0,1.0,0.155,0.5880000000000001
2020-09-28,Calhoun,Alabama,1015,3548,44,3183.0,36.0,365.0,8.0,0.26,0.609
2020-09-28,Chambers,Alabama,1017,1172,42,898.0,40.0,274.0,2.0,0.154,0.6960000000000001
2020-09-28,Cherokee,Alabama,1019,614,13,464.0,12.0,150.0,1.0,0.162,0.678


In [36]:
%sql

-- Select data for only one state
SELECT *
FROM mask_join
WHERE state = "Arkansas"

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths,almost_never,almost_always
2020-09-28,Arkansas,Arkansas,5001,413,8,389,7,24,1,0.092,0.773
2020-09-28,Ashley,Arkansas,5003,434,10,429,10,5,0,0.126,0.712
2020-09-28,Baxter,Arkansas,5005,351,2,314,2,37,0,0.254,0.638
2020-09-28,Benton,Arkansas,5007,6585,87,6404,70,181,17,0.115,0.803
2020-09-28,Boone,Arkansas,5009,726,12,685,10,41,2,0.274,0.605
2020-09-28,Bradley,Arkansas,5011,342,8,340,7,2,1,0.196,0.629
2020-09-28,Calhoun,Arkansas,5013,36,0,35,0,1,0,0.277,0.615
2020-09-28,Carroll,Arkansas,5015,672,10,665,10,7,0,0.179,0.706
2020-09-28,Chicot,Arkansas,5017,1006,22,998,21,8,1,0.178,0.628
2020-09-28,Clark,Arkansas,5019,346,4,332,4,14,0,0.1709999999999999,0.749


In [37]:
%sql

-- Join mask_use and masks_groups 

SELECT county, state, confirmed_cases, confirmed_deaths, almost_never, almost_always
FROM counties
INNER JOIN mask_groups ON counties.fips=mask_groups.COUNTYFP

county,state,confirmed_cases,confirmed_deaths,almost_never,almost_always
Autauga,Alabama,1601.0,24.0,0.127,0.739
Baldwin,Alabama,5086.0,46.0,0.142,0.759
Barbour,Alabama,668.0,7.0,0.188,0.692
Bibb,Alabama,623.0,6.0,0.054,0.85
Blount,Alabama,1256.0,15.0,0.167,0.653
Bullock,Alabama,582.0,13.0,0.071,0.786
Butler,Alabama,878.0,38.0,0.155,0.5880000000000001
Calhoun,Alabama,3183.0,36.0,0.26,0.609
Chambers,Alabama,898.0,40.0,0.154,0.6960000000000001
Cherokee,Alabama,464.0,12.0,0.162,0.678
