In [58]:
# installing the required libraries
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# importing the libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
# Create (or reuse) the Spark session for this analysis and display it.
spark = (
    SparkSession.builder
    .appName("covid19analysis")
    .getOrCreate()
)
spark

In [4]:
from pyspark.sql import SQLContext

# SQLContext's first positional parameter is the SparkContext, not the session;
# pass the context explicitly and bind the wrapper to `spark` as the session.
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of calling
# `spark.sql(...)` directly. The name `sqlContext` is kept because other cells
# in this notebook query through it.
sqlContext = SQLContext(spark.sparkContext, spark)
sqlContext



<pyspark.sql.context.SQLContext at 0x7fe9b0bf6e50>

In [5]:
# Load the county-level COVID-19 CSV. The first row supplies the column names;
# no schema inference is requested, so Spark reads every column as a string.
# This cell must run: every other cell in the notebook depends on `df`.
df = spark.read.csv('us-counties.csv', header=True)
df.show(5)

+----------+---------+----------+-----+-----+------+
|      date|   county|     state| fips|cases|deaths|
+----------+---------+----------+-----+-----+------+
|2020-01-21|Snohomish|Washington|53061|    1|     0|
|2020-01-22|Snohomish|Washington|53061|    1|     0|
|2020-01-23|Snohomish|Washington|53061|    1|     0|
|2020-01-24|     Cook|  Illinois|17031|    1|     0|
|2020-01-24|Snohomish|Washington|53061|    1|     0|
+----------+---------+----------+-----+-----+------+
only showing top 5 rows



In [6]:
# Find the most recent date present in the table.
# The aggregation yields a one-row DataFrame with a single `max_date` column.
latest_date = df.agg(F.max('date').alias('max_date'))
latest_date.show()

+----------+
|  max_date|
+----------+
|2020-06-23|
+----------+



In [7]:
# Expose the DataFrame to SQL queries as the temporary view "covid_data".
# createOrReplaceTempView replaces the deprecated registerTempTable and is safe
# to re-run because it overwrites an existing view of the same name.
df.createOrReplaceTempView("covid_data")



In [8]:
# Count the total number of records in df (triggers a full Spark job).
df.count()

265259

In [9]:
# Same max-date lookup, expressed as SQL against the covid_data temp table.
latest_date_sql = sqlContext.sql(""" SELECT max(date) as max_date FROM covid_data """)
latest_date_sql.show()

+----------+
|  max_date|
+----------+
|2020-06-23|
+----------+



In [10]:
# The DataFrame-API aggregation returns a DataFrame, not a scalar.
type(latest_date)

pyspark.sql.dataframe.DataFrame

In [11]:
# The SQL query likewise returns a DataFrame.
type(latest_date_sql)

pyspark.sql.dataframe.DataFrame

In [12]:
# collect() brings every row back to the driver as a Python list of Row objects.
# Be careful: collecting a large DataFrame can exhaust the driver's memory.
latest_date = latest_date.collect()
latest_date

[Row(max_date='2020-06-23')]

In [13]:
# Take the max_date field from the first (and only) collected Row.
# NOTE(review): `latest_date` is rebound from a list to a plain string here, so
# this cell cannot be re-run without first re-running the cells that rebuild the list.
latest_date = latest_date[0]['max_date'] 
latest_date

'2020-06-23'

In [14]:
# Keep only the rows recorded on the latest date, then preview two of them.
# A Column comparison is used instead of formatting the value into a SQL string,
# so the date is passed as a literal and needs no manual quoting.
df_filtered = df.where(F.col("date") == F.lit(latest_date))

df_filtered.show(2)

+----------+-------+-------+-----+-----+------+
|      date| county|  state| fips|cases|deaths|
+----------+-------+-------+-----+-----+------+
|2020-06-23|Autauga|Alabama|01001|  453|     9|
|2020-06-23|Baldwin|Alabama|01003|  450|     9|
+----------+-------+-------+-----+-----+------+
only showing top 2 rows



In [15]:
# Number of rows recorded on the latest date.
df_filtered.count()

171

In [16]:
# SQL equivalent of the latest-date filter; note that this rebinds
# `latest_date_sql` from the max-date result to the filtered rows.
latest_date_sql = sqlContext.sql(
    f""" SELECT * FROM covid_data where date = '{latest_date}'"""
)

latest_date_sql.show(n=10)

