3rd Cell in Notebook - AGG, Dataset Used - COVID 19 DATASET

In [6]:
#Total confirmed, deaths, and recovered cases by continent
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

spark = SparkSession.builder.appName("CovidAnalysis").getOrCreate()

df_country = spark.read.csv("country_wise_latest.csv", header=True, inferSchema=True)

agg_by_region = df_country.groupBy("WHO Region").agg(
    sum("Confirmed").alias("Total_Confirmed"),
    sum("Deaths").alias("Total_Deaths"),
    sum("Recovered").alias("Total_Recovered")
)

agg_by_region.show()




+--------------------+---------------+------------+---------------+
|          WHO Region|Total_Confirmed|Total_Deaths|Total_Recovered|
+--------------------+---------------+------------+---------------+
|              Europe|        3299523|      211144|        1993723|
|     Western Pacific|         292428|        8249|         206770|
|              Africa|         723207|       12223|         440645|
|Eastern Mediterra...|        1490744|       38339|        1201400|
|            Americas|        8839286|      342732|        4468616|
|     South-East Asia|        1835297|       41349|        1156933|
+--------------------+---------------+------------+---------------+



In [4]:
df_country.printSchema()
df_country.show(5)


root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|D

In [7]:
#Average fatality rate by WHO Region
from pyspark.sql.functions import avg

fatality_rate_by_region = df_country.groupBy("WHO Region").agg(
    avg("Deaths / 100 Cases").alias("Avg_Fatality_Rate")
)

fatality_rate_by_region.show()


+--------------------+------------------+
|          WHO Region| Avg_Fatality_Rate|
+--------------------+------------------+
|              Europe| 4.198392857142857|
|     Western Pacific|              1.29|
|              Africa| 2.306458333333334|
|Eastern Mediterra...|3.5631818181818176|
|            Americas|3.0525714285714285|
|     South-East Asia|1.2959999999999998|
+--------------------+------------------+



In [8]:
#Daily New Cases per WHO Region (based on "New Cases" column)
new_cases_by_region = df_country.groupBy("WHO Region").agg(
    sum("New cases").alias("Total_New_Cases")
)

new_cases_by_region.show()


+--------------------+---------------+
|          WHO Region|Total_New_Cases|
+--------------------+---------------+
|              Europe|          22294|
|     Western Pacific|           3289|
|              Africa|          12176|
|Eastern Mediterra...|          12410|
|            Americas|         129531|
|     South-East Asia|          48993|
+--------------------+---------------+



In [9]:
#Max, Min, and Average Confirmed Cases per WHO Region
from pyspark.sql.functions import max, min, avg

confirmed_stats_by_region = df_country.groupBy("WHO Region").agg(
    max("Confirmed").alias("Max_Confirmed"),
    min("Confirmed").alias("Min_Confirmed"),
    avg("Confirmed").alias("Avg_Confirmed")
)

confirmed_stats_by_region.show()


+--------------------+-------------+-------------+-----------------+
|          WHO Region|Max_Confirmed|Min_Confirmed|    Avg_Confirmed|
+--------------------+-------------+-------------+-----------------+
|              Europe|       816680|           12|58920.05357142857|
|     Western Pacific|        86783|           20|         18276.75|
|              Africa|       452529|           10|       15066.8125|
|Eastern Mediterra...|       293606|          674|67761.09090909091|
|            Americas|      4290259|           17|252551.0285714286|
|     South-East Asia|      1480073|           24|         183529.7|
+--------------------+-------------+-------------+-----------------+



In [10]:
#Total Active Cases by WHO Region
active_by_region = df_country.groupBy("WHO Region").agg(
    sum("Active").alias("Total_Active_Cases")
)

active_by_region.show()


+--------------------+------------------+
|          WHO Region|Total_Active_Cases|
+--------------------+------------------+
|              Europe|           1094656|
|     Western Pacific|             77409|
|              Africa|            270339|
|Eastern Mediterra...|            251005|
|            Americas|           4027938|
|     South-East Asia|            637015|
+--------------------+------------------+



In [11]:
# Top 5 WHO Regions by Recovery Rate
from pyspark.sql.functions import avg

recovery_rate_by_region = df_country.groupBy("WHO Region").agg(
    avg("Recovered / 100 Cases").alias("Avg_Recovery_Rate")
)

recovery_rate_by_region.orderBy("Avg_Recovery_Rate", ascending=False).show(5)


+--------------------+-----------------+
|          WHO Region|Avg_Recovery_Rate|
+--------------------+-----------------+
|     Western Pacific|           76.805|
|              Europe|68.63499999999998|
|     South-East Asia|66.70400000000001|
|Eastern Mediterra...|66.59318181818182|
|            Americas|62.29142857142857|
+--------------------+-----------------+
only showing top 5 rows



In [12]:
#Sum of New Deaths by WHO Region
new_deaths_by_region = df_country.groupBy("WHO Region").agg(
    sum("New deaths").alias("Total_New_Deaths")
)

new_deaths_by_region.show()


+--------------------+----------------+
|          WHO Region|Total_New_Deaths|
+--------------------+----------------+
|              Europe|             304|
|     Western Pacific|              24|
|              Africa|             353|
|Eastern Mediterra...|             445|
|            Americas|            3555|
|     South-East Asia|             734|
+--------------------+----------------+



In [13]:
#Countries with Highest Deaths / 100 Recovered Ratio
from pyspark.sql.functions import col

df_clean = df_country.withColumn("DeathsPer100Recovered", col("Deaths / 100 Recovered").cast("double"))

highest_deaths_per_recovered = df_clean.select("Country/Region", "DeathsPer100Recovered") \
    .orderBy("DeathsPer100Recovered", ascending=False)

highest_deaths_per_recovered.show(10)


+--------------+---------------------+
|Country/Region|DeathsPer100Recovered|
+--------------+---------------------+
|        Canada|             Infinity|
|    Mozambique|             Infinity|
|        Serbia|             Infinity|
|        Sweden|             Infinity|
|         Syria|             Infinity|
|   Netherlands|              3259.26|
|United Kingdom|              3190.26|
|         Yemen|                57.98|
|       Belgium|                56.28|
|        France|                 37.2|
+--------------+---------------------+
only showing top 10 rows



In [14]:
#Total Weekly Increase by WHO Region
weekly_increase = df_country.groupBy("WHO Region").agg(
    sum("1 week change").alias("Weekly_Case_Change")
)

weekly_increase.orderBy("Weekly_Case_Change", ascending=False).show()


+--------------------+------------------+
|          WHO Region|Weekly_Case_Change|
+--------------------+------------------+
|            Americas|           1024088|
|     South-East Asia|            357014|
|              Europe|            152707|
|              Africa|            115087|
|Eastern Mediterra...|             91897|
|     Western Pacific|             26069|
+--------------------+------------------+



In [15]:
#Average Weekly Percentage Increase by WHO Region
avg_weekly_growth = df_country.groupBy("WHO Region").agg(
    avg("1 week % increase").alias("Avg_Weekly_Percent_Increase")
)

avg_weekly_growth.orderBy("Avg_Weekly_Percent_Increase", ascending=False).show()


+--------------------+---------------------------+
|          WHO Region|Avg_Weekly_Percent_Increase|
+--------------------+---------------------------+
|     Western Pacific|                   22.11125|
|              Africa|         18.086458333333333|
|            Americas|         16.331142857142854|
|Eastern Mediterra...|          10.48227272727273|
|     South-East Asia|          8.513000000000002|
|              Europe|          7.769642857142861|
+--------------------+---------------------------+

