In [1]:
# Setup Spark SQL
# Note if running locally you need the JVM https://www.oracle.com/java/technologies/downloads/
# Consider running in https://colab.research.google.com/
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3db24c917287e213703ab6528bd64dd822954509850e2adb93fd3c0a64bc0a05
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Initialize Context - this is where you'd setup information about your Hadoop cluster if you had one!
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Covid").getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("WARN")

In [3]:
# Download 100mb covid county data file
!curl "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv" > ./uscounties.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 99.9M  100 99.9M    0     0   154M      0 --:--:-- --:--:-- --:--:--  154M


In [4]:
# Read the file into a Spark DataFrame
usCountiesFilePath = "./uscounties.csv"

df = spark.read.csv(usCountiesFilePath, inferSchema=True, header=True)

df.show()

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California| 6059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-26|Los Angeles|California| 6037|    1|     0|
|2020-01-26|     Orange|California| 6059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-27|Los Angeles|California| 6037|    1|     0|
|2020-01-2

In [5]:
# SparkSQL API
df.createOrReplaceTempView("covid")  # create table that you can do sql on

print("Max deaths:")
spark.sql(
    """
    select county, state, deaths
    from covid
    order by deaths desc
    limit 1
  """
).show()

Max deaths:
+-------------+--------+------+
|       county|   state|deaths|
+-------------+--------+------+
|New York City|New York| 40267|
+-------------+--------+------+



In [6]:
# DataFrame style
from pyspark.sql.functions import col

print("Max deaths:")
print(
    df.orderBy(col("deaths").desc()).take(  # .where(col("county") == "New York City") \
        1
    )
)

Max deaths:
[Row(date=datetime.date(2022, 5, 13), county='New York City', state='New York', fips=None, cases=2422658, deaths=40267)]


In [7]:
# RDD MapReduce Style without key
rows = df.rdd


def getMax(cumm, other):
    if other["deaths"] is not None and other["deaths"] > cumm["deaths"]:
        return other
    else:
        return cumm


print("Max deaths:")
print(rows.reduce(getMax))

Max deaths:
Row(date=datetime.date(2022, 5, 13), county='New York City', state='New York', fips=None, cases=2422658, deaths=40267)


In [8]:
# RDD MapReduce Style with mapped tuples
rows = df.rdd


def getMax(cumm, other):
    if other[0] > cumm[0]:
        return other
    else:
        return cumm


rows = rows.map(lambda r: (r["deaths"] or 0, f"{r['county']},{r['state']}"))
print("Max deaths:")
print(rows.reduce(getMax))

Max deaths:
(40267, 'New York City,New York')


In [9]:
# Write code to find the county with the most deaths
print("Max Death County")
spark.sql(
    """
    select county, state, deaths
    from covid
    order by deaths desc
    limit 1
    """
).show()

+-------------+--------+------+
|       county|   state|deaths|
+-------------+--------+------+
|New York City|New York| 40267|
+-------------+--------+------+



In [14]:
# Write code to find the county with the most cases
print("Max Cases County")
spark.sql(
    """
    select county, state, cases
    from covid
    order by cases desc
    limit 1
    """
).show()

Max Cases County
+-----------+----------+-------+
|     county|     state|  cases|
+-----------+----------+-------+
|Los Angeles|California|2908425|
+-----------+----------+-------+



In [16]:
# Write code to find the total number of deaths in Utah county
print("Utah County Deaths")
spark.sql(
    """
    select county, state, deaths
    from covid
    where state = 'Utah' AND county = 'Utah'
    order by deaths desc
    limit 1
    """
).show()

Utah County Deaths
+------+-----+------+
|county|state|deaths|
+------+-----+------+
|  Utah| Utah|   791|
+------+-----+------+



In [24]:
# Write code to find the death rate for each state and sort the states by death rate descending
print("State Death Rates")
spark.sql(
    """
    with casesDeathsCounties (
      select max(cases) as cases, max(deaths) as deaths, county, state
      from covid
      group by county, state
    ),
    totals (
      select sum(deaths) as totalDeaths, sum(cases) as totalCases, state
      from casesDeathsCounties
      group by state
    )

    select totalDeaths/totalCases as DeathRate, state
    from totals
    order by DeathRate desc

    """
).show()

State Death Rates
+--------------------+-------------+
|           DeathRate|        state|
+--------------------+-------------+
| 0.01572292071074506| Pennsylvania|
|0.015606461167247017|  Mississippi|
|0.015044595741158455|      Alabama|
|0.014890612444262373|      Arizona|
|0.014729239552703312|       Nevada|
| 0.01471106889038076|      Georgia|
|0.014620126624458513|     Michigan|
|0.014515960564513415|   New Jersey|
|0.014486229819563153|   New Mexico|
|0.014153086108092123|         Ohio|
|0.014084709388278386|     Missouri|
|0.014008626216106716|  Connecticut|
|0.013778398587225054|      Indiana|
| 0.01373330920820223|    Louisiana|
|0.013704374797243301|West Virginia|
|0.013621137218224505|     Arkansas|
|0.013566368223629567|      Florida|
|0.013175195613352944|Massachusetts|
|0.013146163213431723|     Maryland|
|0.013120000121502577|     New York|
+--------------------+-------------+
only showing top 20 rows



In [27]:
# Write code to something else interesting with this data – your choice
print("Utah Case count")
spark.sql(
    """
      select state, sum(cases) as cases
      from covid
      where state = 'Utah'
      group by state,date
      order by date desc
      limit 1
    """
).show()

Utah Case count
+-----+------+
|state| cases|
+-----+------+
| Utah|939092|
+-----+------+

