In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=8f52dfdbb742df95b6c3ec2e5baac3c07a505ad6ece060daa0a9674736cc4d0c
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types
from pyspark.sql.window import Window

In [24]:
spark = SparkSession.builder.appName("covid").getOrCreate()
filepath = "/complete.csv"

In [25]:
df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [26]:
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [27]:
df_csv = df_csv.withColumn("total_case", df_csv["Total Confirmed cases"].cast(types.LongType()))
df_csv = df_csv.withColumn("total_newly_recovered", df_csv["New recovered"].cast(types.LongType()))
df_csv = df_csv.withColumn("state", df_csv["Name of State / UT"].cast(types.StringType()))
df_csv = df_csv.withColumn("death_Case", df_csv["Death"].cast(types.LongType()))
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



In [28]:
df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [29]:
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [33]:
df_csv = df_csv.withColumn("total_case", df_csv["Total Confirmed cases"].cast(types.LongType()))
df_csv = df_csv.withColumn("total_newly_recovered", df_csv["New recovered"].cast(types.LongType()))
df_csv = df_csv.withColumn("state", df_csv["Name of State / UT"].cast(types.StringType()))
df_csv = df_csv.withColumn("death_Case", df_csv["Death"].cast(types.LongType()))
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



Convert All state names to lowercase

In [35]:
output_df_5 = df_csv.withColumn('lower', F.lower(F.col("state")))

In [37]:
output_df_5.select("lower").distinct().show()

+--------------------+
|               lower|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



The day had a greater number of covid cases

In [39]:
output_df_1 = df_csv.groupBy("Date").agg(F.sum("total_case").alias("sumtotal"))

In [40]:
window_spec = Window.orderBy(F.col("sumtotal").desc())

In [41]:
output_df_1 = output_df_1.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [42]:
output_df_1.show()

+----------+--------+
|      Date|sumtotal|
+----------+--------+
|2020-08-06| 1964536|
+----------+--------+



The state has the second-largest number of covid cases

In [43]:
output_df_2 = df_csv.groupBy("state").agg(F.sum("total_case").alias("sumtotal"))

In [44]:
window_spec = Window.orderBy(F.col("sumtotal").desc())

In [45]:
output_df_2 = output_df_2.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 2).drop('recency')

In [46]:
output_df_2.show()

+----------+--------+
|     state|sumtotal|
+----------+--------+
|Tamil Nadu| 7847083|
+----------+--------+



Which Union Territory has the least number of death

In [47]:
output_df_3 = df_csv.where(F.col('state').like("Union_Territory%"))

In [48]:
output_df_3 = output_df_3.groupBy("state").agg(F.sum("death_Case").alias("sumtotal"))

In [49]:
window_spec = Window.orderBy(F.col("sumtotal"))

In [50]:
output_df_3 = output_df_3.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [52]:
output_df_3.show(truncate=False)

+------------------------------------+--------+
|state                               |sumtotal|
+------------------------------------+--------+
|Union Territory of Jammu and Kashmir|0       |
+------------------------------------+--------+



The state has the lowest Death to total confirmed cases ratio

In [54]:
output_df_4 = df_csv.withColumn("ratio", F.col("death_Case")/F.col("total_case"))

In [55]:
output_df_4 = output_df_4.groupBy("state").agg(F.avg("ratio").alias("ratio"))

In [56]:
window_spec = Window.orderBy(F.col("ratio"))

In [57]:
output_df_4 = output_df_4.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [58]:
output_df_4.show(truncate=False)

+-------+-----+
|state  |ratio|
+-------+-----+
|Mizoram|0.0  |
+-------+-----+



Find which month the newer recovered cases

In [59]:
import calendar
from datetime import datetime

def get_month_name(month_number):
    return calendar.month_name[int(month_number)]

def get_month(date):
    print(date)
    date = datetime.strptime(date, "%Y-%m-%d")
    return date.month

get_month_udf = F.udf(lambda a : get_month(a), types.StringType())
get_month_name_udf = F.udf(lambda a : get_month_name(a), types.StringType())

In [60]:
output_df_6 = df_csv.withColumn("month", get_month_udf(F.col("date")))

In [61]:
output_df_6 = output_df_6.groupBy("month").agg(F.sum("total_newly_recovered").alias("sumtotal"))

In [62]:
window_spec = Window.orderBy(F.col("sumtotal").desc())

In [63]:
output_df_6 = output_df_6.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [64]:
output_df_6 = output_df_6.withColumn("month", get_month_name_udf(F.col("month")))

In [65]:
output_df_6.show()

+-----+--------+
|month|sumtotal|
+-----+--------+
| July|  722983|
+-----+--------+

