In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that the
# dataset stored on Drive can be read by the cells below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Switch the working directory to the root of the mounted Drive.
%cd /content/drive/My Drive/

/content/drive/My Drive


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=9289e92881cc1aa7a988491e59fd60ba36335771635b163a84a1aabe7d003bac
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
#Import libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import window, sum, isnan, when, count, col
from pyspark.sql. functions import *
from pyspark.sql.types import *
import time


# Start (or reuse) a Spark session that runs locally with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Load the county-level CSV into a Spark DataFrame: the first line supplies
# the column names and the column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/drive/My Drive/us-counties.csv")
)

In [None]:
# Preview the raw data (show()'s defaults spelled out: 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California| 6059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-26|Los Angeles|California| 6037|    1|     0|
|2020-01-26|     Orange|California| 6059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-27|Los Angeles|California| 6037|    1|     0|
|2020-01-2

In [None]:
# Inspect the inferred column types first, then the summary statistics
# (count / mean / stddev / min / max) of the data.
df.printSchema()
summary_stats = df.describe()
summary_stats.show()

root
 |-- date: date (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)

+-------+---------+-------+------------------+------------------+------------------+
|summary|   county|  state|              fips|             cases|            deaths|
+-------+---------+-------+------------------+------------------+------------------+
|  count|  2502832|2502832|           2479154|           2502832|           2445227|
|   mean|     NULL|   NULL| 31399.58357286397|10033.804996899513|161.61002270954802|
| stddev|     NULL|   NULL|16342.509037015281| 47525.21722359842| 820.3334694664095|
|    min|Abbeville|Alabama|              1001|                 0|                 0|
|    max|  Ziebach|Wyoming|             78030|           2908425|             40267|
+-------+---------+-------+------------------+------------------+------------------+



We see that some rows have missing values in the `deaths` column. Upon analysing the data, the missing values are concentrated in Puerto Rico, where the majority of the counties' data is missing. Hence, we drop the Puerto Rico rows.

In [None]:
# Exclude every Puerto Rico row, then re-check the summary statistics to
# confirm that the `deaths` count now matches the row count.
df_filtered = df.where(F.col('state') != 'Puerto Rico')

df_filtered.describe().show()

+-------+---------+-------+------------------+------------------+------------------+
|summary|   county|  state|              fips|             cases|            deaths|
+-------+---------+-------+------------------+------------------+------------------+
|  count|  2444435|2444435|           2421549|           2444435|           2444435|
|   mean|     NULL|   NULL| 30431.93603309287|10215.558783931665|161.00553093046042|
| stddev|     NULL|   NULL|15268.708208949047| 48069.46311060629| 819.4017861645385|
|    min|Abbeville|Alabama|              1001|                 0|                 0|
|    max|  Ziebach|Wyoming|             78030|           2908425|             40267|
+-------+---------+-------+------------------+------------------+------------------+



In [None]:
# Cast the `date` column to a timestamp; every other column is left untouched.
df = df_filtered.withColumn(
    "date",
    F.col("date").cast(T.TimestampType()),
)

In [None]:
# Preview the DataFrame now that `date` holds timestamps
# (show()'s defaults spelled out: 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+-------------------+-----------+----------+-----+-----+------+
|               date|     county|     state| fips|cases|deaths|
+-------------------+-----------+----------+-----+-----+------+
|2020-01-21 00:00:00|  Snohomish|Washington|53061|    1|     0|
|2020-01-22 00:00:00|  Snohomish|Washington|53061|    1|     0|
|2020-01-23 00:00:00|  Snohomish|Washington|53061|    1|     0|
|2020-01-24 00:00:00|       Cook|  Illinois|17031|    1|     0|
|2020-01-24 00:00:00|  Snohomish|Washington|53061|    1|     0|
|2020-01-25 00:00:00|     Orange|California| 6059|    1|     0|
|2020-01-25 00:00:00|       Cook|  Illinois|17031|    1|     0|
|2020-01-25 00:00:00|  Snohomish|Washington|53061|    1|     0|
|2020-01-26 00:00:00|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-26 00:00:00|Los Angeles|California| 6037|    1|     0|
|2020-01-26 00:00:00|     Orange|California| 6059|    1|     0|
|2020-01-26 00:00:00|       Cook|  Illinois|17031|    1|     0|
|2020-01-26 00:00:00|  Snohomish|Washing

