In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("PySparkAssignment") \
    .getOrCreate()

In [3]:
df = spark.read.option("header",True) \
    .option("inferSchema",True) \
    .csv("CovidData.csv")

df.show(5)

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|30-01-2020|            Kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|31-01-2020|            Kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|01-02-2020|            Kerala| 10.8505|  76.2711|                    2|    0|                        0|        1|         0|            0|
|02-02-2020|            Kerala| 10.8505|  76.2711|                    3|    0|                        0|        1|         0|            0|
|03-02-2020|        

In [4]:
# df.select('Name of State / UT').collect() 

### 1. Convert all state names to lower case

In [5]:
from pyspark.sql.functions import col, lower

df_states_lower = df.withColumn("Name of State / UT", lower(col("Name of State / UT")))
df_states_lower.show(5)

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|30-01-2020|            kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|31-01-2020|            kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|01-02-2020|            kerala| 10.8505|  76.2711|                    2|    0|                        0|        1|         0|            0|
|02-02-2020|            kerala| 10.8505|  76.2711|                    3|    0|                        0|        1|         0|            0|
|03-02-2020|        

### 2. Day with highest number of covid cases

In [6]:
from pyspark.sql.functions import max
df.select(max(df["Total Confirmed cases"])).show()

+--------------------------+
|max(Total Confirmed cases)|
+--------------------------+
|                    468265|
+--------------------------+



In [7]:
df.select(max(df["Total Confirmed cases"])).collect()[0][0]

468265

In [8]:
from pyspark.sql.functions import col, max

max_cases_df = df.filter(col("Total Confirmed cases") == df.select(max(df["Total Confirmed cases"])).collect()[0][0])
date_df = max_cases_df.select(df["Date"].alias("Day with Highest No of covid cases")).show()
date_df

+----------------------------------+
|Day with Highest No of covid cases|
+----------------------------------+
|                        06-08-2020|
+----------------------------------+



### 3. State with second largest number of covid cases

In [9]:
max_cases_df = df.orderBy(col("Total Confirmed cases").desc()).limit(2)
# max_cases_df.show()
max_cases_df.select(max_cases_df["Name of State / UT"]).collect()[1][0]

'Maharashtra'

### 4. Union Territory with least number of deaths

In [10]:
# Filter UT records
union_territories = ["Andaman and Nicobar Islands", "Chandigarh", "Dadra and Nagar Haveli and Daman and Diu", "Jammu and Kashmir", "Ladakh", "Puducherry", "Lakshadweep"]
union_territory_records = df.filter(df["Name of State / UT"].isin(union_territories))
union_territory_records.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|18-03-2020|        Puducherry| 11.9416|  79.8083|                    1|    0|                        0|        0|         0|            0|
|19-03-2020|        Puducherry| 11.9416|  79.8083|                    1|    0|                        0|        0|         0|            0|
|20-03-2020|        Puducherry| 11.9416|  79.8083|                    1|    0|                        0|        0|         0|            0|
|21-03-2020|        Chandigarh| 30.7333|  76.7794|                    1|    0|                        0|        0|         0|            0|
|21-03-2020| Jammu a

In [11]:
# Convert date column from string to date format 
from pyspark.sql.functions import to_date, max, col

UT_formatted_date_df = union_territory_records.withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))
latest_recorded_date = UT_formatted_date_df.select(max(col("Date"))).collect()[0][0]
latest_recorded_date

datetime.date(2020, 8, 6)

In [12]:
# UT_recent_records = UT_formatted_date_df.groupby("Name of State / UT").agg(max("Date"))
# UT_recent_records.show()

In [13]:
# Get Latest UT records 
UT_recent_records = UT_formatted_date_df.filter(col("Date") == latest_recorded_date)
UT_recent_records.show()

+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|  Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-08-06|Andaman and Nicob...| 11.7401|  92.6586|                 1027|   14|                      326|       99|         0|           49|
|2020-08-06|          Chandigarh| 30.7333|  76.7794|                 1270|   20|                      715|       64|         0|            0|
|2020-08-06|Dadra and Nagar H...| 20.1809|  73.0169|                 1366|    2|                      960|       41|         0|           41|
|2020-08-06|   Jammu and Kashmir| 33.7782|  76.5762|                22955|  426|                    15244|      559|         0|          388|
|2020-

In [14]:
from pyspark.sql.types import IntegerType

# Convert Death column to int type
UT_recent_records = UT_recent_records.withColumn("Death", col("Death").cast(IntegerType()))
UT_recent_records.schema["Death"].dataType

IntegerType()

In [15]:
# # Group UTs and Get death sum
# sum_of_deaths_in_union_territories = union_territory_records.groupBy("Name of State / UT").sum("Death")
# sum_of_deaths_in_union_territories.show()