+----------+--------+-------+-----+-----+------+
|      date|  county|  state| fips|cases|deaths|
+----------+--------+-------+-----+-----+------+
|2020-06-23| Autauga|Alabama|01001|  453|     9|
|2020-06-23| Baldwin|Alabama|01003|  450|     9|
|2020-06-23| Barbour|Alabama|01005|  280|     1|
|2020-06-23|    Bibb|Alabama|01007|  135|     1|
|2020-06-23|  Blount|Alabama|01009|  159|     1|
|2020-06-23| Bullock|Alabama|01011|  329|    10|
|2020-06-23|  Butler|Alabama|01013|  581|    27|
|2020-06-23| Calhoun|Alabama|01015|  216|     5|
|2020-06-23|Chambers|Alabama|01017|  534|    27|
|2020-06-23|Cherokee|Alabama|01019|   58|     7|
+----------+--------+-------+-----+-----+------+
only showing top 10 rows



In [17]:
# Whole-table aggregation (no grouping) giving the overall statistics for the
# latest date. countDistinct("county") counts distinct county *names*, so
# same-named counties in different states are counted once.
overall_stats = df_filtered.agg(
    F.sum(F.col('cases')).alias('total_cases'),
    F.sum(F.col('deaths')).alias('total_deaths'),
    F.countDistinct(F.col("county")).alias("number_of_counties"),
    F.countDistinct(F.col("state")).alias("number_of_states"),
)

overall_stats.show(n=1, truncate=False)

+-----------+------------+------------------+----------------+
|total_cases|total_deaths|number_of_counties|number_of_states|
+-----------+------------+------------------+----------------+
|102116.0   |2466.0      |155               |4               |
+-----------+------------+------------------+----------------+



In [18]:
# Sort rows alphabetically by county name (ascending) and show the first ten untruncated.
df_filtered.sort("county").show(n=10, truncate=False)

+----------+--------------------------+--------+-----+-----+------+
|date      |county                    |state   |fips |cases|deaths|
+----------+--------------------------+--------+-----+-----+------+
|2020-06-23|Aleutians East Borough    |Alaska  |02013|2    |0     |
|2020-06-23|Aleutians West Census Area|Alaska  |02016|4    |0     |
|2020-06-23|Anchorage                 |Alaska  |02020|401  |6     |
|2020-06-23|Apache                    |Arizona |04001|2165 |67    |
|2020-06-23|Arkansas                  |Arkansas|05001|21   |0     |
|2020-06-23|Ashley                    |Arkansas|05003|57   |1     |
|2020-06-23|Autauga                   |Alabama |01001|453  |9     |
|2020-06-23|Baldwin                   |Alabama |01003|450  |9     |
|2020-06-23|Barbour                   |Alabama |01005|280  |1     |
|2020-06-23|Baxter                    |Arkansas|05005|15   |0     |
+----------+--------------------------+--------+-----+-----+------+
only showing top 10 rows



In [19]:
# Register the latest-date rows as a temporary view for SQL queries.
# createOrReplaceTempView replaces the deprecated registerTempTable.
# NOTE(review): the view name says 20210501 but the rows come from `latest_date`;
# the name is kept because the SQL cell in this notebook selects from it.
df_filtered.createOrReplaceTempView("covid19_20210501")



In [20]:
# Per-county totals for the latest date, computed in SQL.
# Because rows are grouped by county, COUNT(DISTINCT county) is always 1 per
# group; COUNT(DISTINCT state) shows how many states share that county name.
sqlContext.sql(
    """

        SELECT
            county,
            SUM(cases) as number_of_cases,
            SUM(deaths) as number_of_deaths,
            COUNT(*) as number_of_records,
            COUNT(DISTINCT county) as number_of_counties,
            COUNT(DISTINCT state) as number_of_states
        FROM
            covid19_20210501
        GROUP BY
            county
        ORDER BY
            county
    """ 
).show(10, False)

+--------------------------+---------------+----------------+-----------------+------------------+----------------+
|county                    |number_of_cases|number_of_deaths|number_of_records|number_of_counties|number_of_states|
+--------------------------+---------------+----------------+-----------------+------------------+----------------+
|Aleutians East Borough    |2.0            |0.0             |1                |1                 |1               |
|Aleutians West Census Area|4.0            |0.0             |1                |1                 |1               |
|Anchorage                 |401.0          |6.0             |1                |1                 |1               |
|Apache                    |2165.0         |67.0            |1                |1                 |1               |
|Arkansas                  |21.0           |0.0             |1                |1                 |1               |
|Ashley                    |57.0           |1.0             |1          

