In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [50]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window
import calendar
import datetime

In [44]:
spark = SparkSession.builder.appName("dataTrainingCovidTask").config("spark.network.timeout", "800s").config("spark.executor.heartbeatInterval", "60s").getOrCreate()

In [45]:
filepath = "covid-data.csv"
data = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [46]:
data.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



# 1. Convert all state names to lowercase

In [23]:
lowercase_states = data.withColumn("state_lowercase", F.lower(F.col("Name of State / UT")))
lowercase_states.select("state_lowercase").distinct().show()

+--------------------+
|     state_lowercase|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



# 2. Find the day with the greatest number of covid cases

In [25]:
daily_cases = data.groupBy("Date").agg(F.sum("Total Confirmed cases").alias("total_cases_sum"))
window_daily_cases = Window.orderBy(F.col("total_cases_sum").desc())
most_cases_day = daily_cases.withColumn("rank", F.row_number().over(window_daily_cases)).filter(F.col('rank') == 1).drop('rank')

most_cases_day.show()

+----------+---------------+
|      Date|total_cases_sum|
+----------+---------------+
|2020-08-06|      1964536.0|
+----------+---------------+



# 3. Find the state with the second-largest number of COVID cases

In [28]:
state_cases = data.groupBy("Name of State / UT").agg(F.sum("Total Confirmed cases").alias("total_cases_sum"))
window_state_cases = Window.orderBy(F.col("total_cases_sum").desc())
second_largest_state = state_cases.withColumn("rank", F.row_number().over(window_state_cases)).filter(F.col('rank') == 2).drop('rank')
second_largest_state.show()

+------------------+---------------+
|Name of State / UT|total_cases_sum|
+------------------+---------------+
|        Tamil Nadu|      7847083.0|
+------------------+---------------+



# 4. Find which Union Territory has the least number of deaths

In [32]:
union_territories = data.filter(F.col("Name of State / UT").like("Union Territory%"))
ut_deaths = union_territories.groupBy("Name of State / UT").agg(F.sum("Death").alias("total_deaths"))
window_ut_deaths = Window.orderBy(F.col("total_deaths"))
least_deaths_ut = ut_deaths.withColumn("rank", F.row_number().over(window_ut_deaths)).filter(F.col('rank') == 1).drop('rank')
least_deaths_ut.show(truncate=False)

+------------------------------------+------------+
|Name of State / UT                  |total_deaths|
+------------------------------------+------------+
|Union Territory of Jammu and Kashmir|0.0         |
+------------------------------------+------------+



# 5. Find the state with the lowest Death to Total Confirmed Cases ratio

In [33]:
death_case_ratio = data.withColumn("death_to_case_ratio", F.col("Death") / F.col("Total Confirmed cases"))
state_ratio = death_case_ratio.groupBy("Name of State / UT").agg(F.avg("death_to_case_ratio").alias("avg_ratio"))
window_ratio = Window.orderBy(F.col("avg_ratio"))
lowest_ratio_state = state_ratio.withColumn("rank", F.row_number().over(window_ratio)).filter(F.col('rank') == 1).drop('rank')
lowest_ratio_state.show(truncate=False)

+------------------+---------+
|Name of State / UT|avg_ratio|
+------------------+---------+
|Mizoram           |0.0      |
+------------------+---------+



# 6. Find the month with the most newly recovered cases

In [74]:
df_with_month = data.withColumn("Month", F.date_format(F.col("Date"), "MMMM"))

monthly_recovered = (
    df_with_month
    .groupBy("Month")
    .agg(F.sum("New recovered").alias("sum_newly_recovered"))
    .orderBy(F.col("sum_newly_recovered").desc())
)
max_recovered_row = monthly_recovered.limit(1)
max_recovered_row.show(truncate=False)

+-----+-------------------+
|Month|sum_newly_recovered|
+-----+-------------------+
|July |722983.0           |
+-----+-------------------+

