## Name - Utpal Tripathi                     Mail - utripathi@presidio.com

In [5]:
import findspark  # Import findspark to locate the Spark installation
findspark.init()  # Initialize findspark to set up the environment for Spark

from pyspark.sql import SparkSession  # Import SparkSession from pyspark.sql

# Initialize a Spark session

spark = SparkSession.builder \
    .appName("Covid Data Analysis") \
    .getOrCreate()  



In [10]:
# Load the CSV file into a DataFrame
df = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load("complete.csv")

In [11]:
from pyspark.sql import types

# Cast columns to appropriate data types
df = df.withColumn("total_case", df["Total Confirmed cases"].cast(types.LongType()))  # Cast 'Total Confirmed cases' to LongType for integer values
df = df.withColumn("total_newly_recovered", df["New recovered"].cast(types.LongType()))  # Cast 'New recovered' to LongType for integer values
df = df.withColumn("new_cases", df["New cases"].cast(types.LongType()))  # Cast 'New cases' to LongType for integer values
df = df.withColumn("state", df["Name of State / UT"].cast(types.StringType()))  # Cast 'Name of State / UT' to StringType for textual data
df = df.withColumn("death_Case", df["Death"].cast(types.LongType()))  # Cast 'Death' to LongType for integer values

# Print the schema of the DataFrame to verify the changes
df.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- new_cases: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



## 1. Convert All State Names to Lowercase

In [12]:
from pyspark.sql.functions import lower, col

# Convert the 'state' column to lowercase and create a new column 'state_lower'
output_df_1 = df.withColumn('state_lower', lower(col("state")))

# Select distinct values from the 'state_lower' column and display them
output_df_1.select("state_lower").distinct().show()


+--------------------+
|         state_lower|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



## 2. Find the Day with the Greatest Number of COVID Cases

In [13]:
# Group by the 'Date' column and sum the 'total_case' for each date
output_df_2 = df.groupBy("Date").sum("total_case")

# Order the results by the summed 'total_case' in descending order
output_df_2 = output_df_2.orderBy("sum(total_case)", ascending=False)

# Show the top row (the day with the greatest number of covid cases)
output_df_2.show(1)


+----------+---------------+
|      Date|sum(total_case)|
+----------+---------------+
|2020-08-06|        1964536|
+----------+---------------+
only showing top 1 row



## 3. Find the State with the Second-Largest Number of COVID Cases

In [14]:
# Group by 'State' and sum the 'total_case' for each state
df_grouped_by_state = df.groupBy("State").sum("total_case")

# Order the results by the summed 'total_case' in descending order
df_grouped_by_state = df_grouped_by_state.orderBy("sum(total_case)", ascending=False)

# Collect the results into a list and get the second row
# Note: Collecting results can be expensive if the DataFrame is large.
second_largest_state = df_grouped_by_state.collect()[1]  # Index 1 for the second row

# Print the result
print(second_largest_state)



Row(State='Tamil Nadu', sum(total_case)=7847083)


## 4. Find the Union Territory with the Least Number of Deaths

In [15]:

# Filter rows where the 'state' column contains "Union Territory"
df_territories = df.filter(col("state").contains("Union Territory"))

# Group by 'state', sum the 'death_Case', and order the results to find the Union Territory with the least number of deaths
df_least_deaths = df_territories.groupBy("state").sum("death_Case") \
                                .orderBy("sum(death_Case)")  # Order by total deaths in ascending order

# Show the result
df_least_deaths.show(1)  # Display the top row with the least number of deaths



+--------------------+---------------+
|               state|sum(death_Case)|
+--------------------+---------------+
|Union Territory o...|              0|
+--------------------+---------------+
only showing top 1 row



## 5. Find the State with the Lowest Death to Total Confirmed Cases Ratio

In [20]:
df.show()
# Calculate the death-to-case ratio for each row
df_ratio = df.withColumn("death_ratio", col("death_Case") / col("total_case"))

# Find the state with the lowest death-to-case ratio by ordering the DataFrame by 'death_ratio' in ascending order
df_lowest_ratio = df_ratio.orderBy("death_ratio").select("state", "death_ratio")

# Display the top row which has the lowest ratio
df_lowest_ratio.show(1)

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+----------+---------------------+---------+------+----------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|total_case|total_newly_recovered|new_cases| state|death_Case|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+----------+---------------------+---------+------+----------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|         1|                    0|        0|Kerala|         0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|         1|                    0|        0|Kerala|       

## 6. Find the Month with the Most Newer Recovered Cases

In [21]:
from pyspark.sql.functions import month

# Extract month from the "Date" column
df_with_month = df.withColumn("month", month("Date"))

# Group by month and sum the newly recovered cases
df_grouped_by_month = df_with_month.groupBy("month").sum("total_newly_recovered")

# Order by the sum of newly recovered cases in descending order and select the top month
df_grouped_by_month = df_grouped_by_month.orderBy(col("sum(total_newly_recovered)").desc())
top_month = df_grouped_by_month.first()  # Get the top row

# Convert month number to month name
month_dict = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
    7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
}

if top_month:
    top_month_number = top_month["month"]
    top_month_name = month_dict.get(top_month_number, "Unknown")
    top_month_cases = top_month["sum(total_newly_recovered)"]
    print(f"Month with the highest number of newly recovered cases: {top_month_name} ({top_month_cases} cases)")
else:
    print("No data available.")


Month with the highest number of newly recovered cases: July (722983 cases)


## ThankYou...