In [21]:
# DataFrame-API version of the per-county summary: totals, row count and the
# number of distinct states for each county name.
county_summary = (
    df_filtered
    .groupBy("county")
    .agg(
        F.sum(F.col("cases")).alias("total_cases"),
        F.sum(F.col("deaths")).alias("total_deaths"),
        F.count("*").alias("number_of_records"),
        F.countDistinct(F.col("state")).alias("number_of_states"),
    )
)

county_summary.sort("county").show(n=20, truncate=False)

+-----------------------------------+-----------+------------+-----------------+----------------+
|county                             |total_cases|total_deaths|number_of_records|number_of_states|
+-----------------------------------+-----------+------------+-----------------+----------------+
|Aleutians East Borough             |2.0        |0.0         |1                |1               |
|Aleutians West Census Area         |4.0        |0.0         |1                |1               |
|Anchorage                          |401.0      |6.0         |1                |1               |
|Apache                             |2165.0     |67.0        |1                |1               |
|Arkansas                           |21.0       |0.0         |1                |1               |
|Ashley                             |57.0       |1.0         |1                |1               |
|Autauga                            |453.0      |9.0         |1                |1               |
|Baldwin            

In [22]:
# Sanity check: a county name with more records than distinct states would
# mean duplicate rows for one state. Zero such rows is the expected result.
records_differ_from_states = F.col("number_of_records") != F.col("number_of_states")
filtered_county_rec = county_summary.filter(records_differ_from_states)

filtered_county_rec.count()

0

In [23]:
# Left-hand input for the join demos: summary rows for Adair and Addison.
left_table = county_summary.filter("county in ('Adair', 'Addison')")

left_table.show(n=3, truncate=False)

+------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+



In [24]:
# Right-hand input for the join demos: summary rows for Ada and Accomack.
right_table = county_summary.filter("county in ('Ada', 'Accomack')")

right_table.show(n=3, truncate=False)

+------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+



In [25]:
# Inner join: keep only counties present in both county_summary and left_table.
# Joining on the column name keeps a single `county` column in the result.
inner_table = county_summary.join(left_table, on="county", how="inner")

inner_table.show(n=100, truncate=False)

+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+



In [26]:
# Left join: every row of left_table, with right_table columns where the county matches.
left_table_joined = left_table.join(right_table, on="county", how="left")

left_table_joined.show(n=100, truncate=False)

+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+



In [27]:
# Right join: every row of right_table, with left_table columns where the county matches.
right_table_joined = left_table.join(right_table, on="county", how="right")

right_table_joined.show(n=100, truncate=False)

+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+



In [28]:
# Full outer join: rows from both tables, matched on county where possible.
outer_table_joined = left_table.join(right_table, on="county", how="outer")

outer_table_joined.show(n=100, truncate=False)

+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+-----------+------------+-----------------+----------------+



In [29]:
# Goal: percentage of cases contributed by each state that has a county named Adair.
# Step 1: pull the county-level totals for Adair.
adair_overall = county_summary.filter("county in ('Adair')")

adair_overall.show(n=5, truncate=False)

+------+-----------+------------+-----------------+----------------+
|county|total_cases|total_deaths|number_of_records|number_of_states|
+------+-----------+------------+-----------------+----------------+
+------+-----------+------------+-----------------+----------------+



In [30]:
# Step 2: attach the county totals to each per-state row of the same county.
perc_cases_statewise = df_filtered.join(adair_overall, on=["county"], how="inner")

perc_cases_statewise.show(n=10, truncate=False)

+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+
|county|date|state|fips|cases|deaths|total_cases|total_deaths|number_of_records|number_of_states|
+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+
+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+



In [31]:
# Step 3: compute each row's share of the county totals.
# withColumn adds a new column (or replaces one with the same name).
perc_cases_statewise = (
    perc_cases_statewise
    .withColumn("perc_cases", F.col("cases") / F.col("total_cases"))
    .withColumn("perc_deaths", F.col("deaths") / F.col("total_deaths"))
)

perc_cases_statewise.show(n=10, truncate=False)

+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+----------+-----------+
|county|date|state|fips|cases|deaths|total_cases|total_deaths|number_of_records|number_of_states|perc_cases|perc_deaths|
+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+----------+-----------+
+------+----+-----+----+-----+------+-----------+------------+-----------------+----------------+----------+-----------+



In [32]:
# using window functions
from pyspark.sql.window import Window

In [33]:
# Preview the per-county summary that the ranking cells build on.
county_summary.show(n=5, truncate=False)

