In [1]:
# Setup Spark SQL
# Note if running locally you need the JVM https://www.oracle.com/java/technologies/downloads/
# Also, if running locally you'll need to allow it to talk over the network to your own machine
# Consider running in https://colab.research.google.com/
%pip install pyspark



In [2]:
# Initialize Context - this is where you'd setup information about your Hadoop cluster if you had one!
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Covid").getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("WARN")

In [3]:
# Download 100mb covid county data file
!curl "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv" > ./uscounties.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 99.9M  100 99.9M    0     0   223M      0 --:--:-- --:--:-- --:--:--  223M


In [8]:
# Write code to define or infer the schema and then read in the dataset
# Read the file into a Spark DataFrame
usCountiesFilePath = "./uscounties.csv"
from pyspark.sql.types import DateType, StringType, StructField, StructType, IntegerType

# the fips column needs to be a string in order to preserve leading zeros
schema = StructType([StructField('date', DateType(), True), StructField('county', StringType(), True), StructField('state', StringType(), True), StructField('fips', StringType(), True), StructField('cases', IntegerType(), True), StructField('deaths', IntegerType(), True)])

df = spark.read.csv(usCountiesFilePath, schema=schema, header=True)
df.show()

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California|06059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-26|Los Angeles|California|06037|    1|     0|
|2020-01-26|     Orange|California|06059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-27|Los Angeles|California|06037|    1|     0|
|2020-01-2

In [9]:
# Write code to find the county with the most deaths
# Option 1) SparkSQL API
df.createOrReplaceTempView("covid")  # create table that you can do sql on

print("Max deaths:")
spark.sql(
    """
    select county, state, deaths
    from covid
    order by deaths desc
    limit 1
  """
).show()

Max deaths:
+-------------+--------+------+
|       county|   state|deaths|
+-------------+--------+------+
|New York City|New York| 40267|
+-------------+--------+------+



In [None]:
# Option 2) # DataFrame style
from pyspark.sql.functions import col

print("Max deaths:")
print(
    df.orderBy(col("deaths").desc()).take(  # .where(col("county") == "New York City") \
        1
    )
)

In [None]:
# # Option 3) RDD MapReduce Style without key
rows = df.rdd


def getMax(cumm, other):
    if other["deaths"] is not None and other["deaths"] > cumm["deaths"]:
        return other
    else:
        return cumm


print("Max deaths:")
print(rows.reduce(getMax))

In [None]:
# # Option 4) RDD MapReduce Style with mapped tuples
rows = df.rdd


def getMax(cumm, other):
    if other[0] > cumm[0]:
        return other
    else:
        return cumm


rows = rows.map(lambda r: (r["deaths"] or 0, f"{r['county']},{r['state']}"))
print("Max deaths:")
print(rows.reduce(getMax))

In [None]:
# Write code to find the county with the most deaths
print("Max deaths by county:")

spark.sql(
    """

    """
)

In [18]:
# Write code to find the county with the most cases
print("Most cases by county:")

spark.sql(
    """
    select county, cases
    from covid
    order by cases desc
    limit 1
    """
).show()

Most cases by county:
+-----------+-------+
|     county|  cases|
+-----------+-------+
|Los Angeles|2908425|
+-----------+-------+



In [23]:
# Write code to find the total number of deaths in Utah county
print("Total number of deaths in Utah County:")

spark.sql(
    """
    select deaths
    from covid
    where county = "Utah" and state = "Utah"
    order by date desc
    limit 1
    """
).show()

Total number of deaths in Utah County:
+------+
|deaths|
+------+
|   791|
+------+



In [None]:
# Write code to find the death rate for each state and sort the states by death rate descending
spark.sql(
    """
    with latestDateByFips as (
      select fips, max(date) as date
      from covid
      group by fips
    ), latestRowForEachFips as (
      select county, state, cases, deaths, c.fips
      from covid c
      join latestDateByFips l on c.date = l.date
    )

    select state, (sum(deaths) sum(cases))
    from latestRowForEachFips
    group by state
    """
).show()

In [None]:
# Write code to something else interesting with this data – your choice


In [None]:
# Extra Credit 1 - Plot your death rate data!
# Extra Credit 2 - Join this with other data or find something intresting in this data and plot it on a map!

In [None]:
# This example uses two-letter state code

import pandas as pd
import plotly.express as px

data = pd.DataFrame({
  'state': ['NY', 'CA', 'TX', 'FL'],
  'values': [10, 20, 15, 25]
})

fig = px.choropleth(
    data,
    locations='state', # Column with state abbreviations
    locationmode='USA-states', # Set location mode to US states
    color='values', # Column to determine color intensity
    scope='usa', # Limit map to the USA
    color_continuous_scale='Viridis', # Choose a color scale
    title='Extra Credit Plot <Insert name here>'
)

fig.show()


In [None]:
# This example uses the FIPS code to map data to a county

import pandas as pd
import plotly.express as px

# Example county-level data (FIPS codes are required for county-level plots)
data = pd.DataFrame({
    'fips': ['36061', '06037', '48201', '12086'],  # Example FIPS codes (NYC, LA, Houston, Miami-Dade)
    'values': [10, 20, 15, 25]
})

# Plot county-level choropleth map
fig = px.choropleth(
    data,
    geojson="https://raw.githubusercontent.com/plotly/datasets/master/geojson-counties-fips.json",  # GeoJSON for counties
    locations='fips',  # Use county FIPS codes
    color='values',  # Column to determine color intensity
    color_continuous_scale='Viridis',
    scope='usa',
    title='County-Level Extra Credit Plot'
)

fig.show()
