In [1]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types
from pyspark.sql.window import Window

In [2]:
# Start (or reuse) the Spark session used by every cell below.
# NOTE(review): the recorded output of this cell is JAVA_GATEWAY_EXITED, i.e. the
# JVM never started; presumably no usable Java/JAVA_HOME on this machine — verify.
spark = SparkSession.builder.appName("covid").getOrCreate()

# Location of the COVID-19 CSV. Set COVID_CSV_PATH to point elsewhere instead of
# editing this machine-specific absolute path; the original path is the default.
filepath = os.environ.get("COVID_CSV_PATH", "/Users/VC/Downloads/complete.csv")

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Load the raw CSV from `filepath`: the first row holds the column names, quoted
# fields may span several lines, surrounding whitespace is trimmed, and
# backslash / double-quote act as the escape and quote characters.
df_csv = (
    spark.read.format("csv")
    .options(
        header=True,
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        escape="\\",
        quote='"',
    )
    .load(filepath)
)

In [None]:
# Inspect the raw schema. The reader above does not set inferSchema, so the
# columns are presumably all strings (hence the explicit casts below) — TODO confirm.
df_csv.printSchema()

In [None]:
# Add typed, snake_case copies of the raw CSV columns the analyses rely on
# (the original columns stay in place), then print the schema to check them.
# NOTE(review): a value that cannot be cast to a number presumably turns into
# null rather than failing (depends on spark.sql.ansi.enabled) — verify counts.
df_csv = (
    df_csv
    .withColumn("total_case", F.col("Total Confirmed cases").cast(types.LongType()))
    .withColumn("total_newly_recovered", F.col("New recovered").cast(types.LongType()))
    .withColumn("state", F.col("Name of State / UT").cast(types.StringType()))
    .withColumn("death_Case", F.col("Death").cast(types.LongType()))
)
df_csv.printSchema()

# Day with the most COVID-19 cases.

In [None]:
# Total of `total_case` across all states for each reporting date.
# NOTE(review): if "Total Confirmed cases" is cumulative (as the name suggests),
# the maximum below is simply the latest date — confirm this is the intended question.
output_df_1 = (
    df_csv
    .groupBy(F.col("Date"))
    .agg(F.sum(F.col("total_case")).alias("sum_total_case"))
)

In [None]:
# Rank dates from the largest daily total downwards.
window_spec = Window.orderBy(F.desc("sum_total_case"))

In [None]:
# Keep only the row ranked first under window_spec; the temporary
# "recency" rank column is dropped again afterwards.
output_df_1 = (
    output_df_1
    .withColumn("recency", F.row_number().over(window_spec))
    .where(F.col("recency") == 1)
    .drop("recency")
)

In [None]:
# Display the winning date (show() defaults spelled out).
output_df_1.show(n=20, truncate=True)

# State with the second-most COVID-19 cases.

In [None]:
# Total of `total_case` per state over all dates.
# NOTE(review): summing a cumulative column across dates over-weights states that
# were reported for longer; the per-state maximum may be intended — confirm.
output_df_2 = (
    df_csv
    .groupBy(F.col("state"))
    .agg(F.sum(F.col("total_case")).alias("sum_total_case"))
)

In [None]:
# Rank states from the largest case total downwards.
window_spec = Window.orderBy(F.desc("sum_total_case"))

In [None]:
# Keep only the row ranked second under window_spec, then drop the
# temporary "recency" rank column.
output_df_2 = (
    output_df_2
    .withColumn("recency", F.row_number().over(window_spec))
    .where(F.col("recency") == 2)
    .drop("recency")
)

In [None]:
# Display the runner-up state (show() defaults spelled out).
output_df_2.show(n=20, truncate=True)

# Union Territory with the fewest deaths.

In [None]:
# Restrict to rows whose state name begins with "Union Territory".
output_df_3 = df_csv.filter(F.col("state").startswith("Union Territory"))

In [None]:
# Total of `death_Case` per Union Territory over all dates.
# NOTE(review): if "Death" is cumulative, a per-UT maximum may be intended — confirm.
output_df_3 = (
    output_df_3
    .groupBy(F.col("state"))
    .agg(F.sum(F.col("death_Case")).alias("sum_total_death"))
)

In [None]:
# Rank Union Territories from the fewest deaths upwards. Nulls are pushed to the
# end explicitly: Spark's ascending sort puts nulls first by default, so a UT
# whose death sum is null (e.g. every death_Case failed to cast) would otherwise
# be reported as the one with the fewest deaths.
window_spec = Window.orderBy(F.col("sum_total_death").asc_nulls_last())

