In [1]:
# Setup Spark SQL
# Install PySpark into the kernel's environment (%pip, unlike !pip, targets the running kernel).
# Note if running locally you need the JVM https://www.oracle.com/java/technologies/downloads/
# Consider running in https://colab.research.google.com/
# NOTE(review): consider pinning a version (pyspark==X.Y.Z) so re-runs are reproducible;
# later cells use Spark SQL functions that need a reasonably recent Spark (3.x).
%pip install pyspark



In [2]:
# Create (or reuse) the SparkSession, the entry point for the DataFrame and SQL APIs.
# If you had a Hadoop cluster, its connection settings would be configured on this builder.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Covid")
    .getOrCreate()
)

# Expose the underlying SparkContext as `sc` and reduce log noise to warnings and above.
sc = spark.sparkContext
sc.setLogLevel("WARN")

In [3]:
# Download 100mb covid county data file
!curl "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv" > ./uscounties.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 99.9M  100 99.9M    0     0  79.8M      0  0:00:01  0:00:01 --:--:-- 79.8M


In [4]:
# Load the CSV into a Spark DataFrame. The first line supplies the column names, and Spark
# infers column types from the data (e.g. the date column comes back as a date, counts as ints).
usCountiesFilePath = "./uscounties.csv"

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(usCountiesFilePath)
)

# Preview the first rows.
df.show()

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California| 6059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-26|Los Angeles|California| 6037|    1|     0|
|2020-01-26|     Orange|California| 6059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-27|Los Angeles|California| 6037|    1|     0|
|2020-01-2

In [10]:
# SparkSQL API: expose the DataFrame as a temporary view named "covid" so it can be queried with SQL.
df.createOrReplaceTempView("covid")

print("Max deaths:")
# The single row with the largest death count, plus the county/state/date it belongs to.
spark.sql(
    """
    SELECT county, state, deaths, date
      FROM covid
     ORDER BY deaths DESC
     LIMIT 1
    """
).show()

Max deaths:
+-------------+--------+------+----------+
|       county|   state|deaths|      date|
+-------------+--------+------+----------+
|New York City|New York| 40267|2022-05-13|
+-------------+--------+------+----------+



In [6]:
# DataFrame API: sort rows by deaths, largest first, and fetch the top one as a one-element list.
from pyspark.sql.functions import col

print("Max deaths:")
print(
    df
    .orderBy(col("deaths").desc())
    .take(1)
)

Max deaths:
[Row(date=datetime.date(2022, 5, 13), county='New York City', state='New York', fips=None, cases=2422658, deaths=40267)]


In [7]:
# RDD MapReduce Style without key
# Drop from the DataFrame down to its underlying RDD of Row objects.
rows = df.rdd


def getMax(cumm, other):
    """Reduce step: keep whichever of two rows has the larger ``deaths`` value.

    Rows whose ``deaths`` is None are treated as "no data" and never win.
    Both sides must be checked for None: ``RDD.reduce`` starts each partition
    from that partition's first row and later merges the per-partition
    results, so a None can arrive as ``cumm`` just as easily as ``other``.
    Comparing an int with None raises TypeError in Python 3.

    Args:
        cumm: the best row seen so far (anything indexable by ``"deaths"``).
        other: the next row to compare against it.

    Returns:
        ``other`` if its deaths is strictly greater than ``cumm``'s (or
        ``cumm`` has no deaths value); otherwise ``cumm``. Ties keep ``cumm``.
    """
    other_deaths = other["deaths"]
    if other_deaths is None:
        return cumm
    cumm_deaths = cumm["deaths"]
    if cumm_deaths is None or other_deaths > cumm_deaths:
        return other
    return cumm


print("Max deaths:")
# Fold all rows pairwise with getMax to find the single row with the most deaths.
print(rows.reduce(getMax))

Max deaths:
Row(date=datetime.date(2022, 5, 13), county='New York City', state='New York', fips=None, cases=2422658, deaths=40267)


In [8]:
# RDD MapReduce Style with mapped tuples
# Start again from a fresh RDD of Row objects taken from the DataFrame.
rows = df.rdd


def getMax(cumm, other):
    """Reduce step for ``(deaths, label)`` tuples: keep the one with more deaths.

    Only element 0 is compared; ties keep the accumulated value ``cumm``.
    Note: this redefines the Row-based ``getMax`` from the previous cell.
    """
    return other if other[0] > cumm[0] else cumm