In [None]:
# Roll the county rows up to one row per (date, state) by summing cases and
# deaths, ordered alphabetically by state.
# NOTE(review): the county counts look cumulative, so these sums are running
# totals as of each date rather than daily increments -- confirm against the
# data source before interpreting the "per day" column names literally.
df_main = (
    df.groupBy("date", "state")
    .agg(
        F.sum("cases").alias("Cases Per Day"),
        F.sum("deaths").alias("Deaths per Day"),
    )
    .orderBy("state")
)

df_main.show()

+-------------------+-------+-------------+--------------+
|               date|  state|Cases Per Day|Deaths per Day|
+-------------------+-------+-------------+--------------+
|2021-02-01 00:00:00|Alabama|       460860|          7688|
|2021-03-23 00:00:00|Alabama|       511789|         10450|
|2021-04-17 00:00:00|Alabama|       522131|         10790|
|2020-05-03 00:00:00|Alabama|         7888|           290|
|2020-07-29 00:00:00|Alabama|        83782|          1538|
|2020-11-08 00:00:00|Alabama|       203687|          3084|
|2021-03-15 00:00:00|Alabama|       508229|         10329|
|2021-01-24 00:00:00|Alabama|       441170|          6660|
|2020-08-19 00:00:00|Alabama|       111478|          1944|
|2020-05-23 00:00:00|Alabama|        14149|           549|
|2020-10-02 00:00:00|Alabama|       156698|          2550|
|2021-05-07 00:00:00|Alabama|       530325|         10966|
|2020-11-09 00:00:00|Alabama|       204857|          3084|
|2020-05-14 00:00:00|Alabama|        11101|           47

We have a single CSV file that contains the COVID numbers for the USA. To showcase real-time streaming analysis, we partition it into multiple CSV files, each containing one date's COVID cases and deaths for every state and its respective counties. The directory holding these files will serve as our input stream source.

In [None]:
# Split the data into one CSV file per date, so that the output directory can
# act as a stream of files for the streaming queries further down.
dates = df.select("date").distinct().collect()

#Note: coalesce(1) will keep the subsequent data for that particular date in one csv file
# The first write uses "overwrite" so that re-running this cell replaces the
# directory contents; appending on every write would add a second copy of each
# date and double the totals computed by the streaming queries.
for i, d in enumerate(dates):
  write_mode = "overwrite" if i == 0 else "append"
  # Compare against the timestamp value itself instead of a formatted string,
  # so the filter does not depend on an implicit string-to-timestamp cast.
  df_file = df.where(df['date'] == F.lit(d[0]))
  df_file.coalesce(1).write.mode(write_mode).option("header", "true").csv("/content/drive/My Drive/Final_USA_Files")

Checking one csv file from the input source directory

In [None]:
import glob

# The part-file names contain a generated id that differs between runs, so
# look one up in the input source directory instead of hard-coding a
# run-specific file name (which breaks on a fresh run).
sample_files = sorted(glob.glob("/content/drive/My Drive/Final_USA_Files/part-*.csv"))
if not sample_files:
  raise FileNotFoundError("No part-*.csv files found in /content/drive/My Drive/Final_USA_Files")

# Read a single per-date file back, with its header line and inferred types.
spark.read.csv(sample_files[0],
                       header=True,
                       inferSchema=True).show()

