In [1]:
pip install pyspark



In [2]:
import pyspark
print(pyspark.__version__)

3.5.2


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, month
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

In [4]:
# spark = SparkSession.builder.appName("ANALYSIS OF COVID").getOrCreate()

In [5]:
spark = SparkSession.builder \
    .appName("ANALYSIS OF COVID") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

In [6]:
df = spark.read.csv("/content/complete.csv", header=True)

In [7]:
df.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [8]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [9]:
df = df.withColumnRenamed("Name of State / UT","State") \
      .withColumnRenamed("Cured/Discharged/Migrated","Status")

In [10]:
df.show(10)

+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+
|      Date| State|Latitude|Longitude|Total Confirmed cases|Death|Status|New cases|New deaths|New recovered|
+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+
|2020-01-30|Kerala| 10.8505|  76.2711|                  1.0|    0|   0.0|        0|         0|            0|
|2020-01-31|Kerala| 10.8505|  76.2711|                  1.0|    0|   0.0|        0|         0|            0|
|2020-02-01|Kerala| 10.8505|  76.2711|                  2.0|    0|   0.0|        1|         0|            0|
|2020-02-02|Kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        1|         0|            0|
|2020-02-03|Kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        0|         0|            0|
|2020-02-04|Kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        0|         0|            0|
|2020-02-05|Kerala|

In [11]:
df.count()

4692

In [12]:
df = df.dropna()

In [13]:
df.count()

4692

**1) CONVERT ALL STATE NAMES TO LOWERCASE**

In [14]:
df=df.withColumn("State", lower(col("State")))


In [15]:
df.show()

+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+
|      Date| State|Latitude|Longitude|Total Confirmed cases|Death|Status|New cases|New deaths|New recovered|
+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+
|2020-01-30|kerala| 10.8505|  76.2711|                  1.0|    0|   0.0|        0|         0|            0|
|2020-01-31|kerala| 10.8505|  76.2711|                  1.0|    0|   0.0|        0|         0|            0|
|2020-02-01|kerala| 10.8505|  76.2711|                  2.0|    0|   0.0|        1|         0|            0|
|2020-02-02|kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        1|         0|            0|
|2020-02-03|kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        0|         0|            0|
|2020-02-04|kerala| 10.8505|  76.2711|                  3.0|    0|   0.0|        0|         0|            0|
|2020-02-05|kerala|

**2) THE DAY THAT HAS GREATER NUMBER OF COVID CASES**

In [16]:
df_output = df.orderBy(col("New cases").desc()).first()

In [17]:
print(f"{df_output['Date']} with {df_output['New cases']} cases.")

2020-07-21 with 998 cases.


**3) THE STATE HAS SECOND HIGHEST NUMBER OF COVID CASE**

In [18]:
df = df.withColumn("Total Confirmed cases", col("Total Confirmed cases").cast(IntegerType()))

In [19]:
df_output_state_with_high_covid = df.groupBy("State") \
    .sum("Total Confirmed cases") \
    .orderBy(col("sum(Total Confirmed cases)").desc()) \
    .collect()[1]


In [20]:
print(f"{df_output_state_with_high_covid['State']} with {df_output_state_with_high_covid['sum(Total Confirmed cases)']} cases")


tamil nadu with 7847083 cases


**4) UNION TERRITORY WITH LEAST NUM OF DEATH**

In [21]:
df = df.withColumn("Death", col("Death").cast(IntegerType()))

In [22]:
df_output_UT_with_least_num_of_death = df.filter(col("State").like("union territory%")) \
    .groupBy("State") \
    .sum("Death") \
    .orderBy(col("sum(Death)").asc()) \
    .first()

In [23]:
print(f"{df_output_UT_with_least_num_of_death['State']} with {df_output_UT_with_least_num_of_death['sum(Death)']} Deaths")

union territory of ladakh with 0 Deaths


**5) THE STATE HAS THE LOWEST DEATH TO TOTAL CONFIRMED CASES RATIO**

In [24]:
df_output_state_ratio = df.withColumn("Death_to_Confirmed_Ratio",
                              when(col("Total Confirmed cases") > 0, col("Death") / col("Total Confirmed cases")).otherwise(0))


In [25]:
df_output_state_ratio.show()

+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+------------------------+
|      Date| State|Latitude|Longitude|Total Confirmed cases|Death|Status|New cases|New deaths|New recovered|Death_to_Confirmed_Ratio|
+----------+------+--------+---------+---------------------+-----+------+---------+----------+-------------+------------------------+
|2020-01-30|kerala| 10.8505|  76.2711|                    1|    0|   0.0|        0|         0|            0|                     0.0|
|2020-01-31|kerala| 10.8505|  76.2711|                    1|    0|   0.0|        0|         0|            0|                     0.0|
|2020-02-01|kerala| 10.8505|  76.2711|                    2|    0|   0.0|        1|         0|            0|                     0.0|
|2020-02-02|kerala| 10.8505|  76.2711|                    3|    0|   0.0|        1|         0|            0|                     0.0|
|2020-02-03|kerala| 10.8505|  76.2711|                    3|  

In [26]:
df_output_loweest = df_output_state_ratio.groupBy("State") \
    .avg("death_to_confirmed_ratio") \
    .orderBy(col("avg(death_to_confirmed_ratio)").asc()) \
    .first()

In [27]:
df_output_loweest

Row(State='union territory of ladakh', avg(death_to_confirmed_ratio)=0.0)

In [28]:
print(f"{df_output_loweest['State']} with ratio {df_output_loweest['avg(death_to_confirmed_ratio)']}")


union territory of ladakh with ratio 0.0


**6) MONTH MORE NEWER CASES RECOVERED**

In [29]:
df = df.withColumn("New recovered", col("New recovered").cast(IntegerType()))

In [30]:
df_output_month = df.withColumn("Month", month(col("Date")))

In [31]:
df_output_recovered = df_output_month.groupBy("Month") \
    .sum("New recovered") \
    .orderBy(col("sum(New recovered)").desc()) \
    .first()

In [32]:
months = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
    7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
}

In [33]:
print(f"{months.get(df_output_recovered['Month'])}")

July