# Project each Row to a (deaths, "county,state") pair. Missing deaths become 0, so the
# tuple comparison in getMax never has to deal with None.
rows = rows.map(
    lambda row: (row["deaths"] or 0, "{},{}".format(row["county"], row["state"]))
)
print("Max deaths:")
print(rows.reduce(getMax))

Max deaths:
(40267, 'New York City,New York')


In [11]:
# Write code to find the county with the most deaths
# SparkSQL API: (re)register the "covid" temp view so the query below can reference it.
df.createOrReplaceTempView("covid")

print("Max deaths:")
spark.sql(
    """
    SELECT county, state, deaths, date
      FROM covid
     ORDER BY deaths DESC
     LIMIT 1
    """
).show()

Max deaths:
+-------------+--------+------+----------+
|       county|   state|deaths|      date|
+-------------+--------+------+----------+
|New York City|New York| 40267|2022-05-13|
+-------------+--------+------+----------+



In [13]:
# Write code to find the county with the most cases
# (Re)register the "covid" temp view, then pick the single row with the largest case count.
df.createOrReplaceTempView("covid")

print("Max cases:")
spark.sql(
    """
    SELECT county, state, cases, date
      FROM covid
     ORDER BY cases DESC
     LIMIT 1
    """
).show()

Max cases:
+-----------+----------+-------+----------+
|     county|     state|  cases|      date|
+-----------+----------+-------+----------+
|Los Angeles|California|2908425|2022-05-13|
+-----------+----------+-------+----------+



In [25]:
# Write code to find the total number of deaths in Utah county
# The counts in this dataset appear to be running totals, so the county's total is the
# value on its most recent date. Filter on state as well as county name, so a same-named
# county elsewhere can never be picked up.
df.createOrReplaceTempView("covid")

print("Total deaths in Utah County, Utah:")
spark.sql(
    """
    SELECT state, county, deaths AS total_number_of_deaths, date
      FROM covid
     WHERE county = 'Utah' AND state = 'Utah'
     ORDER BY date DESC
     LIMIT 1
    """
).show()

Max cases:
+-----+------+----------------------+----------+
|state|county|total_number_of_deaths|      date|
+-----+------+----------------------+----------+
| Utah|  Utah|                   791|2022-05-13|
+-----+------+----------------------+----------+



In [59]:
# Write code to find the death rate for each state and sort the states by death rate descending
#
# Step 1 (inner query): for every county, take the cases/deaths reported on its latest date.
#   max_by(x, date) is deterministic. LAST(x) in a GROUP BY is not: its result depends
#   on row order after the shuffle, so it can pick an arbitrary day's values.
#   (max_by requires Spark 3.0+.)
# Step 2 (outer query): sum those per state and express deaths as a percentage of cases.

df.createOrReplaceTempView("covid")

print("death rate:")
spark.sql(
    """
    SELECT state,
           ROUND(SUM(deaths) / SUM(cases) * 100, 2) AS death_rate_percent
      FROM (
            SELECT state, county,
                   max_by(deaths, date) AS deaths,
                   max_by(cases, date)  AS cases
              FROM covid
             GROUP BY state, county
           ) AS latest_by_county
     GROUP BY state
     ORDER BY death_rate_percent DESC
    """
).show(100)


death rate:
+--------------------+------------------+
|               state|death_rate_percent|
+--------------------+------------------+
|        Pennsylvania|              1.57|
|         Mississippi|              1.55|
|             Alabama|               1.5|
|              Nevada|              1.49|
|             Georgia|              1.49|
|             Arizona|              1.49|
|            Michigan|              1.46|
|          New Mexico|              1.45|
|          New Jersey|              1.45|
|                Ohio|              1.42|
|            Missouri|              1.41|
|         Connecticut|               1.4|
|           Louisiana|              1.39|
|            Oklahoma|              1.38|
|            Maryland|              1.38|
|             Indiana|              1.38|
|            Arkansas|              1.36|
|       West Virginia|              1.36|
|           Tennessee|              1.31|
|            New York|              1.31|
|               Texas|