In [16]:
# from pyspark.sql.functions import col, min

# least_death_UT = sum_of_deaths_in_union_territories.filter(col("sum(Death)") == sum_of_deaths_in_union_territories.select(min(sum_of_deaths_in_union_territories["sum(Death)"])).collect()[0][0])
# least_death_UT.select(col("Name of State / UT").alias("Union Territory with least deaths")).show()

In [17]:
from pyspark.sql.functions import col, min

# Select UT with least deaths
least_death_UT = UT_recent_records.filter(col("Death") == UT_recent_records.select(min(UT_recent_records["Death"])).collect()[0][0])
least_death_UT.select(col("Name of State / UT").alias("Union Territory with least deaths")).show()

+---------------------------------+
|Union Territory with least deaths|
+---------------------------------+
|             Dadra and Nagar H...|
+---------------------------------+



### 5. State with Lowest Death to Total confirmed cases ratio

In [18]:
# UT_recent_records.select(min(UT_recent_records["Death"])).show()
Date_Formatted_df = df.withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))
latest_records = Date_Formatted_df.filter(col("Date") == latest_recorded_date)
latest_records.show()

+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|  Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-08-06|Andaman and Nicob...| 11.7401|  92.6586|                 1027|   14|                      326|       99|         0|           49|
|2020-08-06|      Andhra Pradesh| 15.9129|    79.74|               186461| 1681|                   104354|    10128|         0|         8729|
|2020-08-06|   Arunachal Pradesh|  28.218|  94.7278|                 1855|    3|                     1210|       65|         0|          105|
|2020-08-06|               Assam| 26.2006|  92.9376|                50445|  121|                    35892|     2284|         0|         1471|
|2020-

In [19]:
fomatted_latest_records_df = latest_records.withColumn("Death", col("Death").cast(IntegerType()))
latest_with_ratio_df = fomatted_latest_records_df.withColumn("Death To Confirmed Ratio", (col("Death") / col("Total Confirmed cases")))
latest_with_ratio_df.show()

+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+------------------------+
|      Date|  Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|Death To Confirmed Ratio|
+----------+--------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+------------------------+
|2020-08-06|Andaman and Nicob...| 11.7401|  92.6586|                 1027|   14|                      326|       99|         0|           49|    0.013631937682570594|
|2020-08-06|      Andhra Pradesh| 15.9129|    79.74|               186461| 1681|                   104354|    10128|         0|         8729|    0.009015290060656116|
|2020-08-06|   Arunachal Pradesh|  28.218|  94.7278|                 1855|    3|                     1210|       65|         0|          105|    0.001617250673854...

In [20]:
state_with_least_death_to_confirmed_ratio = latest_with_ratio_df.orderBy(col("Death To Confirmed Ratio").asc()).limit(1)
state_with_least_death_to_confirmed_ratio.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+------------------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|Death To Confirmed Ratio|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+------------------------+
|2020-08-06|           Mizoram| 23.1645|  92.9376|                  537|    0|                      286|       33|         0|            4|                     0.0|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+------------------------+



In [21]:
state_with_least_death_to_confirmed_ratio.select(col("Name of State / UT").alias("State with least death to confirmed Ratio")).show()

+-----------------------------------------+
|State with least death to confirmed Ratio|
+-----------------------------------------+
|                                  Mizoram|
+-----------------------------------------+



### 6. Month with more Newer Recovered cases

In [22]:
from pyspark.sql.functions import month, sum

# UT_recent_records = UT_formatted_date_df.groupby("Name of State / UT").agg(max("Date"))
groupedby_month_df = Date_Formatted_df.groupby(month(Date_Formatted_df["Date"])).sum("New recovered")
groupedby_month_df.show()

+-----------+------------------+
|month(Date)|sum(New recovered)|
+-----------+------------------+
|          1|                 0|
|          6|            247662|
|          3|               124|
|          5|             78659|
|          4|              8201|
|          8|            270531|
|          7|            722983|
|          2|                 0|
+-----------+------------------+



In [23]:
max_new_recovered = groupedby_month_df.select(max(col("sum(New recovered)"))).collect()[0][0]
max_new_recovered

722983

In [24]:
months_list = ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"]
max_new_recovered_record = groupedby_month_df.filter(col("sum(New recovered)") == max_new_recovered)
max_new_recovered_record.show()

+-----------+------------------+
|month(Date)|sum(New recovered)|
+-----------+------------------+
|          7|            722983|
+-----------+------------------+



In [25]:
max_new_recovered_month = max_new_recovered_record.select(col("month(Date)")).collect()[0][0]
months_list[max_new_recovered_month-1]

'July'