In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, max, min, count, expr, month, date_format, lit

# Create a Spark session
spark = SparkSession.builder.master("local[*]").appName("Covid Data Analysis").getOrCreate()


In [2]:

# Load the CSV file into a DataFrame
file_path = 'complete.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)


In [7]:

# Convert all state names to lowercase
df_lowercase = df.withColumn("Name of State / UT", lower(col("Name of State / UT")))
df_lowercase.show(5)


+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [9]:

# Day with the greatest number of covid cases
day_greatest_cases = df_lowercase.groupBy("Date").agg(max("Total Confirmed cases").alias("max_cases")).orderBy(col("max_cases").desc()).limit(1)
day_greatest_cases.show()


+----------+---------+
|      Date|max_cases|
+----------+---------+
|2020-08-06| 468265.0|
+----------+---------+



In [10]:

# State with the second-largest number of covid cases
state_second_largest = df_lowercase.groupBy("Name of State / UT").agg(max("Total Confirmed cases").alias("max_cases")).orderBy(col("max_cases").desc()).limit(2)
state_second_largest.show()


+------------------+---------+
|Name of State / UT|max_cases|
+------------------+---------+
|       maharashtra| 468265.0|
|        tamil nadu| 273460.0|
+------------------+---------+



In [11]:
states_of_india = [
    "Andhra Pradesh",
    "Arunachal Pradesh",
    "Assam",
    "Bihar",
    "Chhattisgarh",
    "Goa",
    "Gujarat",
    "Haryana",
    "Himachal Pradesh",
    "Jharkhand",
    "Karnataka",
    "Kerala",
    "Madhya Pradesh",
    "Maharashtra",
    "Manipur",
    "Meghalaya",
    "Mizoram",
    "Nagaland",
    "Odisha",
    "Punjab",
    "Rajasthan",
    "Sikkim",
    "Tamil Nadu",
    "Telangana",
    "Tripura",
    "Uttar Pradesh",
    "Uttarakhand",
    "West Bengal"
]


In [15]:
union_territories_of_india = [
    "Andaman and Nicobar Islands",
    "Chandigarh",
    "Dadra and Nagar Haveli and Daman and Diu",
    "Lakshadweep",
    "Delhi",
    "Puducherry",
    "Ladakh",
    "Jammu and Kashmir"
]


In [19]:

# Union Territory with the least number of deaths

union_territory_least_deaths = df.filter(col("Name of State / UT").isin(union_territories_of_india)).groupBy("Name of State / UT").agg(min("Death").alias("min_deaths"))    .orderBy(col("min_deaths").asc())
union_territory_least_deaths.show()


+--------------------+----------+
|  Name of State / UT|min_deaths|
+--------------------+----------+
|Andaman and Nicob...|         0|
|          Chandigarh|         0|
|Dadra and Nagar H...|         0|
|               Delhi|         0|
|   Jammu and Kashmir|         0|
|              Ladakh|         0|
|          Puducherry|         0|
+--------------------+----------+



In [20]:

# State with the lowest Death to Total Confirmed cases ratio
state_lowest_death_ratio = df_lowercase.withColumn("death_ratio", col("Death") / col("Total Confirmed cases"))    .groupBy("Name of State / UT").agg(min("death_ratio").alias("min_death_ratio"))    .orderBy(col("min_death_ratio").asc()).limit(1)
state_lowest_death_ratio.show()
 

+------------------+---------------+
|Name of State / UT|min_death_ratio|
+------------------+---------------+
|             delhi|            0.0|
+------------------+---------------+



In [35]:

# Find the month with the most recent (newer) recovered cases and display as month name
month_newer_recovered = df_lowercase.groupBy(month("Date").alias("month")).agg(max("New recovered").alias("max_recovered")).orderBy(col("max_recovered").desc()).limit(1)

month_newer_recovered.show()
months = [
        "January", "February", "March", "April", "May", "June",
        "July", "August", "September", "October", "November", "December"
    ]
months[month_newer_recovered.select("*").collect()[0][0]-1]

+-----+-------------+
|month|max_recovered|
+-----+-------------+
|    7|        13401|
+-----+-------------+



'July'

In [None]:

# Stopping the Spark session
spark.stop()