+-------+-----------+------------+-----------------+----------------+
|county |total_cases|total_deaths|number_of_records|number_of_states|
+-------+-----------+------------+-----------------+----------------+
|Izard  |20.0       |1.0         |1                |1               |
|Scott  |8.0        |0.0         |1                |1               |
|Baldwin|450.0      |9.0         |1                |1               |
|Monroe |177.0      |2.0         |2                |2               |
|Graham |66.0       |2.0         |1                |1               |
+-------+-----------+------------+-----------------+----------------+
only showing top 5 rows



In [34]:
# Rank counties by total cases, smallest first. rank() gives tied rows the same
# rank and leaves a gap after them. The window has no partitionBy, so the
# ranking spans the whole table.
county_summary_ranked = county_summary.withColumn(
    "rank", F.rank().over(Window.orderBy(F.asc("total_cases")))
)

county_summary_ranked.show(n=5, truncate=False)

+---------------------------------+-----------+------------+-----------------+----------------+----+
|county                           |total_cases|total_deaths|number_of_records|number_of_states|rank|
+---------------------------------+-----------+------------+-----------------+----------------+----+
|Aleutians East Borough           |2.0        |0.0         |1                |1               |1   |
|Yukon-Koyukuk Census Area        |2.0        |0.0         |1                |1               |1   |
|Prince of Wales-Hyder Census Area|3.0        |0.0         |1                |1               |3   |
|Northwest Arctic Borough         |4.0        |0.0         |1                |1               |4   |
|Newton                           |4.0        |0.0         |1                |1               |4   |
+---------------------------------+-----------+------------+-----------------+----------------+----+
only showing top 5 rows



In [35]:
# Add the reverse ranking (largest total first) and list the top counties.
county_summary_ranked = county_summary_ranked.withColumn(
    "rank_desc", F.rank().over(Window.orderBy(F.col("total_cases").desc()))
)

county_summary_ranked.orderBy(F.desc("total_cases")).show(n=5, truncate=False)

+----------+-----------+------------+-----------------+----------------+----+---------+
|county    |total_cases|total_deaths|number_of_records|number_of_states|rank|rank_desc|
+----------+-----------+------------+-----------------+----------------+----+---------+
|Maricopa  |33883.0    |663.0       |1                |1               |155 |1        |
|Pima      |6089.0     |247.0       |1                |1               |154 |2        |
|Yuma      |4940.0     |69.0        |1                |1               |153 |3        |
|Jefferson |3801.0     |155.0       |2                |2               |152 |4        |
|Montgomery|3353.0     |87.0        |2                |2               |151 |5        |
+----------+-----------+------------+-----------------+----------------+----+---------+
only showing top 5 rows



In [36]:
# Reminder of the row-level (county, state) data used for the partitioned windows.
df_filtered.show(n=5, truncate=False)

+----------+-------+-------+-----+-----+------+
|date      |county |state  |fips |cases|deaths|
+----------+-------+-------+-----+-----+------+
|2020-06-23|Autauga|Alabama|01001|453  |9     |
|2020-06-23|Baldwin|Alabama|01003|450  |9     |
|2020-06-23|Barbour|Alabama|01005|280  |1     |
|2020-06-23|Bibb   |Alabama|01007|135  |1     |
|2020-06-23|Blount |Alabama|01009|159  |1     |
+----------+-------+-------+-----+-----+------+
only showing top 5 rows



In [37]:
# Row count before ranking, for comparison with the ranked result.
df_filtered.count()

171

In [38]:
# Within each county name, rank the states by case count (highest first).
ranked_states = df_filtered.withColumn(
    "state_rank",
    F.rank().over(
        Window.partitionBy("county").orderBy(F.col("cases").desc())
    ),
)

ranked_states.sort("county", "state_rank").show(n=30, truncate=False)

+----------+-----------------------------------+--------+-----+-----+------+----------+
|date      |county                             |state   |fips |cases|deaths|state_rank|
+----------+-----------------------------------+--------+-----+-----+------+----------+
|2020-06-23|Aleutians East Borough             |Alaska  |02013|2    |0     |1         |
|2020-06-23|Aleutians West Census Area         |Alaska  |02016|4    |0     |1         |
|2020-06-23|Anchorage                          |Alaska  |02020|401  |6     |1         |
|2020-06-23|Apache                             |Arizona |04001|2165 |67    |1         |
|2020-06-23|Arkansas                           |Arkansas|05001|21   |0     |1         |
|2020-06-23|Ashley                             |Arkansas|05003|57   |1     |1         |
|2020-06-23|Autauga                            |Alabama |01001|453  |9     |1         |
|2020-06-23|Baldwin                            |Alabama |01003|450  |9     |1         |
|2020-06-23|Barbour             

