In [2]:
from pyspark.sql import SparkSession

In [29]:
from pyspark.sql.functions import col, lower, max, min, count, sum, expr, month, when, upper,trim,month

In [4]:
spark = SparkSession.builder.appName("Covid Data Analysis").getOrCreate()

In [5]:
df = spark.read.csv("Downloads/complete.csv", header=True, inferSchema=True)

In [6]:
# Convert state names to lowercase
df = df.withColumn("State", lower(col("State")))

In [7]:
df.show()

+----------+------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date| State|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|30-01-2020|kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|31-01-2020|kerala| 10.8505|  76.2711|                    1|    0|                        0|        0|         0|            0|
|01-02-2020|kerala| 10.8505|  76.2711|                    2|    0|                        0|        1|         0|            0|
|02-02-2020|kerala| 10.8505|  76.2711|                    3|    0|                        0|        1|         0|            0|
|03-02-2020|kerala| 10.8505|  76.2711|                    3|    0|                        0|        0|  

In [8]:
# Find the day with the greatest number of COVID cases
max_cases_day = df.orderBy(col("New cases").desc()).select("Date", "New cases").first()
print(f"The day with the greatest number of COVID cases is: {max_cases_day['Date']} with {max_cases_day['New cases']} cases.")

The day with the greatest number of COVID cases is: 26-07-2020 with 18366 cases.


In [9]:
# Find the state with the second-largest number of COVID cases
state_cases = df.groupBy("State").sum("Total Confirmed cases").orderBy(col("sum(Total Confirmed cases)").desc())
second_largest_state = state_cases.collect()[1]
print(f"State : {second_largest_state['State']} \nNo of Cases: {second_largest_state['sum(Total Confirmed cases)']} cases.")


State : tamil nadu 
No of Cases: 7847083 cases.


In [12]:
df = df.withColumn("Death", col("Death").cast("int"))

In [38]:
ut_least_deaths = df.filter(col("State").contains("union territory")).groupBy("State").sum("Death").orderBy(col("sum(Death)").asc()).first()
print(f"The Union Territory with the least number of deaths is: {ut_least_deaths['State']} with {ut_least_deaths['sum(Death)']} deaths.")

The Union Territory with the least number of deaths is: union territory of ladakh with 0 deaths.


In [26]:
df.select("Death").dtypes

[('Death', 'int')]

In [27]:
ut_least_deaths = df.filter(col("State").contains("union territory")).groupBy("State").sum("Death").orderBy(col("sum(Death)").asc()).first()
print(f"The Union Territory with the least number of deaths is: {ut_least_deaths['State']} with {ut_least_deaths['sum(Death)']} deaths.")

The Union Territory with the least number of deaths is: union territory of ladakh with 0 deaths.


In [28]:
# Calculate the Death to Total Confirmed cases ratio and find the state with the lowest ratio
df = df.withColumn("DeathToConfirmedRatio", col("Death") / col("Total Confirmed cases"))
lowest_ratio_state = df.orderBy(col("DeathToConfirmedRatio").asc()).select("State", "DeathToConfirmedRatio").first()
print(f"The state with the lowest Death to Total Confirmed cases ratio is: {lowest_ratio_state['State']} with a ratio of {lowest_ratio_state['DeathToConfirmedRatio']}.")


The state with the lowest Death to Total Confirmed cases ratio is: puducherry with a ratio of None.


In [37]:
from pyspark.sql.functions import month, col, to_date
import calendar

# Ensure the "Date" field is in date format
df = df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

# Extract the month from the "Date" column
df = df.withColumn("Month", month(col("Date")))

# Find the month with the most new recovered cases
most_recovered_month = df.groupBy("Month").sum("New recovered").orderBy(col("sum(New recovered)").desc()).first()

# Convert the month number to the month name
if most_recovered_month and most_recovered_month["Month"] is not None:
    month_name = calendar.month_name[int(most_recovered_month["Month"])]
    print(f"The month with the most new recovered cases is: {month_name} with {most_recovered_month['sum(New recovered)']} recovered cases.")
else:
    print("No valid month data found.")


No valid month data found.
