In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=e8950fed2b777a1b43f134ec4fdbfcbf397a60c8b341c48d55488421a293797d
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, sum, max, min, expr, month, date_format
from pyspark.sql.window import Window

# Initialize Spark session

In [5]:
spark = SparkSession.builder.appName("CovidDataAnalysis").getOrCreate()

# Load the CSV file into a DataFrame


In [6]:
df = spark.read.csv("/content/complete.csv", header=True, inferSchema=True)

In [7]:
df.show(5)

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [8]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



# 1. Convert all state names to lowercase

In [13]:

df = df.withColumn("Name of State / UT", lower(col("Name of State / UT")))

In [12]:
df.show(5)

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

# 2. The day with the greatest number of COVID cases
We sum the New cases per day and order by the total to find the day with the maximum new cases.

In [15]:
max_cases_day = df.groupBy("Date").agg(sum("New cases").alias("Total_New_Cases"))\
                  .orderBy(col("Total_New_Cases").desc()).first()

# 3. Find the state with the second-largest number of COVID cases (Total Confirmed cases)
We sum the Total Confirmed cases per state and find the second-highest total by ordering the results and accessing the second element.

In [16]:
state_cases = df.groupBy("Name of State / UT").agg(sum("Total Confirmed cases").alias("Total_Cases"))
second_largest_state = state_cases.orderBy(col("Total_Cases").desc()).collect()[1]

# 4. Find the Union Territory with the least number of deaths (Death)
We filter the data to include only union territories and sum the Death column, then find the one with the minimum deaths.


In [17]:
union_territory_deaths = df.filter(df["Name of State / UT"].isin(["delhi", "puducherry", "lakshadweep", "ladakh", "daman and diu", "dadra and nagar haveli", "chandigarh", "andaman and nicobar islands"]))\
                           .groupBy("Name of State / UT").agg(sum("Death").alias("Total_Deaths"))
least_deaths_ut = union_territory_deaths.orderBy(col("Total_Deaths").asc()).first()


# 5. Find the state with the lowest Death to Total Confirmed cases ratio
We calculate the ratio of deaths to confirmed cases for each state and find the state with the lowest ratio.

In [18]:
state_death_ratio = df.groupBy("Name of State / UT").agg(
    (sum("Death") / sum("Total Confirmed cases")).alias("Death_Confirmed_Ratio")
)
lowest_death_ratio_state = state_death_ratio.orderBy(col("Death_Confirmed_Ratio").asc()).first()


# 6. Find which month has the most newer recovered cases

We extract the month from the Date, sum the New recovered cases per month, and identify the month with the highest total recoveries.

In [19]:
monthly_recovered_cases = df.withColumn("Month", date_format(col("Date"), "MMMM"))\
                            .groupBy("Month").agg(sum("New recovered").alias("Total_Recovered"))
most_recovered_month = monthly_recovered_cases.orderBy(col("Total_Recovered").desc()).first()

# Display Results

In [20]:
print(f"Day with greatest number of COVID cases: {max_cases_day['Date']}")
print(f"State with second-largest number of COVID cases: {second_largest_state['Name of State / UT']}")
print(f"Union Territory with least number of deaths: {least_deaths_ut['Name of State / UT']}")
print(f"State with lowest Death to Total Confirmed cases ratio: {lowest_death_ratio_state['Name of State / UT']}")
print(f"Month with the most newer recovered cases: {most_recovered_month['Month']}")

Day with greatest number of COVID cases: 2020-07-18
State with second-largest number of COVID cases: tamil nadu
Union Territory with least number of deaths: andaman and nicobar islands
State with lowest Death to Total Confirmed cases ratio: union territory of ladakh
Month with the most newer recovered cases: July


In [21]:
spark.stop()