In [39]:
# Adding a window column does not change the number of rows.
ranked_states.count()

171

In [40]:
# Keep only the top-ranked state for each county name.
ranked_states_filtered = ranked_states.where("state_rank = 1")

ranked_states_filtered.count()

155

In [41]:
# Show the top-ranked rows in county order.
ranked_states_filtered.sort("county", "state_rank").show(n=50, truncate=False)

+----------+-----------------------------------+--------+-----+-----+------+----------+
|date      |county                             |state   |fips |cases|deaths|state_rank|
+----------+-----------------------------------+--------+-----+-----+------+----------+
|2020-06-23|Aleutians East Borough             |Alaska  |02013|2    |0     |1         |
|2020-06-23|Aleutians West Census Area         |Alaska  |02016|4    |0     |1         |
|2020-06-23|Anchorage                          |Alaska  |02020|401  |6     |1         |
|2020-06-23|Apache                             |Arizona |04001|2165 |67    |1         |
|2020-06-23|Arkansas                           |Arkansas|05001|21   |0     |1         |
|2020-06-23|Ashley                             |Arkansas|05003|57   |1     |1         |
|2020-06-23|Autauga                            |Alabama |01001|453  |9     |1         |
|2020-06-23|Baldwin                            |Alabama |01003|450  |9     |1         |
|2020-06-23|Barbour             

In [46]:
# Attach per-county window aggregates (sum, share of sum, average, max) to every row.
# NOTE(review): the "country_*" column names look like typos for "county_*";
# they are kept because a later cell drops 'country_total_cases' by name.
ranked_states = (
    ranked_states
    .withColumn("country_total_cases", F.sum("cases").over(Window.partitionBy("county")))
    .withColumn("perc_total_cases", F.col("cases") / F.col("country_total_cases"))
    .withColumn("country_avg_cases", F.avg("cases").over(Window.partitionBy("county")))
    .withColumn("country_max_cases", F.max("cases").over(Window.partitionBy("county")))
)

ranked_states.sort("county", "perc_total_cases").show(n=50, truncate=False)

+----------+-----------------------------------+--------+-----+-----+------+----------+-------------------+--------------------+-----------------+-----------------+
|date      |county                             |state   |fips |cases|deaths|state_rank|country_total_cases|perc_total_cases    |country_avg_cases|country_max_cases|
+----------+-----------------------------------+--------+-----+-----+------+----------+-------------------+--------------------+-----------------+-----------------+
|2020-06-23|Aleutians East Borough             |Alaska  |02013|2    |0     |1         |2.0                |1.0                 |2.0              |2                |
|2020-06-23|Aleutians West Census Area         |Alaska  |02016|4    |0     |1         |4.0                |1.0                 |4.0              |4                |
|2020-06-23|Anchorage                          |Alaska  |02020|401  |6     |1         |401.0              |1.0                 |401.0            |401              |
|2020-06-2

In [47]:
# Expose the ranked rows to SQL as the temporary view "ranked_states".
# createOrReplaceTempView replaces the deprecated registerTempTable.
ranked_states.createOrReplaceTempView("ranked_states")



In [50]:
# The same kind of window aggregate written in SQL: add the per-county minimum,
# then drop the running-total column before displaying.
ranked_states_sql = sqlContext.sql("""
    SELECT
        *,
        MIN(cases) OVER (PARTITION BY county) as country_min_cases
    FROM
        ranked_states
""").drop('country_total_cases')

ranked_states_sql.sort("county", "perc_total_cases").show(n=50, truncate=False)

+----------+-----------------------------------+--------+-----+-----+------+----------+--------------------+-----------------+-----------------+-----------------+
|date      |county                             |state   |fips |cases|deaths|state_rank|perc_total_cases    |country_avg_cases|country_max_cases|country_min_cases|
+----------+-----------------------------------+--------+-----+-----+------+----------+--------------------+-----------------+-----------------+-----------------+
|2020-06-23|Aleutians East Borough             |Alaska  |02013|2    |0     |1         |1.0                 |2.0              |2                |2                |
|2020-06-23|Aleutians West Census Area         |Alaska  |02016|4    |0     |1         |1.0                 |4.0              |4                |4                |
|2020-06-23|Anchorage                          |Alaska  |02020|401  |6     |1         |1.0                 |401.0            |401              |401              |
|2020-06-23|Apache    