'''<br>
@Author: Rahul<br> 
@Date: 2024-10-08<br>
@Last Modified by: Rahul <br>
@Last Modified time: 2024-10-08<br>
@Title: Python program on covid dataset operation by pyspark.<br>
'''

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [8]:

spark = SparkSession.builder \
        .appName("COVID Death Percentage Analysis") \
        .getOrCreate()

In [44]:
spark

1. To find out the death percentage locally and globally

Global

In [83]:
Global_Death_Percentage_Dataset = spark.read.option("header", "true") \
               .option("delimiter", ",") \
               .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")

In [87]:
Global_Death_Percentage_Dataset.printSchema()
Global_Death_Percentage_Dataset.createOrReplaceTempView("covid_data")

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [29]:
global_death_percentage_query = """
SELECT 
    SUM(TotalDeaths) AS TotalDeaths,
    SUM(TotalCases) AS TotalConfirmed,
    Round((SUM(TotalDeaths) / SUM(TotalCases)) * 100,3) AS GlobalDeathPercentage
FROM covid_data
"""

spark.sql(global_death_percentage_query).show()

+-----------+--------------+---------------------+
|TotalDeaths|TotalConfirmed|GlobalDeathPercentage|
+-----------+--------------+---------------------+
|   713007.0|   1.9169166E7|                 3.72|
+-----------+--------------+---------------------+



Local

In [88]:
Global_Death_Percentage_Dataset = spark.read.option("header","true")\
     .option("delimiter",",")\
     .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\Covid Problem\Covid Problem\covid_19_india.csv")

In [90]:
Global_Death_Percentage_Dataset.createOrReplaceTempView("Local_Deaths")
Global_Death_Percentage_Dataset.printSchema()

root
 |-- Sno: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- State/UnionTerritory: string (nullable = true)
 |-- ConfirmedIndianNational: string (nullable = true)
 |-- ConfirmedForeignNational: string (nullable = true)
 |-- Cured: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Confirmed: string (nullable = true)



In [43]:
local_deaths_percentage="""
SELECT 
    SUM(Deaths) AS TotalDeaths,
    SUM(Confirmed) AS TotalConfirmed,
    ROUND((SUM(Deaths)/SUM(Confirmed))*100, 2) AS Local_Percentage
FROM
    Local_Deaths   
"""

spark.sql(local_deaths_percentage).show()

+-----------+--------------+----------------+
|TotalDeaths|TotalConfirmed|Local_Percentage|
+-----------+--------------+----------------+
|7.3389005E7| 5.451678687E9|            1.35|
+-----------+--------------+----------------+



2. To find out the infected population percentage locally and globally

Local Infection Rate

In [95]:
Local_Infection_Percentage_Dataset=spark.read.option("header","true")\
        .option("delimiter",",")\
        .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\Covid Problem\Covid Problem\StatewiseTestingDetails.csv")

In [96]:
Local_Infection_Percentage_Dataset.createOrReplaceTempView("Local_Infection")
Local_Infection_Percentage_Dataset.printSchema()

root
 |-- Date: string (nullable = true)
 |-- State: string (nullable = true)
 |-- TotalSamples: string (nullable = true)
 |-- Negative: string (nullable = true)
 |-- Positive: string (nullable = true)



In [57]:

local_infection_rate = """
SELECT
    ROUND(SUM(TotalSamples), 3) AS TotalSamples,
    ROUND(SUM(Positive), 3) AS Positive,
    ROUND((SUM(Positive) / SUM(TotalSamples)) * 100, 2) AS Infection_Rate
FROM
    Local_Infection
"""
spark.sql(local_infection_rate).show()


+---------------+------------+--------------+
|   TotalSamples|    Positive|Infection_Rate|
+---------------+------------+--------------+
|8.7829949447E10|3.20053246E8|          0.36|
+---------------+------------+--------------+



Global Infection Rate

In [97]:
Global_Infection_Percentage_Dataset=spark.read.option("header","true")\
        .option("delimiter",",")\
        .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")


In [98]:
Global_Infection_Percentage_Dataset.createOrReplaceTempView("Global_Infection")
Global_Infection_Percentage_Dataset.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [68]:

