In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, max, month, sum

# Create a Spark session
spark = SparkSession.builder.appName("COVID Data Analysis").getOrCreate()

# Load the CSV file into a DataFrame
df = spark.read.csv("complete.csv", header=True, inferSchema=True)

# Show the first few rows
df.show()


+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [2]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [3]:
# Convert the "Name of State / UT" column to lowercase for consistency
df = df.withColumn("Name of State / UT", lower(col("Name of State / UT")))

# Show the distinct state/UT names in the updated DataFrame
distinct_states = df.select("Name of State / UT").distinct()
distinct_states.show()


+--------------------+
|  Name of State / UT|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



In [17]:
# 1. Day with the greatest number of COVID cases
max_cases_day = df.groupBy("Date") \
    .agg(sum("New cases").alias("Total Confirmed")) \
    .orderBy(col("Total Confirmed").desc()) \
    .first()

max_cases_date = max_cases_day["Date"]
max_cases_count = max_cases_day["Total Confirmed"]

print(f"The day with the greatest number of COVID cases: {max_cases_date} with {max_cases_count} cases.")


The day with the greatest number of COVID cases: 2020-07-18 with 70962 cases.


In [19]:
# 2. State with the second-largest number of COVID cases
state_cases = df.groupBy("Name of State / UT") \
    .agg(sum("New cases").alias("Total Confirmed")) \
    .orderBy(col("Total Confirmed").desc())

second_largest_state = state_cases.collect()[1]  # Index 1 gives the second-largest

print(f"The state with the second-largest number of COVID cases: {second_largest_state['Name of State / UT']} with {second_largest_state['Total Confirmed']} cases.")


The state with the second-largest number of COVID cases: tamil nadu with 273459 cases.


In [21]:
# 3. Union Territory with the least number of deaths
unique_states = [row[0] for row in distinct_states.collect()]

union_territory_deaths = df.filter(col("Name of State / UT").isin(unique_states)) \
    .groupBy("Name of State / UT") \
    .agg(sum("Death").alias("Total Deaths")) \
    .orderBy(col("Total Deaths").asc()) \
    .first()

print(f"The Union Territory with the least number of deaths: {union_territory_deaths['Name of State / UT']} with {union_territory_deaths['Total Deaths']} deaths.")


The Union Territory with the least number of deaths: union territory of ladakh with 0.0 deaths.


In [23]:
# 4. State with the lowest Death to Total Confirmed cases ratio
df_with_ratio = df.withColumn("Death_to_Confirmed_Ratio", col("Death") / col("Total Confirmed cases"))

lowest_ratio_state = df_with_ratio.groupBy("Name of State / UT") \
    .agg(max("Death_to_Confirmed_Ratio").alias("Max Ratio")) \
    .orderBy(col("Max Ratio").asc()) \
    .first()

print(f"The state with the lowest Death to Total Confirmed cases ratio: {lowest_ratio_state['Name of State / UT']} with a ratio of {lowest_ratio_state['Max Ratio']}.")


The state with the lowest Death to Total Confirmed cases ratio: union territory of ladakh with a ratio of 0.0.


In [25]:
# 5. Month with the most new recovered cases
df_with_month = df.withColumn("Month", month(col("Date")))

max_recovered_month = df_with_month.groupBy("Month") \
    .agg(sum("New recovered").alias("Total Recovered")) \
    .orderBy(col("Total Recovered").desc()) \
    .first()

month_mapping = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May",
    6: "June", 7: "July", 8: "August", 9: "September", 10: "October",
    11: "November", 12: "December"
}

max_recovered_month_name = month_mapping[max_recovered_month["Month"]]
print(f"The month with the most new recovered cases: {max_recovered_month_name} with {max_recovered_month['Total Recovered']} recoveries.")

The month with the most new recovered cases: July with 722983 recoveries.
