In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=dbdfb910327f4ae8b0fa4bfd9592f5d671edea5d145038a275dc3c090645e0c6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, col, count, when, coalesce, min, max, sum, avg, variance, stddev_pop, to_date

In [53]:
spark = SparkSession.builder.appName("CoronaVirusAnalysis").getOrCreate()

In [4]:
corona_df = spark.read.csv("/content/Corona Virus Dataset.csv", header=True, inferSchema=True)

**1. Write a code to check NULL values**

In [5]:
for col_name in corona_df.columns:
    null_count = corona_df.filter(corona_df[col_name].isNull()).count()
    print(f"Column '{col_name}' has {null_count} null values.")

Column 'Province' has 0 null values.
Column 'Country/Region' has 0 null values.
Column 'Latitude' has 0 null values.
Column 'Longitude' has 0 null values.
Column 'Date' has 0 null values.
Column 'Confirmed' has 0 null values.
Column 'Deaths' has 0 null values.
Column 'Recovered' has 0 null values.


**2. If NULL values are present, update them with zeros for all columns.**

In [6]:
corona_df = corona_df.fillna(0)

**3. Check total number of rows**

In [7]:
total_rows = corona_df.count()
print("Total number of rows:", total_rows)

Total number of rows: 78386


**4. Check what is start_date and end_date**

In [8]:
start_date, end_date = corona_df.select(max("Date"), min("Date")).first()
print("Start date:", start_date)
print("End date:", end_date)


Start date: 31-12-2020
End date: 01-01-2021


**5. Number of months present in dataset**

In [9]:
num_months = corona_df.selectExpr("count(distinct substr(Date, 1, 7)) as num_months").first()["num_months"]
print("Number of months present in dataset:", num_months)

Number of months present in dataset: 366


**6. Find monthly average for confirmed, deaths, recovered**

In [52]:
corona_df = corona_df.withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))

corona_df1 = corona_df.withColumn("Month", month("Date"))

monthly_avg = corona_df1.groupBy("Month").agg(avg("Confirmed").alias("AvgConfirmed"),
                                             avg("Deaths").alias("AvgDeaths"),
                                             avg("Recovered").alias("AvgRecovered")).orderBy("Month")

monthly_avg.show()

+-----+------------------+------------------+------------------+
|Month|      AvgConfirmed|         AvgDeaths|      AvgRecovered|
+-----+------------------+------------------+------------------+
|    1| 2958.281438074121|63.681184668989545|1451.4554957237883|
|    2| 1203.118705855548| 34.27773980405559| 769.1034404192299|
|    3|1538.9637620444073| 33.93024717218265| 840.0799120234605|
|    4| 2602.577813852814| 59.98051948051948|1623.2136363636364|
|    5| 2290.051948051948|  53.5305823209049|2162.9020737327187|
|    6|1357.8852310480218|40.835699184536395|1220.1532769556025|
|    7|1432.3611227482195|35.109551738583995| 983.0582320904902|
|    8|1611.8428990364475|  37.5366568914956|1299.2947214076246|
|    9| 1784.587445887446|34.777272727272724|1438.9067099567098|
|   10|2412.1996229576876| 36.75827398408043|1420.6430666108085|
|   11|3592.1943722943724|56.763419913419916|1985.3445887445887|
|   12|4050.4396732299956| 71.21826560536238|2497.8850020946793|
+-----+------------------

**7. Find most frequent value for confirmed, deaths, recovered each month**

In [51]:
from pyspark.sql.functions import col, month, approx_count_distinct
corona_df = corona_df.withColumn("Month", month("Date"))


monthly_mode = corona_df.groupBy("Month").agg(approx_count_distinct("Confirmed").alias("Frequent_Confirmed"),
                                              approx_count_distinct("Deaths").alias("Frequent_Deaths"),
                                              approx_count_distinct("Recovered").alias("Frequent_Recovered")).orderBy("Month")

monthly_mode.show()

+-----+------------------+---------------+------------------+
|Month|Frequent_Confirmed|Frequent_Deaths|Frequent_Recovered|
+-----+------------------+---------------+------------------+
|    1|              2281|            530|              1840|
|    2|              1855|            476|              1623|
|    3|              2144|            489|              1671|
|    4|              2461|            668|              1824|
|    5|              2472|            580|              2171|
|    6|              1756|            410|              1572|
|    7|              1465|            355|              1133|
|    8|              1513|            351|              1293|
|    9|              1596|            347|              1280|
|   10|              1887|            341|              1396|
|   11|              1928|            447|              1607|
|   12|              1988|            505|              1780|
+-----+------------------+---------------+------------------+



**8. Find minimum values for confirmed, deaths, recovered per year**

In [28]:
from pyspark.sql.functions import year
corona_df = corona_df.withColumn("Year", year("Date"))

yearly_min = corona_df.groupBy("Year").agg(min("Confirmed").alias("MinConfirmed"),
                                           min("Deaths").alias("MinDeaths"),
                                           min("Recovered").alias("MinRecovered"))

yearly_min.show()

+----+------------+---------+------------+
|Year|MinConfirmed|MinDeaths|MinRecovered|
+----+------------+---------+------------+
|2020|           0|        0|           0|
|2021|           0|        0|           0|
+----+------------+---------+------------+



**9. Find maximum values of confirmed, deaths, recovered per year**

In [29]:
yearly_max = corona_df.groupBy("Year").agg(max("Confirmed").alias("MaxConfirmed"),
                                           max("Deaths").alias("MaxDeaths"),
                                           max("Recovered").alias("MaxRecovered"))
yearly_max.show()

