In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

In [5]:
spark = SparkSession.builder.appName("covid").getOrCreate()
filepath = "complete.csv"

In [9]:
df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [7]:
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [7]:
df_csv = df_csv.withColumn("total_case", df_csv["Total Confirmed cases"].cast(types.LongType()))
df_csv = df_csv.withColumn("total_newly_recovered", df_csv["New recovered"].cast(types.LongType()))
df_csv = df_csv.withColumn("state", df_csv["Name of State / UT"].cast(types.StringType()))
df_csv = df_csv.withColumn("death_Case", df_csv["Death"].cast(types.LongType()))
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



In [10]:
df_csv.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [11]:
# First Task convert all the state to lower case
from pyspark.sql.functions import lower
output_df_csv = df_csv.withColumn("Name of State / UT", lower("Name of State / UT"))
output_df_csv.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [44]:
# 2nd Get the day with most covid cases

from pyspark.sql import functions as F
output_df_csv = df_csv.withColumn("Total Confirmed cases", df_csv["Total Confirmed cases"].cast("long"))
cases_by_day = output_df_csv.groupBy("Date").agg(F.sum("Total Confirmed cases").alias("Total_Cases_Per_Day"))
max_cases_day = cases_by_day.orderBy(F.desc("Total_Cases_Per_Day")).first()
print(f"The day with the highest number of cases is: {max_cases_day['Date']} with {max_cases_day['Total_Cases_Per_Day']} cases.")

The day with the highest number of cases is: 2020-08-06 with 1964536 cases.


In [45]:
# 3rd The state that has the second largest number of covid cases
output_df_csv = df_csv.withColumn("Total Confirmed cases", df_csv["Total Confirmed cases"].cast("long"))
output_df_1 = output_df_csv.groupBy("Name of State / UT").agg(F.sum("Total Confirmed cases").alias("sum_total_case"))
window_spec = Window.orderBy(F.col("sum_total_case").desc())
output_df_1 = output_df_1.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 2).drop('recency')
output_df_1.show()

+------------------+--------------+
|Name of State / UT|sum_total_case|
+------------------+--------------+
|        Tamil Nadu|       7847083|
+------------------+--------------+



In [47]:
# 4th to get the least covid cases in union territories 
output_df_csv = df_csv.withColumn("Death", df_csv["Death"].cast("long"))
df_ut = output_df_csv.filter(F.col("Name of State / UT").like("Union Territory%"))
death_by_ut = df_ut.groupBy("Name of State / UT").agg(F.sum("Death").alias("Total_Deaths_Per_UT"))
window_spec = Window.orderBy(F.col("Total_Deaths_Per_UT").asc())
least_deaths_ut = death_by_ut.withColumn("rank", F.row_number().over(window_spec)).filter(F.col('rank') == 1).drop('rank')
least_deaths_ut.show()


+--------------------+-------------------+
|  Name of State / UT|Total_Deaths_Per_UT|
+--------------------+-------------------+
|Union Territory o...|                  0|
+--------------------+-------------------+



In [12]:
# 5th state has the lowest death to the total confirmed cases ratio

output_df_csv = df_csv.withColumn("Death", df_csv["Death"].cast("long"))
output_df_csv = df_csv.withColumn("Total Confirmed cases", df_csv["Total Confirmed cases"].cast("long"))
state_stats = df_csv.groupBy("Name of State / UT").agg(
    F.sum("Death").alias("Total_Deaths"),
    F.sum("Total Confirmed cases").alias("Total_Confirmed_Cases")
)
state_stats = state_stats.withColumn("Death_to_Confirmed_Ratio", F.col("Total_Deaths") / F.col("Total_Confirmed_Cases"))
window_spec = Window.orderBy(F.col("Death_to_Confirmed_Ratio").asc())
lowest_ratio_state = state_stats.withColumn("rank", F.row_number().over(window_spec)).filter(F.col('rank') == 1).drop('rank')
lowest_ratio_state.show()


+------------------+------------+---------------------+------------------------+
|Name of State / UT|Total_Deaths|Total_Confirmed_Cases|Death_to_Confirmed_Ratio|
+------------------+------------+---------------------+------------------------+
|           Mizoram|         0.0|              13335.0|                     0.0|
+------------------+------------+---------------------+------------------------+



In [13]:
# 5th The state has the lowest death to Total confirmed case ratio
output_df_csv = df_csv.withColumn("Death", df_csv["Death"].cast("long"))
output_df_csv = df_csv.withColumn("Total Confirmed cases", df_csv["Total Confirmed cases"].cast("long"))
state_stats = df_csv.groupBy("Name of State / UT").agg(
    F.sum("Death").alias("Total_Deaths"),
    F.sum("Total Confirmed cases").alias("Total_Confirmed_Cases")
)
state_stats = state_stats.withColumn("Death_to_Confirmed_Ratio", F.col("Total_Deaths") / F.col("Total_Confirmed_Cases"))
window_spec = Window.orderBy(F.col("Death_to_Confirmed_Ratio").asc())
lowest_ratio_state = state_stats.withColumn("rank", F.row_number().over(window_spec)).filter(F.col('rank') == 1).drop('rank')
lowest_ratio_state.show()

+------------------+------------+---------------------+------------------------+
|Name of State / UT|Total_Deaths|Total_Confirmed_Cases|Death_to_Confirmed_Ratio|
+------------------+------------+---------------------+------------------------+
|           Mizoram|         0.0|              13335.0|                     0.0|
+------------------+------------+---------------------+------------------------+



In [14]:
# 6th To Find which month the Newer recovered cases.

df_recovered = df_csv.withColumn("New recovered", df_csv["New recovered"].cast("long"))
df_recovered = df_recovered.withColumn("Month", F.month(F.to_date(F.col("Date"), "yyyy-MM-dd")))
recovered_by_month = df_recovered.groupBy("Month").agg(
    F.sum("New recovered").alias("Total_New_Recovered")
)
window_spec = Window.orderBy(F.col("Total_New_Recovered").desc())
top_recovered_month = recovered_by_month.withColumn("rank", F.row_number().over(window_spec)).filter(F.col('rank') == 1)
month_names = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
    7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
}
top_recovered_month = top_recovered_month.withColumn(
    "Month_Name",
    F.expr("CASE Month " +
           "WHEN 1 THEN 'January' " +
           "WHEN 2 THEN 'February' " +
           "WHEN 3 THEN 'March' " +
           "WHEN 4 THEN 'April' " +
           "WHEN 5 THEN 'May' " +
           "WHEN 6 THEN 'June' " +
           "WHEN 7 THEN 'July' " +
           "WHEN 8 THEN 'August' " +
           "WHEN 9 THEN 'September' " +
           "WHEN 10 THEN 'October' " +
           "WHEN 11 THEN 'November' " +
           "WHEN 12 THEN 'December' " +
           "END"
    )
)
top_recovered_month = top_recovered_month.drop("rank")
top_recovered_month.show(truncate=False)


+-----+-------------------+----------+
|Month|Total_New_Recovered|Month_Name|
+-----+-------------------+----------+
|7    |722983             |July      |
+-----+-------------------+----------+