+-------------------+---------+-------+----+-----+------+
|               date|   county|  state|fips|cases|deaths|
+-------------------+---------+-------+----+-----+------+
|2022-04-26 00:00:00|  Autauga|Alabama|1001|15818|   215|
|2022-04-26 00:00:00|  Baldwin|Alabama|1003|55617|   680|
|2022-04-26 00:00:00|  Barbour|Alabama|1005| 5664|    98|
|2022-04-26 00:00:00|     Bibb|Alabama|1007| 6437|   104|
|2022-04-26 00:00:00|   Blount|Alabama|1009|14968|   243|
|2022-04-26 00:00:00|  Bullock|Alabama|1011| 2318|    54|
|2022-04-26 00:00:00|   Butler|Alabama|1013| 5064|   129|
|2022-04-26 00:00:00|  Calhoun|Alabama|1015|32388|   626|
|2022-04-26 00:00:00| Chambers|Alabama|1017| 8483|   162|
|2022-04-26 00:00:00| Cherokee|Alabama|1019| 5125|    86|
|2022-04-26 00:00:00|  Chilton|Alabama|1021|11113|   207|
|2022-04-26 00:00:00|  Choctaw|Alabama|1023| 2049|    36|
|2022-04-26 00:00:00|   Clarke|Alabama|1025| 7142|   100|
|2022-04-26 00:00:00|     Clay|Alabama|1027| 4085|    82|
|2022-04-26 00

In [None]:
# Explicit schema handed to the streaming reader: one nullable field per CSV
# column, in file order.
schema = (
    T.StructType()
    .add('date', T.TimestampType(), True)
    .add('county', T.StringType(), True)
    .add('state', T.StringType(), True)
    .add('fips', T.IntegerType(), True)
    .add('cases', T.IntegerType(), True)
    .add('deaths', T.IntegerType(), True)
)

### Spark Streaming

In [None]:
#Setting an input source by creating readStream
# - header: the per-date files were written with a header line; without this
#   option that line is parsed as a data row in every file and leaks a bogus
#   group into the aggregations.
# - maxFilesPerTrigger: limits how many new files are picked up per trigger.
stream = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .option("maxFilesPerTrigger", 10)
    .csv("/content/drive/My Drive/Final_USA_Files")  # the directory that has all the files
)


In [None]:
# Streaming aggregation: running sums of cases and deaths per state, ordered
# alphabetically by state.
total = (
    stream.groupBy("state")
    .agg(
        F.sum("cases").alias("Total Cases"),
        F.sum("deaths").alias("Total Deaths"),
    )
    .orderBy("state")
)



In [None]:
# Start the query with a memory sink: the complete aggregated result is kept
# in an in-memory table named "total" that Spark SQL can query.
output_sink = (
    total.writeStream
    .format("memory")
    .queryName("total")
    .outputMode("complete")
    .start()
)

In [None]:
# Checking the in-memory table getting updated with the incoming micro-batches with Spark SQL interactions.
# Note: the printed counter is the poll iteration of this loop, not Spark's
# own micro-batch id.
try:
  for x in range(100):
    sql = spark.sql("SELECT * FROM total")

    print(f"Batch: {x+1}")
    if sql.count() > 0:
      sql.show()
    else:
      print("No data passed")

    time.sleep(0.3)
finally:
  # Shut the stream down even if the loop is interrupted or raises, so the
  # query is not left running in the background.
  output_sink.stop()

Batch: 1
No data passed
Batch: 2
No data passed
Batch: 3
No data passed
Batch: 4
No data passed
Batch: 5
No data passed
Batch: 6
No data passed
Batch: 7
No data passed
Batch: 8
No data passed
Batch: 9
No data passed
Batch: 10
No data passed
Batch: 11
No data passed
Batch: 12
No data passed
Batch: 13
No data passed
Batch: 14
+--------------------+-----------+------------+
|               state|Total Cases|Total Deaths|
+--------------------+-----------+------------+
|             Alabama|     904774|       18616|
|              Alaska|      84835|         457|
|             Arizona|    1494144|       32922|
|            Arkansas|     520984|        8202|
|          California|    5947563|      110127|
|            Colorado|     772555|       17042|
|         Connecticut|     647150|       34062|
|            Delaware|     184580|        4530|
|District of Columbia|     117192|        4381|
|             Florida|    4038467|       70882|
|             Georgia|    1874387|       40983|
| 

## Window Operation (Tumbling Window)

