In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=845d93d617797c6a446a1bf7219c58d0a8cdd560a82232869223b583bc369dd5
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [4]:
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession.builder.master("local[*]").appName("AnalyseCovidData").getOrCreate()
file_path = "/content/complete.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)


In [9]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [10]:
df.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

### **QUESTION 1 : Convert all state names to lowercase.**

In [29]:
from pyspark.sql.functions import col, lower
df = df.withColumn("Name of State / UT", lower(col("Name of State / UT")))
df.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

### **QUESTION 2 Find the day with the greatest number of COVID cases.**

In [36]:
from pyspark.sql.functions import desc
max_cases_day = df.orderBy(col("Total Confirmed cases").desc()).select("Date").first()[0]
print(max_cases_day)

2020-08-06


### **Find the state with the second-largest number of COVID cases.**

In [53]:
df.createOrReplaceTempView("state_cases")

# Use SQL to perform the aggregation
state_cases_df = spark.sql("""
    SELECT `Name of State / UT`, SUM(`Total Confirmed cases`) AS TotalCases
    FROM state_cases
    GROUP BY `Name of State / UT`
    ORDER BY TotalCases DESC
""")

Seond_largest = state_cases_df.collect()
print(Seond_largest)

+------------------+-----------+
|Name of State / UT| TotalCases|
+------------------+-----------+
|       maharashtra|1.5192247E7|
|        tamil nadu|  7847083.0|
|             delhi|  5766124.0|
|    andhra pradesh|  2742054.0|
|         karnataka|  2733901.0|
|           gujarat|  2730710.0|
|     uttar pradesh|  2462456.0|
|         telangana|  1644466.0|
|         rajasthan|  1622247.0|
|       west bengal|  1602230.0|
|    madhya pradesh|  1291485.0|
|             bihar|  1277395.0|
|           haryana|  1161598.0|
|             assam|  1003558.0|
|            odisha|   831767.0|
| jammu and kashmir|   685423.0|
|            kerala|   596758.0|
|            punjab|   539968.0|
|         jharkhand|   282717.0|
|      chhattisgarh|   256589.0|
+------------------+-----------+
only showing top 20 rows

[Row(Name of State / UT='uttarakhand', TotalCases=231641.0), Row(Name of State / UT='goa', TotalCases=150799.0), Row(Name of State / UT='tripura', TotalCases=141618.0), Row(Name of S

### **Identify which Union Territory has the least number of deaths.**

In [56]:
least_deaths_ut_df = spark.sql("""
    SELECT `Name of State / UT`, SUM(Death) AS TotalDeaths
    FROM state_cases
    WHERE `Name of State / UT` LIKE 'union territory%'
    GROUP BY `Name of State / UT`
    ORDER BY TotalDeaths ASC
""")

least_deaths_ut = least_deaths_ut_df.first()
print(least_deaths_ut)

Row(Name of State / UT='union territory of ladakh', TotalDeaths=0.0)


### **Find the state with the lowest Death to Total Confirmed cases ratio.**

In [62]:
lowest_ratio_df = spark.sql("""
    SELECT `Name of State / UT`,
           SUM(Death) AS TotalDeaths,
           SUM(`Total Confirmed cases`) AS TotalConfirmed,
           SUM(Death) / SUM(`Total Confirmed cases`) AS DeathToConfirmedRatio
    FROM state_cases
    GROUP BY `Name of State / UT`
    ORDER BY DeathToConfirmedRatio ASC
""")

# Get the state with the lowest Death-to-Total Confirmed cases ratio
lowest_ratio_df.show()
lowest_ratio_state = lowest_ratio_df.first()
print(lowest_ratio_state)

+--------------------+-----------+--------------+---------------------+
|  Name of State / UT|TotalDeaths|TotalConfirmed|DeathToConfirmedRatio|
+--------------------+-----------+--------------+---------------------+
|union territory o...|        0.0|          58.0|                  0.0|
|union territory o...|        0.0|          26.0|                  0.0|
|union territory o...|        0.0|           2.0|                  0.0|
|             mizoram|        0.0|       13335.0|                  0.0|
|             manipur|       43.0|       84000.0| 5.119047619047619E-4|
|              sikkim|       10.0|       13897.0| 7.195797654169965E-4|
|            nagaland|       60.0|       45006.0| 0.001333155579256...|
|dadra and nagar h...|       46.0|       26209.0| 0.001755122286237552|
|              ladakh|      127.0|       57213.0| 0.002219775225910...|
|               assam|     2318.0|     1003558.0| 0.002309781796368521|
|             tripura|      364.0|      141618.0| 0.002570294736

### **Find which month had the most "newer" recovered cases**

In [74]:
from pyspark.sql.functions import  to_date
most_recovered_month_df = spark.sql("""
    SELECT
        year(Date) AS Year,
        month(Date) AS Month,
        SUM(`New recovered`) AS TotalRecovered
    FROM state_cases
    GROUP BY Year, Month
    ORDER BY TotalRecovered DESC

""")
most_recovered_month = most_recovered_month_df.collect()[0]

month_names = {
    1: "January", 2: "February", 3: "March", 4: "April",
    5: "May", 6: "June", 7: "July", 8: "August",
    9: "September", 10: "October", 11: "November", 12: "December"
}
month_name = month_names.get(most_recovered_month['Month'])
print(month_name)

July
