In [None]:
!pip install pyspark kaggle
!kaggle datasets download -d sudalairajkumar/novel-corona-virus-2019-dataset
!unzip novel-corona-virus-2019-dataset.zip

Dataset URL: https://www.kaggle.com/datasets/sudalairajkumar/novel-corona-virus-2019-dataset
License(s): copyright-authors
Downloading novel-corona-virus-2019-dataset.zip to /content
  0% 0.00/8.52M [00:00<?, ?B/s]
100% 8.52M/8.52M [00:00<00:00, 106MB/s]
Archive:  novel-corona-virus-2019-dataset.zip
  inflating: covid_19_data.csv       
  inflating: time_series_covid_19_confirmed.csv  
  inflating: time_series_covid_19_confirmed_US.csv  
  inflating: time_series_covid_19_deaths.csv  
  inflating: time_series_covid_19_deaths_US.csv  
  inflating: time_series_covid_19_recovered.csv  


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


In [None]:
spark = SparkSession.builder.appName("CovidDataAnalysis").getOrCreate()

In [None]:
try:
  df = spark.read.csv("covid_19_data.csv", header=True, inferSchema=True)
except Exception as e:
  print(f"Error loading dataset: {e}")
  exit()

In [None]:
df.printSchema()
df.show(5)

root
 |-- SNo: integer (nullable = true)
 |-- ObservationDate: string (nullable = true)
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Last Update: string (nullable = true)
 |-- Confirmed: double (nullable = true)
 |-- Deaths: double (nullable = true)
 |-- Recovered: double (nullable = true)

+---+---------------+--------------+--------------+---------------+---------+------+---------+
|SNo|ObservationDate|Province/State|Country/Region|    Last Update|Confirmed|Deaths|Recovered|
+---+---------------+--------------+--------------+---------------+---------+------+---------+
|  1|     01/22/2020|         Anhui|Mainland China|1/22/2020 17:00|      1.0|   0.0|      0.0|
|  2|     01/22/2020|       Beijing|Mainland China|1/22/2020 17:00|     14.0|   0.0|      0.0|
|  3|     01/22/2020|     Chongqing|Mainland China|1/22/2020 17:00|      6.0|   0.0|      0.0|
|  4|     01/22/2020|        Fujian|Mainland China|1/22/2020 17:00|      1.0|   0.0|  

In [None]:
df = df.na.drop()

In [None]:
total_cases_by_country = df.groupBy("Country/Region").agg(sum("Confirmed").alias("TotalConfirmed"))
total_cases_by_country.show()

+--------------+--------------+
|Country/Region|TotalConfirmed|
+--------------+--------------+
|        Russia|  9.21150045E8|
|        Sweden|  1.40771789E8|
|          Iraq|           0.0|
|       Germany|  5.17436073E8|
|        France|   1.9184641E7|
|        Taiwan|        1104.0|
|         Macau|       20605.0|
|       Belgium|  1.54734075E8|
|          Peru|  3.58489616E8|
|         China|           0.0|
|         India| 3.221090185E9|
|         Chile|  2.31581173E8|
|         Italy|  6.27754296E8|
|         Spain|     6.40132E8|
|       Denmark|      212226.0|
|            US| 6.049145667E9|
|     Hong Kong|     2655935.0|
|       Ukraine|  3.10226075E8|
|        Israel|           5.0|
|        Mexico|  4.59557279E8|
+--------------+--------------+
only showing top 20 rows



In [None]:
df = df.withColumn("MortalityRate", (col("Deaths") / col("Confirmed")) * 100)
df.select("Country/Region", "Confirmed", "Deaths", "MortalityRate").show()

+--------------+---------+------+------------------+
|Country/Region|Confirmed|Deaths|     MortalityRate|
+--------------+---------+------+------------------+
|Mainland China|      1.0|   0.0|               0.0|
|Mainland China|     14.0|   0.0|               0.0|
|Mainland China|      6.0|   0.0|               0.0|
|Mainland China|      1.0|   0.0|               0.0|
|Mainland China|      0.0|   0.0|              NULL|
|Mainland China|     26.0|   0.0|               0.0|
|Mainland China|      2.0|   0.0|               0.0|
|Mainland China|      1.0|   0.0|               0.0|
|Mainland China|      4.0|   0.0|               0.0|
|Mainland China|      1.0|   0.0|               0.0|
|Mainland China|      0.0|   0.0|              NULL|
|Mainland China|      5.0|   0.0|               0.0|
|     Hong Kong|      0.0|   0.0|              NULL|
|Mainland China|    444.0|  17.0|3.8288288288288284|
|Mainland China|      4.0|   0.0|               0.0|
|Mainland China|      0.0|   0.0|             

In [None]:
highest_mortality = df.orderBy(desc("MortalityRate")).limit(10)
highest_mortality.show()

+------+---------------+--------------+--------------+-------------------+---------+-------+---------+------------------+
|   SNo|ObservationDate|Province/State|Country/Region|        Last Update|Confirmed| Deaths|Recovered|     MortalityRate|
+------+---------------+--------------+--------------+-------------------+---------+-------+---------+------------------+
|196218|     01/05/2021|       Unknown|       Belgium|2021-04-02 15:13:53|  10786.0|19827.0|      0.0|183.82162061932135|
|195454|     01/04/2021|       Unknown|       Belgium|2021-04-02 15:13:53|  10752.0|19750.0|      0.0|183.68675595238096|
|196982|     01/06/2021|       Unknown|       Belgium|2021-04-02 15:13:53|  10834.0|19883.0|      0.0|   183.52409082518|
|194690|     01/03/2021|       Unknown|       Belgium|2021-04-02 15:13:53|  10737.0|19701.0|      0.0| 183.4870075440067|
|190875|     12/29/2020|       Unknown|       Belgium|2021-04-02 15:13:53|  10553.0|19361.0|      0.0|183.46441770112764|
|193927|     01/02/2021|

In [None]:
df = df.withColumn("Date", to_date(col("ObservationDate"), "MM/dd/yyyy"))
df = df.orderBy("Country/Region", "Date")

In [None]:
from pyspark.sql.window import Window

In [None]:
window_spec = Window.partitionBy("Country/Region").orderBy("Date")
df = df.withColumn("DailyNewCases", col("Confirmed") - lag(col("Confirmed"), 1, 0).over(window_spec))

In [None]:
df.select("Country/Region", "Date", "Confirmed", "DailyNewCases").show()

+--------------+----------+---------+-------------+
|Country/Region|      Date|Confirmed|DailyNewCases|
+--------------+----------+---------+-------------+
|     Australia|2020-01-27|      4.0|          4.0|
|     Australia|2020-01-27|      1.0|         -3.0|
|     Australia|2020-01-28|      4.0|          3.0|
|     Australia|2020-01-28|      1.0|         -3.0|
|     Australia|2020-01-29|      4.0|          3.0|
|     Australia|2020-01-29|      1.0|         -3.0|
|     Australia|2020-01-30|      4.0|          3.0|
|     Australia|2020-01-30|      2.0|         -2.0|
|     Australia|2020-01-30|      3.0|          1.0|
|     Australia|2020-01-31|      4.0|          1.0|
|     Australia|2020-01-31|      3.0|         -1.0|
|     Australia|2020-01-31|      2.0|         -1.0|
|     Australia|2020-02-01|      4.0|          2.0|
|     Australia|2020-02-01|      4.0|          0.0|
|     Australia|2020-02-01|      3.0|         -1.0|
|     Australia|2020-02-01|      1.0|         -2.0|
|     Austra

In [None]:
spark.stop()