In [None]:
# Keep only the row ranked first under window_spec; the temporary
# "recency" rank column is dropped again afterwards.
output_df_3 = (
    output_df_3
    .withColumn("recency", F.row_number().over(window_spec))
    .where(F.col("recency") == 1)
    .drop("recency")
)

In [None]:
# Display the result without truncating the (long) Union Territory name.
output_df_3.show(20, False)

# State with the lowest ratio of deaths to total confirmed cases.

In [None]:
# Per-row death-to-confirmed ratio. Rows whose total_case is 0 or null get a
# null ratio instead of attempting the division, which would raise a
# divide-by-zero error when ANSI SQL mode is enabled.
output_df_4 = df_csv.withColumn(
    "ratio",
    F.when(F.col("total_case") != 0, F.col("death_Case") / F.col("total_case")),
)

In [None]:
# Per-state mean of the row-level ratios.
# NOTE(review): this is an unweighted mean of per-row ratios, not the state's
# overall deaths / confirmed cases — confirm which definition is intended.
output_df_4 = (
    output_df_4
    .groupBy(F.col("state"))
    .agg(F.mean(F.col("ratio")).alias("avg_ratio"))
)

In [None]:
# Rank states from the lowest average ratio upwards. Nulls are pushed to the end
# explicitly: Spark's ascending sort puts nulls first by default, so a state with
# no computable ratio (all rows null) would otherwise be reported as the lowest.
window_spec = Window.orderBy(F.col("avg_ratio").asc_nulls_last())

In [None]:
# Keep only the row ranked first under window_spec; the temporary
# "recency" rank column is dropped again afterwards.
output_df_4 = (
    output_df_4
    .withColumn("recency", F.row_number().over(window_spec))
    .where(F.col("recency") == 1)
    .drop("recency")
)

In [None]:
# Display the result without truncating the state name.
output_df_4.show(20, False)

# State names in lowercase.

In [None]:
# Add a lower-cased copy of the state name.
output_df_5 = df_csv.withColumn("state_lower", F.lower("state"))

In [None]:
# Show each distinct lower-cased state name once (first 20 rows by default).
output_df_5.select(F.col("state_lower")).dropDuplicates().show()

In [None]:
import calendar
from datetime import datetime

def get_month_name(month_number):
    """Return the month name for a 1-based month number.

    Args:
        month_number: anything ``int()`` accepts, e.g. ``3`` or ``"3"``; ``None``
            for a missing value.

    Returns:
        ``calendar.month_name[int(month_number)]`` (follows the current locale),
        or ``None`` when ``month_number`` is ``None`` so a null stays null inside
        a Spark UDF instead of failing the job.
    """
    if month_number is None:
        return None
    return calendar.month_name[int(month_number)]

def get_month(date):
    """Return the month number (1-12) of a ``YYYY-MM-DD`` date string.

    Args:
        date: date string in ``%Y-%m-%d`` format, or ``None``.

    Returns:
        The month as an ``int``, or ``None`` when ``date`` is ``None``.

    Raises:
        ValueError: if ``date`` does not match ``%Y-%m-%d``.
    """
    # No per-row printing here: this runs inside a Spark UDF, once per row.
    if date is None:
        return None
    return datetime.strptime(date, "%Y-%m-%d").month

# Spark UDF wrappers around the helpers above. get_month returns an int, so its
# UDF is declared IntegerType to match, rather than relying on Spark's implicit
# conversion of the int into a StringType column. get_month_name accepts that
# int and returns the month name as a string.
get_month_udf = F.udf(get_month, types.IntegerType())
get_month_name_udf = F.udf(get_month_name, types.StringType())

In [None]:
# Derive the month number from the CSV's "Date" column. The column is referenced
# by its exact name (as in groupBy("Date") earlier) instead of "date", which only
# resolved because Spark's column matching is case-insensitive by default.
output_df_6 = df_csv.withColumn("month", get_month_udf(F.col("Date")))

In [None]:
# Total of `total_newly_recovered` for each month.
output_df_6 = (
    output_df_6
    .groupBy(F.col("month"))
    .agg(F.sum(F.col("total_newly_recovered")).alias("sum_newly_recovered"))
)

In [None]:
# Rank months from the most newly recovered downwards.
window_spec = Window.orderBy(F.desc("sum_newly_recovered"))

In [None]:
# Keep only the row ranked first under window_spec; the temporary
# "recency" rank column is dropped again afterwards.
output_df_6 = (
    output_df_6
    .withColumn("recency", F.row_number().over(window_spec))
    .where(F.col("recency") == 1)
    .drop("recency")
)

In [None]:
# Replace the month number with its name for display.
output_df_6 = output_df_6.withColumn("month", get_month_name_udf("month"))

In [None]:
# Display the month with the most recoveries (show() defaults spelled out).
output_df_6.show(n=20, truncate=True)