In [None]:
pip install pyspark

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

In [15]:
spark = SparkSession.builder.appName("PySparkCovidAnalysisTask").getOrCreate()
filepath = "complete.csv"

In [16]:
df = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [17]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [18]:
df = df.withColumn("total_case", df["Total Confirmed cases"].cast(types.LongType()))
df = df.withColumn("total_newly_recovered", df["New recovered"].cast(types.LongType()))
df = df.withColumn("state", df["Name of State / UT"].cast(types.StringType()))
df = df.withColumn("death_Case", df["Death"].cast(types.LongType()))
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



In [19]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



In [20]:
# 1. Convert all state names to lower
output_df_state_names = df.withColumn('state_names_lower_case', F.lower(F.col("state")))
output_df_state_names.select("state_names_lower_case").distinct().show()

+----------------------+
|state_names_lower_case|
+----------------------+
|                 delhi|
|           maharashtra|
|             meghalaya|
|                odisha|
|               haryana|
|           west bengal|
|                   goa|
|                punjab|
|     jammu and kashmir|
|  dadra and nagar h...|
|             karnataka|
|        andhra pradesh|
|             telangana|
|              nagaland|
|                 bihar|
|        madhya pradesh|
|             jharkhand|
|                 assam|
|                kerala|
|            tamil nadu|
+----------------------+
only showing top 20 rows



In [22]:
# 2. The day with the highest number of COVID cases

# Group by date and sum the total cases, then find the day with the maximum cases in one step
df_max_covid_cases_day = df.groupBy("Date") \
    .agg(F.sum("total_case").alias("sum_total_case")) \
    .orderBy(F.col("sum_total_case").desc()) \
    .limit(1)

df_max_covid_cases_day.show()

+----------+--------------+
|      Date|sum_total_case|
+----------+--------------+
|2020-08-06|       1964536|
+----------+--------------+



In [24]:
# 3. The state with the second-largest number of COVID cases
df_second_largest_state_cases = df.groupBy("state") \
    .agg(F.sum("total_case").alias("sum_total_case")) \
    .orderBy(F.col("sum_total_case").desc()) \
    .limit(2) \
    .orderBy(F.col("sum_total_case").asc()) \
    .limit(1)

df_second_largest_state_cases.show()

+----------+--------------+
|     state|sum_total_case|
+----------+--------------+
|Tamil Nadu|       7847083|
+----------+--------------+



In [25]:
# 4. Union Territory with the least number of deaths
df_least_deaths_union_territory = df.filter(F.col("state").contains("Union Territory")) \
    .groupBy("state") \
    .agg(F.sum("death_Case").alias("total_deaths")) \
    .orderBy(F.col("total_deaths").asc()) \
    .limit(1)

df_least_deaths_union_territory.show(truncate=False)


+------------------------------------+------------+
|state                               |total_deaths|
+------------------------------------+------------+
|Union Territory of Jammu and Kashmir|0           |
+------------------------------------+------------+



In [26]:
# 5. State with the Lowest Death to Total Confirmed Cases Ratio
df_lowest_death_to_case_ratio = df.withColumn("death_to_case_ratio", F.col("death_Case") / F.col("total_case")) \
    .groupBy("state") \
    .agg(F.avg("death_to_case_ratio").alias("avg_death_to_case_ratio")) \
    .orderBy(F.col("avg_death_to_case_ratio").asc()) \
    .limit(1)

df_lowest_death_to_case_ratio.show(truncate=False)


+-------+-----------------------+
|state  |avg_death_to_case_ratio|
+-------+-----------------------+
|Mizoram|0.0                    |
+-------+-----------------------+



In [52]:
# # 6. The month that has more newer recovered cases
# # If the month is 02, it should display as February

# Define month mapping
month_mapping = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May",
    6: "June", 7: "July", 8: "August", 9: "September", 10: "October",
    11: "November", 12: "December"
}

# Extract month from the Date column
df_with_month = df.withColumn("Month", F.month(F.col("Date")))

# Calculate total recovered cases per month and get the month with the highest total
max_recovered_month = (
    df_with_month
    .groupBy("Month")
    .agg(F.sum("New recovered").alias("Total Recovered"))
    .orderBy(F.col("Total Recovered").desc())
    .first()
)

# Get the month name from the mapping
max_month_name = month_mapping.get(max_recovered_month["Month"], "Unknown Month")
total_recovered = max_recovered_month["Total Recovered"]

# Print the result
print(f"The month with the most new recovered cases: {max_month_name} with {total_recovered} cases")


The month with the most new recovered cases: July with 722983.0 cases