global_infection_rate = """
SELECT
    ROUND(SUM(TotalCases), 3) AS TotalCases,
    ROUND(SUM(Population), 3) AS Population,
    ROUND((SUM(TotalCases) / SUM(Population)) * 100,2) AS Infection_Rate
FROM
    Global_Infection
"""

spark.sql(global_infection_rate).show()

+-----------+------------+--------------+
| TotalCases|  Population|Infection_Rate|
+-----------+------------+--------------+
|1.9169166E7|6.32642129E9|           0.3|
+-----------+------------+--------------+



3.To find out the countries with the highest infection rates

In [99]:
Highest_Infection_Dataset = spark.read.option("header","true")\
                                 .option("delimiter",",")\
                                 .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")


In [100]:
Highest_Infection_Dataset.createOrReplaceTempView("Highest_Infection")        
Highest_Infection_Dataset.printSchema()                                 

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [109]:

highest_infected_countries = """
SELECT 
    `Country/Region`, 
    sum(TotalCases), 
    sum(Population),  
    ROUND( (sum(TotalCases) / sum(Population)  * 100), 2) AS infection_rate
FROM 
    Highest_Infection
GROUP BY
    `Country/Region`    
ORDER BY 
    infection_rate DESC
limit 10;    
"""

spark.sql(highest_infected_countries).show()

+--------------+---------------+---------------+--------------+
|Country/Region|sum(TotalCases)|sum(Population)|infection_rate|
+--------------+---------------+---------------+--------------+
|         Qatar|       112092.0|      2807805.0|          3.99|
| French Guiana|         8127.0|       299385.0|          2.71|
|       Bahrain|        42889.0|      1706669.0|          2.51|
|    San Marino|          699.0|        33938.0|          2.06|
|         Chile|       366671.0|    1.9132514E7|          1.92|
|        Panama|        71418.0|      4321282.0|          1.65|
|        Kuwait|        70045.0|      4276658.0|          1.64|
|          Oman|        80713.0|      5118446.0|          1.58|
|           USA|      5032179.0|    3.3119813E8|          1.52|
|  Vatican City|           12.0|          801.0|           1.5|
+--------------+---------------+---------------+--------------+



4. To find out the countries and continents with the highest death counts

In [124]:
World_over_dataset = spark.read.option("header","true")\
                        .option("delimiter",",")\
                        .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")


In [125]:
World_over_dataset.createOrReplaceTempView("World_Wise_Death_Count")
World_over_dataset.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



Continent Wise Death Counts

In [137]:
contient_wise_death_count = """
SELECT
    Continent,
    ROUND(SUM(TotalDeaths)) AS TotalDeaths
FROM
    World_Wise_Death_Count
GROUP BY
    Continent
ORDER BY
    TotalDeaths DESC        
"""

spark.sql(contient_wise_death_count).show()

+-----------------+-----------+
|        Continent|TotalDeaths|
+-----------------+-----------+
|    North America|   229855.0|
|           Europe|   205232.0|
|    South America|   154885.0|
|             Asia|   100627.0|
|           Africa|    22114.0|
|Australia/Oceania|      281.0|
|             NULL|       13.0|
+-----------------+-----------+



Country Wise Death Count

In [142]:
country_wise_death_count = """
SELECT 
    `Country/Region`,
    ROUND(SUM(TotalDeaths)) AS TotalDeaths
FROM 
    World_Wise_Death_Count
GROUP BY 
    `Country/Region`
ORDER BY 
    TotalDeaths DESC  
"""

spark.sql(country_wise_death_count).show()

+--------------+-----------+
|Country/Region|TotalDeaths|
+--------------+-----------+
|           USA|   162804.0|
|        Brazil|    98644.0|
|        Mexico|    50517.0|
|            UK|    46413.0|
|         India|    41638.0|
|         Italy|    35187.0|
|        France|    30312.0|
|         Spain|    28500.0|
|          Peru|    20424.0|
|          Iran|    17976.0|
|        Russia|    14606.0|
|      Colombia|    11939.0|
|         Chile|     9889.0|
|       Belgium|     9859.0|
|  South Africa|     9604.0|
|       Germany|     9252.0|
|        Canada|     8966.0|
|   Netherlands|     6153.0|
|      Pakistan|     6035.0|
|       Ecuador|     5877.0|
+--------------+-----------+
only showing top 20 rows