In [None]:
# File stream over the directory of per-date CSVs, used by the window queries.
# - header: the per-date files were written with a header line; without this
#   option that line is parsed as a data row in every file.
# - maxFilesPerTrigger: limits how many new files are picked up per trigger.
stream = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .option("maxFilesPerTrigger", 10)
    .csv("/content/drive/My Drive/Final_USA_Files")  # the directory that has all the files
)


Setting a tumbling window with a `window_duration` of 14 days

In [None]:
window_duration = "14 days"

# Bucket the rows into 14-day windows of `date` (no slide interval, i.e. a
# tumbling window) and total the cases and deaths per (window, state).
tumbling_df = stream.groupBy(F.window("date", window_duration), "state").agg(
    F.sum("cases").alias("total_cases"),
    F.sum("deaths").alias("total_deaths"),
)

In [None]:
# Start the tumbling-window query; its complete result is kept in the
# in-memory table "tumbling".
output_tumble = (
    tumbling_df.writeStream
    .format("memory")
    .queryName("tumbling")
    .outputMode("complete")
    .start()
)


In [None]:
#Getting the number of cases and deaths for Ohio state for each 14 day intervals
# Note: the printed counter is the poll iteration of this loop, not Spark's
# own micro-batch id.
try:
  for x in range(100):
    sql = spark.sql("SELECT window.start, window.end,state, total_cases, total_deaths FROM tumbling where state = 'Ohio' order by window")

    print(f"Batch: {x+1}")
    if sql.count() > 0:
      sql.show()
    else:
      print("No data passed")

    time.sleep(0.3)
finally:
  # Stop the query even if the loop is interrupted or raises, so it is not
  # left running in the background.
  output_tumble.stop()

Batch: 1
No data passed
Batch: 2
No data passed
Batch: 3
No data passed
Batch: 4
No data passed
Batch: 5
No data passed
Batch: 6
No data passed
Batch: 7
No data passed
Batch: 8
No data passed
Batch: 9
+-------------------+-------------------+-----+-----------+------------+
|              start|                end|state|total_cases|total_deaths|
+-------------------+-------------------+-----+-----------+------------+
|2020-03-19 00:00:00|2020-04-02 00:00:00| Ohio|       2199|          55|
|2020-04-02 00:00:00|2020-04-16 00:00:00| Ohio|      12856|         500|
|2020-04-30 00:00:00|2020-05-14 00:00:00| Ohio|      25727|        1483|
|2020-06-11 00:00:00|2020-06-25 00:00:00| Ohio|      83735|        5157|
|2020-07-09 00:00:00|2020-07-23 00:00:00| Ohio|      66853|        3064|
|2020-08-20 00:00:00|2020-09-03 00:00:00| Ohio|     115651|        3986|
|2020-09-03 00:00:00|2020-09-17 00:00:00| Ohio|     129785|        4256|
|2021-04-29 00:00:00|2021-05-13 00:00:00| Ohio|    1081518|       194

## Window Operation (Sliding Window)

In [None]:
#Setting a window duration of 14 days with sliding interval of 7 days
# (The variable was previously misspelled "indow_duration", so this cell only
# worked when `window_duration` was left over from an earlier cell.)
window_duration = "14 days"
slide_interval = "7 days"


# A new 14-day window starts every 7 days, so consecutive windows overlap;
# cases and deaths are totalled per (window, state).
slide_df = stream.groupBy(F.window("date", window_duration, slide_interval), "state").agg(
    F.sum("cases").alias("total_cases"),
    F.sum("deaths").alias("total_deaths"),
)

In [None]:
# Start the sliding-window query; its complete result is kept in the
# in-memory table "sliding".
output_slide = (
    slide_df.writeStream
    .format("memory")
    .queryName("sliding")
    .outputMode("complete")
    .start()
)


In [None]:
#Getting the number of cases and deaths for Ohio state for each 14 day intervals with overlapping windows and 7 day offset
# Note: the printed counter is the poll iteration of this loop, not Spark's
# own micro-batch id.
try:
  for x in range(100):
    sql = spark.sql("SELECT window.start, window.end,state, total_cases, total_deaths FROM sliding where state = 'Ohio' order by window")

    print(f"Batch: {x+1}")
    if sql.count() > 0:
      sql.show()
    else:
      print("No data passed")

    time.sleep(0.3)
