# Spark Assignment

#### Import Packages

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, to_date, when, date_format, month
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

#### Import File

In [30]:
spark = SparkSession.builder.appName("covid").getOrCreate()
filepath = "/Users/raghavendiran/Desktop/Data/Presidio/PySpark/complete.csv"

#### Create DF

In [31]:
df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [32]:
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



### Convert all state names to lowercase

In [33]:
lowercase_state_names = df_csv.withColumn('state', F.lower(F.col("Name of State / UT")))
lowercase_state_names.select("state").distinct().show()

+--------------------+
|               state|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



### The day had a greater number of covid cases

In [34]:
greater_covid_cases = df_csv.groupBy("Date").agg(F.sum("Total Confirmed cases").alias("sum_total_case"))
window_spec = Window.orderBy(F.col("sum_total_case").desc())
greater_covid_cases = greater_covid_cases.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')
greater_covid_cases.show()

+----------+--------------+
|      Date|sum_total_case|
+----------+--------------+
|2020-08-06|     1964536.0|
+----------+--------------+



### The state has the second-largest number of covid cases

In [35]:
second_largest_covid_cases = df_csv.groupBy("Name of State / UT").agg(F.sum("Total Confirmed cases").alias("sum_total_case"))
window_spec = Window.orderBy(F.col("sum_total_case").desc())
second_largest_covid_cases = second_largest_covid_cases.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 2).drop('recency')
second_largest_covid_cases.show()

+------------------+--------------+
|Name of State / UT|sum_total_case|
+------------------+--------------+
|        Tamil Nadu|     7847083.0|
+------------------+--------------+



### Which Union Territory has the least number of death

In [36]:
union_territory_least_death = df_csv.where(F.col('Name of State / UT').like("Union Territory%"))
union_territory_least_death = union_territory_least_death.groupBy("Name of State / UT").agg(F.sum("Death").alias("sum_total_death"))
window_spec = Window.orderBy(F.col("sum_total_death"))
union_territory_least_death = union_territory_least_death.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')
union_territory_least_death.show(truncate=False)

+------------------------------------+---------------+
|Name of State / UT                  |sum_total_death|
+------------------------------------+---------------+
|Union Territory of Jammu and Kashmir|0.0            |
+------------------------------------+---------------+



### The state has the lowest Death to Total Confirmed cases ratio

In [37]:
lowest_death_total_confirmed_cases_ratio = df_csv.withColumn("ratio", F.col("Death")/F.col("Total Confirmed cases"))
lowest_death_total_confirmed_cases_ratio = lowest_death_total_confirmed_cases_ratio.groupBy("Name of State / UT").agg(F.avg("ratio").alias("avg_ratio"))
window_spec = Window.orderBy(F.col("avg_ratio"))
lowest_death_total_confirmed_cases_ratio = lowest_death_total_confirmed_cases_ratio.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')
lowest_death_total_confirmed_cases_ratio.show(truncate=False)

+------------------+---------+
|Name of State / UT|avg_ratio|
+------------------+---------+
|Mizoram           |0.0      |
+------------------+---------+



### Find which month the more Newer recovered cases

In [38]:
new_cases_by_month = df_csv.groupBy(date_format('Date','MMMM').alias("Month")).agg(sum("New recovered").alias("Total New Recovered"))
most_recover_cases = new_cases_by_month.orderBy(new_cases_by_month["Total New Recovered"].desc()).first()["Month"]
print(f"The month with most new recoveries is '{most_recover_cases}'")

The month with most new recoveries is 'July'