5. Average number of deaths by day (Continents and Countries)

Country Wise Deaths

In [147]:
country_wise_death_by_day = spark.read.option("header","true")\
     .option("delimiter",",")\
     .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\covid_19_clean_complete.csv")

In [148]:
country_wise_death_by_day.createOrReplaceTempView("Country_Wise_Death_By_Day")
country_wise_death_by_day.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [151]:
country_wise_death_by_day = """
SELECT 
    `Country/Region`, 
    ROUND(SUM(CAST(Deaths AS BIGINT)) / COUNT(DISTINCT Date),2) AS avg_deaths_per_day
FROM 
    Country_Wise_Death_By_Day
GROUP BY 
    `Country/Region`
ORDER BY 
    avg_deaths_per_day DESC; 
"""

spark.sql(country_wise_death_by_day).show()

+--------------+------------------+
|Country/Region|avg_deaths_per_day|
+--------------+------------------+
|            US|          58571.34|
|United Kingdom|          21264.76|
|        Brazil|          20946.99|
|         Italy|           19721.9|
|        France|          16215.55|
|         Spain|          16133.14|
|        Mexico|           9192.96|
|         India|           5913.99|
|          Iran|           5447.53|
|       Belgium|           5125.95|
|       Germany|           4634.69|
|        Canada|            3721.1|
|         China|           3576.66|
|          Peru|           3468.69|
|   Netherlands|           3310.18|
|        Russia|            3294.6|
|        Turkey|           2479.02|
|        Sweden|           2387.84|
|       Ecuador|           1843.71|
|         Chile|           1715.32|
+--------------+------------------+
only showing top 20 rows



Contient Wise Death By Day  

In [152]:
continent_wise_death_by_day = spark.read.option("header","true")\
     .option("delimiter",",")\
     .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")

In [153]:
continent_wise_death_by_day.createOrReplaceTempView("Continent_Wise_Death_By_Day")
continent_wise_death_by_day.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [161]:
continent_wise_death_by_day = """
SELECT 
    w.Continent,
    ROUND(SUM(CAST(c.Deaths AS BIGINT)) / COUNT(DISTINCT c.Date),2) AS avg_deaths_per_day
FROM 
    Country_Wise_Death_By_Day c
JOIN 
    Continent_Wise_Death_By_Day w
ON 
    c.`Country/Region` = w.`Country/Region`
GROUP BY 
    w.Continent
ORDER BY 
    w.Continent;
"""

spark.sql(continent_wise_death_by_day).show()


+-----------------+------------------+
|        Continent|avg_deaths_per_day|
+-----------------+------------------+
|           Africa|           3628.02|
|             Asia|          20654.07|
|Australia/Oceania|             72.18|
|           Europe|          78134.58|
|    North America|          14181.08|
|    South America|          30222.54|
+-----------------+------------------+



6. Average of cases divided by the number of population of each country
(TOP 10)

In [155]:
average_death_country_wise = spark.read.option("header","true")\
     .option("delimiter",",")\
     .csv(r"file:///C:\Users\rahul\Desktop\BridgeLabz\Python\Questions\covid-kaggle-dataset\worldometer_data.csv")

In [156]:
average_death_country_wise.createOrReplaceTempView("average_death")
average_death_country_wise.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [168]:
cases_per_population_query = """
SELECT 
    `Country/Region`, 
    ROUND(SUM(CAST(TotalCases AS float)) / NULLIF(SUM(CAST(Population AS float)), 0)*100, 2) AS cases_per_population
FROM 
    average_death
GROUP BY 
    `Country/Region`
ORDER BY 
    cases_per_population DESC
LIMIT 10;
"""

spark.sql(cases_per_population_query).show()


+--------------+--------------------+
|Country/Region|cases_per_population|
+--------------+--------------------+
|         Qatar|                3.99|
| French Guiana|                2.71|
|       Bahrain|                2.51|
|    San Marino|                2.06|
|         Chile|                1.92|
|        Panama|                1.65|
|        Kuwait|                1.64|
|          Oman|                1.58|
|           USA|                1.52|
|  Vatican City|                 1.5|
+--------------+--------------------+