+----+------------+---------+------------+
|Year|MaxConfirmed|MaxDeaths|MaxRecovered|
+----+------------+---------+------------+
|2020|      823225|     3752|     1123456|
|2021|      414188|     7374|      422436|
+----+------------+---------+------------+



**10. The total number of cases of confirmed, deaths, recovered each month**

In [50]:

monthly_total = corona_df.groupBy("Month").agg(sum("Confirmed").alias("TotalConfirmed"),
                                              sum("Deaths").alias("TotalDeaths"),
                                              sum("Recovered").alias("TotalRecovered")).orderBy("Month")
monthly_total.show()

+-----+--------------+-----------+--------------+
|Month|TotalConfirmed|TotalDeaths|TotalRecovered|
+-----+--------------+-----------+--------------+
|    1|      18678589|     402083|       9164490|
|    2|      10560976|     300890|       6751190|
|    3|      14694026|     323966|       8021083|
|    4|      24047819|     554220|      14998494|
|    5|      21865416|     511110|      20651389|
|    6|       8991916|     270414|       8079855|
|    7|       6838092|     167613|       4693120|
|    8|       7694938|     179200|       6202833|
|    9|       8244794|     160671|       6647749|
|   10|      11515841|     175484|       6782150|
|   11|      16595938|     262247|       9172292|
|   12|      19336799|     339996|      11924903|
+-----+--------------+-----------+--------------+



**11. Check how corona virus spread out with respect to confirmed case**

In [31]:

spread_confirmed = corona_df.agg(sum("Confirmed").alias("TotalConfirmed"),
                                 avg("Confirmed").alias("AverageConfirmed"),
                                 variance("Confirmed").alias("VarianceConfirmed"),
                                 stddev_pop("Confirmed").alias("StdevConfirmed"))
spread_confirmed.show()

+--------------+------------------+-------------------+------------------+
|TotalConfirmed|  AverageConfirmed|  VarianceConfirmed|    StdevConfirmed|
+--------------+------------------+-------------------+------------------+
|     169065144|2156.8283111780165|1.572909316981742E8|12541.488152446873|
+--------------+------------------+-------------------+------------------+



**12. Check how corona virus spread out with respect to death case per month**

In [49]:

spread_death = corona_df.groupBy("Month").agg(sum("Deaths").alias("TotalDeaths"),
                                              avg("Deaths").alias("AverageDeaths"),
                                              variance("Deaths").alias("VarianceDeaths"),
                                              stddev_pop("Deaths").alias("StdevDeaths")).orderBy("Month")
spread_death.show()

+-----+-----------+------------------+------------------+------------------+
|Month|TotalDeaths|     AverageDeaths|    VarianceDeaths|       StdevDeaths|
+-----+-----------+------------------+------------------+------------------+
|    1|     402083|63.681184668989545| 79012.04454692524| 281.0685517110834|
|    2|     300890| 34.27773980405559|    34852.61830584|186.67792546229236|
|    3|     323966| 33.93024717218265|29785.052429518862|172.57442719754695|
|    4|     554220| 59.98051948051948| 67905.92472058737| 260.5735512183371|
|    5|     511110|  53.5305823209049| 76775.77941447152|277.06991605343194|
|    6|     270414|40.835699184536395|46250.187470278266|215.04232873365433|
|    7|     167613|35.109551738583995|21144.584057079526| 145.3965437841416|
|    8|     179200|  37.5366568914956| 23277.87242510872| 152.5548965351778|
|    9|     160671|34.777272727272724|20107.121414513156|141.78423475030922|
|   10|     175484| 36.75827398408043|17583.754252708517|132.58986016934787|

**13. Check how corona virus spread out with respect to recovered case**

In [33]:

spread_recovered = corona_df.agg(sum("Recovered").alias("TotalRecovered"),
                                 avg("Recovered").alias("AverageRecovered"),
                                 variance("Recovered").alias("VarianceRecovered"),
                                 stddev_pop("Recovered").alias("StdevRecovered"))
spread_recovered.show()

+--------------+------------------+--------------------+------------------+
|TotalRecovered|  AverageRecovered|   VarianceRecovered|    StdevRecovered|
+--------------+------------------+--------------------+------------------+
|     113089548|1442.7263541959023|1.0703088869603062E8|10345.507395110997|
+--------------+------------------+--------------------+------------------+



**14. Find Country having the highest number of confirmed cases**

In [34]:

highest_confirmed_country = corona_df.groupBy("Country/Region").agg(max("Confirmed").alias("MaxConfirmed")).orderBy(col("MaxConfirmed").desc()).limit(1)
highest_confirmed_country.show()

+--------------+------------+
|Country/Region|MaxConfirmed|
+--------------+------------+
|        Turkey|      823225|
+--------------+------------+



**15. Find Country having the lowest number of death cases**

In [35]:

lowest_death_country = corona_df.groupBy("Country/Region").agg(min("Deaths").alias("MinDeaths")).orderBy(col("MinDeaths")).limit(1)
lowest_death_country.show()

+--------------+---------+
|Country/Region|MinDeaths|
+--------------+---------+
|      Paraguay|        0|
+--------------+---------+



**16. Find top 5 countries having the highest recovered cases**

In [36]:

top_recovered_countries = corona_df.groupBy("Country/Region").agg(sum("Recovered").alias("TotalRecovered")).orderBy(col("TotalRecovered").desc()).limit(5)
top_recovered_countries.show()

+--------------+--------------+
|Country/Region|TotalRecovered|
+--------------+--------------+
|         India|      28089649|
|        Brazil|      15400169|
|            US|       6303715|
|        Turkey|       5202251|
|        Russia|       4745756|
+--------------+--------------+