In [72]:
# Write code to do something else interesting with this data – your choice
#
# Latest reported death count for each Washington county, highest first.
# max_by(deaths, date) picks the value on each county's most recent date deterministically.
# LAST(deaths) would depend on row order after the shuffle. (max_by requires Spark 3.0+.)
df.createOrReplaceTempView("covid")

print("highest deaths per county in WA:")
spark.sql(
    """
    SELECT county, max_by(deaths, date) AS deaths
      FROM covid
     WHERE state = 'Washington'
     GROUP BY county
     ORDER BY deaths DESC
    """
).show(20)

highest deaths per county in WA:
+------------+------+
|      county|deaths|
+------------+------+
|        King|  2774|
|      Pierce|  1409|
|     Spokane|  1378|
|   Snohomish|  1145|
|       Clark|   804|
|      Yakima|   790|
|      Benton|   472|
|    Thurston|   417|
|     Cowlitz|   347|
|      Kitsap|   344|
|     Whatcom|   301|
|       Grant|   259|
|       Lewis|   254|
|    Franklin|   214|
|      Skagit|   204|
|Grays Harbor|   198|
|      Chelan|   162|
|     Stevens|   147|
| Walla Walla|   142|
|     Clallam|   141|
+------------+------+
only showing top 20 rows



In [9]:
# Extra Credit 1 - Plot your death rate data!
# Extra Credit 2 - Join this with other data or find something interesting in this data and plot it on a map!

import pandas as pd
import plotly.express as px

# Plotly's "USA-states" location mode expects two-letter postal codes, while this dataset
# uses full state names. Names not listed here (e.g. territories) are dropped before plotting.
STATE_ABBREVIATIONS = {
    "Alabama": "AL", "Alaska": "AK", "Arizona": "AZ", "Arkansas": "AR",
    "California": "CA", "Colorado": "CO", "Connecticut": "CT", "Delaware": "DE",
    "District of Columbia": "DC", "Florida": "FL", "Georgia": "GA", "Hawaii": "HI",
    "Idaho": "ID", "Illinois": "IL", "Indiana": "IN", "Iowa": "IA",
    "Kansas": "KS", "Kentucky": "KY", "Louisiana": "LA", "Maine": "ME",
    "Maryland": "MD", "Massachusetts": "MA", "Michigan": "MI", "Minnesota": "MN",
    "Mississippi": "MS", "Missouri": "MO", "Montana": "MT", "Nebraska": "NE",
    "Nevada": "NV", "New Hampshire": "NH", "New Jersey": "NJ", "New Mexico": "NM",
    "New York": "NY", "North Carolina": "NC", "North Dakota": "ND", "Ohio": "OH",
    "Oklahoma": "OK", "Oregon": "OR", "Pennsylvania": "PA", "Rhode Island": "RI",
    "South Carolina": "SC", "South Dakota": "SD", "Tennessee": "TN", "Texas": "TX",
    "Utah": "UT", "Vermont": "VT", "Virginia": "VA", "Washington": "WA",
    "West Virginia": "WV", "Wisconsin": "WI", "Wyoming": "WY",
}

# Per-state death rate: latest cases/deaths per county (max_by over date, Spark 3.0+),
# summed per state. The result is one small row per state, so it is safe to collect into pandas.
df.createOrReplaceTempView("covid")
death_rates = spark.sql(
    """
    SELECT state,
           ROUND(SUM(deaths) / SUM(cases) * 100, 2) AS death_rate_percent
      FROM (
            SELECT state, county,
                   max_by(deaths, date) AS deaths,
                   max_by(cases, date)  AS cases
              FROM covid
             GROUP BY state, county
           ) AS latest_by_county
     GROUP BY state
    """
).toPandas()

plot_data = (
    death_rates
    .assign(state_code=death_rates["state"].map(STATE_ABBREVIATIONS))
    .dropna(subset=["state_code"])
)

fig = px.choropleth(
    plot_data,
    locations="state_code",  # column with two-letter state codes
    locationmode="USA-states",  # interpret locations as US state codes
    color="death_rate_percent",  # column that drives the color intensity
    hover_name="state",
    scope="usa",  # limit the map to the USA
    color_continuous_scale="Viridis",
    labels={"death_rate_percent": "Death rate (%)"},
    title="COVID-19 death rate by state (deaths as % of cases)",
)

fig.show()
