In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Covid Data Analysis") \
    .getOrCreate()


In [2]:
df = spark.read.csv('/content/DataSheet.csv', header=True, inferSchema=True)
df.show(5)


+--------+---------+-----------+-------------+-----------+------------+------------------+-----------------+----------+------------+------------+---------------------+-------------------+
|iso_code|continent|   location|date_current*|total_cases|total_deaths|total_vaccinations|people_vaccinated|median_age|age_65_older|age_70_older|cardiovasc_death_rate|diabetes_prevalence|
+--------+---------+-----------+-------------+-----------+------------+------------------+-----------------+----------+------------+------------+---------------------+-------------------+
|     AFG|     Asia|Afghanistan|   10-01-2020|      39285|        1460|              NULL|             NULL|      18.6|       2.581|       1.337|              597.029|               9.59|
|     AFG|     Asia|Afghanistan|   10-02-2020|      39290|        1460|              NULL|             NULL|      18.6|       2.581|       1.337|              597.029|               9.59|
|     AFG|     Asia|Afghanistan|   10-03-2020|      39297|  

1.Create RDDs and DataFrames needed to perform the below operations

In [21]:
rdd = spark.sparkContext.textFile('/content/DataSheet.csv')
header = rdd.first()  # Get header row
data_rdd = rdd.filter(lambda x: x != header).map(lambda x: x.split(','))


In [23]:
# Filter out rows where x[4] is empty or not a valid float
filtered_rdd = data_rdd.filter(lambda x: x[4].strip() != '')

# Map and reduce operations
continent_rdd = filtered_rdd.map(lambda x: (x[1], float(x[4])))  # (continent, total_cases)
continent_total_cases = continent_rdd.reduceByKey(lambda a, b: a + b)

# Collect the results
result = continent_total_cases.collect()
print(result)


[('Asia', 2134990087.0), ('', 8190091253.0), ('Europe', 2155273358.0), ('Africa', 288867928.0), ('North America', 2176243155.0), ('South America', 1430718441.0), ('Oceania', 3819476.0)]


2.Find the Total Number of Cases in Each Continent


In [3]:
df.groupBy('continent').agg({'total_cases': 'sum'}).show()


+-------------+----------------+
|    continent|sum(total_cases)|
+-------------+----------------+
|       Europe|      2155273358|
|       Africa|       288867928|
|         NULL|      8190091253|
|North America|      2176243155|
|South America|      1430718441|
|      Oceania|         3819476|
|         Asia|      2134990087|
+-------------+----------------+



3.Find the Total Number of Deaths in Each Location

In [4]:
df.groupBy('location').agg({'total_deaths': 'sum'}).show()


+-------------+-----------------+
|     location|sum(total_deaths)|
+-------------+-----------------+
|         Chad|            12598|
|     Paraguay|           225768|
|       Russia|          5291911|
|International|             1860|
|        World|        190245307|
|        Yemen|            74938|
|      Senegal|            47413|
|       Sweden|           947159|
|       Guyana|            17812|
|      Eritrea|              191|
|  Philippines|          1026379|
|     Djibouti|             7573|
|     Malaysia|            47518|
|    Singapore|             3527|
|         Fiji|              248|
|       Turkey|          1933353|
|       Malawi|            28696|
|         Iraq|          1468363|
|      Germany|          2947404|
|      Comoros|             2139|
+-------------+-----------------+
only showing top 20 rows



4.Compute the Maximum Deaths in Specific Locations (Europe and Asia)

In [5]:
df.filter(df['continent'] == 'Europe').agg({'total_deaths': 'max'}).show()
df.filter(df['continent'] == 'Asia').agg({'total_deaths': 'max'}).show()


+-----------------+
|max(total_deaths)|
+-----------------+
|           106774|
+-----------------+

+-----------------+
|max(total_deaths)|
+-----------------+
|           154486|
+-----------------+



5.Find the Total Number of People Vaccinated at Each Continent

In [6]:
df.groupBy('continent').agg({'people_vaccinated': 'sum'}).show()


+-------------+----------------------+
|    continent|sum(people_vaccinated)|
+-------------+----------------------+
|       Europe|             267453341|
|       Africa|                  NULL|
|         NULL|             855976224|
|North America|             338702417|
|South America|              17349570|
|      Oceania|                  NULL|
|         Asia|             122587289|
+-------------+----------------------+



6.Find the Count of Country-wise Vaccinations for the Month of January 2021

In [17]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [19]:
from pyspark.sql.functions import to_date, col

# Adjust the policy for parsing
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert to date with proper handling
df = df.withColumn('date_current*', to_date(col('date_current*'), 'M/d/yyyy'))

# Filter for January 2021 and aggregate
result = df.filter((df['date_current*'] >= '2021-01-01') & (df['date_current*'] <= '2021-01-31')) \
           .groupBy('location') \
           .agg({'people_vaccinated': 'sum'})

result.show()


+-------------+----------------------+
|     location|sum(people_vaccinated)|
+-------------+----------------------+
|         Chad|                  NULL|
|     Paraguay|                  NULL|
|       Russia|                  NULL|
|International|                  NULL|
|        World|             687183058|
|        Yemen|                  NULL|
|      Senegal|                  NULL|
|       Sweden|                725177|
|       Guyana|                  NULL|
|      Eritrea|                  NULL|
|  Philippines|                  NULL|
|     Djibouti|                  NULL|
|     Malaysia|                  NULL|
|    Singapore|                328000|
|         Fiji|                  NULL|
|       Turkey|                  NULL|
|       Malawi|                  NULL|
|         Iraq|                  NULL|
|      Germany|              27938612|
|      Comoros|                  NULL|
+-------------+----------------------+
only showing top 20 rows



7.What is the Average Number of Total Cases Across All Locations?

In [10]:
df.agg({'total_cases': 'avg'}).show()


+-----------------+
| avg(total_cases)|
+-----------------+
|695069.3243656114|
+-----------------+



8.Which Continent Has the Highest Total Number of Vaccinations?

In [11]:
df.groupBy('continent').agg({'people_vaccinated': 'sum'}) \
    .orderBy('sum(people_vaccinated)', ascending=False) \
    .show(1)


+---------+----------------------+
|continent|sum(people_vaccinated)|
+---------+----------------------+
|     NULL|             855976224|
+---------+----------------------+
only showing top 1 row



9.Extract the Year, Month, and Day from the date_current Column

In [13]:
from pyspark.sql.functions import year, month, dayofmonth

df = df.withColumn('year', year(df['date_current*'])) \
       .withColumn('month', month(df['date_current*'])) \
       .withColumn('day', dayofmonth(df['date_current*']))

df.show(100)


+--------+---------+-----------+-------------+-----------+------------+------------------+-----------------+----------+------------+------------+---------------------+-------------------+----+-----+----+
|iso_code|continent|   location|date_current*|total_cases|total_deaths|total_vaccinations|people_vaccinated|median_age|age_65_older|age_70_older|cardiovasc_death_rate|diabetes_prevalence|year|month| day|
+--------+---------+-----------+-------------+-----------+------------+------------------+-----------------+----------+------------+------------+---------------------+-------------------+----+-----+----+
|     AFG|     Asia|Afghanistan|         NULL|      39285|        1460|              NULL|             NULL|      18.6|       2.581|       1.337|              597.029|               9.59|NULL| NULL|NULL|
|     AFG|     Asia|Afghanistan|         NULL|      39290|        1460|              NULL|             NULL|      18.6|       2.581|       1.337|              597.029|               9.