finally:
  # Stop the query even if the loop is interrupted or raises, so it is not
  # left running in the background.
  output_slide.stop()

Batch: 1
No data passed
Batch: 2
No data passed
Batch: 3
No data passed
Batch: 4
No data passed
Batch: 5
No data passed
Batch: 6
No data passed
Batch: 7
No data passed
Batch: 8
No data passed
Batch: 9
No data passed
Batch: 10
+-------------------+-------------------+-----+-----------+------------+
|              start|                end|state|total_cases|total_deaths|
+-------------------+-------------------+-----+-----------+------------+
|2020-03-19 00:00:00|2020-04-02 00:00:00| Ohio|       2199|          55|
|2020-03-26 00:00:00|2020-04-09 00:00:00| Ohio|       2199|          55|
|2020-04-02 00:00:00|2020-04-16 00:00:00| Ohio|      12856|         500|
|2020-04-09 00:00:00|2020-04-23 00:00:00| Ohio|      12856|         500|
|2020-04-30 00:00:00|2020-05-14 00:00:00| Ohio|      25727|        1483|
|2020-05-07 00:00:00|2020-05-21 00:00:00| Ohio|      25727|        1483|
|2020-06-04 00:00:00|2020-06-18 00:00:00| Ohio|      40004|        2490|
|2020-06-11 00:00:00|2020-06-25 00:00:00| Oh

Sliding window operation with a 14-day window duration and a 14-day slide interval

In [None]:
window_duration = "14 days"
slide_interval = "14 days"


# Window whose slide interval equals its duration; cases and deaths are
# totalled per (window, state).
windowed_df = stream.groupBy(
    F.window("date", window_duration, slide_interval),
    "state",
).agg(
    F.sum("cases").alias("total_cases"),
    F.sum("deaths").alias("total_deaths"),
)

In [None]:
# Start the query; its complete result is kept in the in-memory table "window".
output_slide = (
    windowed_df.writeStream
    .format("memory")
    .queryName("window")
    .outputMode("complete")
    .start()
)

In [None]:
#This will give the same output as the tumbling window operation
# Note: the printed counter is the poll iteration of this loop, not Spark's
# own micro-batch id.
try:
  for x in range(100):
    sql = spark.sql("SELECT window.start, window.end,state, total_cases, total_deaths  FROM window where state = 'Ohio' order by window")

    print(f"Batch: {x+1}")
    if sql.count() > 0:
      sql.show()
    else:
      print("No data passed")

    time.sleep(0.3)
finally:
  # Stop the query even if the loop is interrupted or raises, so it is not
  # left running in the background.
  output_slide.stop()

Batch: 1
No data passed
Batch: 2
No data passed
Batch: 3
No data passed
Batch: 4
No data passed
Batch: 5
No data passed
Batch: 6
No data passed
Batch: 7
No data passed
Batch: 8
No data passed
Batch: 9
No data passed
Batch: 10
+-------------------+-------------------+-----+-----------+------------+
|              start|                end|state|total_cases|total_deaths|
+-------------------+-------------------+-----+-----------+------------+
|2020-03-19 00:00:00|2020-04-02 00:00:00| Ohio|       2199|          55|
|2020-04-02 00:00:00|2020-04-16 00:00:00| Ohio|      12856|         500|
|2020-04-30 00:00:00|2020-05-14 00:00:00| Ohio|      25727|        1483|
|2020-06-11 00:00:00|2020-06-25 00:00:00| Ohio|      83735|        5157|
|2020-07-09 00:00:00|2020-07-23 00:00:00| Ohio|      66853|        3064|
|2020-08-20 00:00:00|2020-09-03 00:00:00| Ohio|     115651|        3986|
|2020-09-03 00:00:00|2020-09-17 00:00:00| Ohio|     129785|        4256|
|2021-04-29 00:00:00|2021-05-13 00:00:00| Oh