"""<br>
    @Author: Deven Gupta<br>
    @Date: 3-09-2024<br>
    @Last Modified by: Deven Gupta<br>
    @Last Modified time: 3-09-2024<br>
    @Title : Covid dataset Problem using pyspark sql<br>
<br>
"""<br>

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('SQL_Pyspark').master("local[*]").getOrCreate()

In [2]:
spark

In [11]:
#Importing/Reading CSV Files

df_country_wise_latest = spark.read.csv("file:///D:/BridgeLabz DE/covid-kaggle-dataset/country_wise_latest.csv",header=True,inferSchema=True)
df_worldometer_data = spark.read.csv("file:///D:/BridgeLabz DE/covid-kaggle-dataset/worldometer_data.csv",header=True,inferSchema=True)
df_covid_19_clean_complete = spark.read.csv("file:///D:/BridgeLabz DE/covid-kaggle-dataset/covid_19_clean_complete.csv",header=True,inferSchema=True)

In [12]:
#Create Temporary View

df_country_wise_latest.createOrReplaceTempView("country_wise_latest")
df_worldometer_data.createOrReplaceTempView("worldometer_data")
df_covid_19_clean_complete.createOrReplaceTempView("covid_19_clean_complete")

In [17]:
#Display Tables

table1 = spark.sql("""
    SELECT * FROM country_wise_latest LIMIT 1;
""")
table2 = spark.sql("""
    SELECT * FROM worldometer_data LIMIT 1;
""")
table3 = spark.sql("""
    SELECT * FROM covid_19_clean_complete LIMIT 1;
""")
print("1st Table")
table1.show()
print("\n\n2nd Table ")
table2.show()
print("\n\n3rd Table")
table3.show()

1st Table
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
+--------------+---------+------+---------+------+---------+----------+-------------+-------------

## 1. To find out the death percentage locally and globally

In [20]:
# Perform SQL queries using Spark SQL

local_death_percentage = spark.sql("""                                
    SELECT 
    `Country/Region` AS Country,
    (SUM(cast(Deaths as int)) * 100.0 / SUM(cast(Confirmed as int))) AS local_death_percentage
    FROM country_wise_latest
    GROUP BY `Country/Region` 
    HAVING `Country/Region`='India';
"""
)

global_death_percentage = spark.sql("""
    SELECT 
    (SUM(cast(Deaths as int)) * 100.0 / SUM(cast(Confirmed as int))) AS global_death_percentage
    FROM country_wise_latest;
"""
)

local_death_percentage.show()
global_death_percentage.show()


+-------+----------------------+
|Country|local_death_percentage|
+-------+----------------------+
|  India|      2.25718596312479|
+-------+----------------------+

+-----------------------+
|global_death_percentage|
+-----------------------+
|       3.96854825570971|
+-----------------------+



## 2. To find out the infected population percentage locally and globally

In [28]:

Local_Infected_Population_Percentage = spark.sql("""                                
    SELECT 
    `Country/Region` AS Country,
    (SUM(TotalCases) * 100.0 / SUM(Population)) AS Local_Infected_Population_Percentage
    FROM worldometer_data
    GROUP BY `Country/Region`
    HAVING `Country/Region`='India';
"""
)

Gobal_Infected_Population_Percentage = spark.sql("""
    SELECT 
    SUM(TotalCases) AS Total_Case,
    SUM(cast(Population AS BIGINT)) AS Total_Population,
    (SUM(TotalCases) * 100.0 / SUM(cast(Population AS BIGINT))) AS Gobal_Infected_Population_Percentage
    FROM worldometer_data;
"""
)

Local_Infected_Population_Percentage.show()
Gobal_Infected_Population_Percentage.show()

+-------+------------------------------------+
|Country|Local_Infected_Population_Percentage|
+-------+------------------------------------+
|  India|                    0.14662586134519|
+-------+------------------------------------+

+----------+----------------+------------------------------------+
|Total_Case|Total_Population|Gobal_Infected_Population_Percentage|
+----------+----------------+------------------------------------+
|  19169166|      6326421290|                    0.30300173069884|
+----------+----------------+------------------------------------+



## 3. To find out the countries with the highest infection rates

In [29]:
Highest_Infection_Rate = spark.sql("""
    SELECT 
    `Country/Region` AS Country,
    (SUM(TotalCases) * 100.0 / SUM(Population)) AS Highest_Infection_Rate
    FROM worldometer_data
    GROUP BY `Country/Region` 
    ORDER BY Highest_Infection_Rate DESC;
"""
)

Highest_Infection_Rate.show()

+-------------+----------------------+
|      Country|Highest_Infection_Rate|
+-------------+----------------------+
|        Qatar|      3.99215757504528|
|French Guiana|      2.71456485795882|
|      Bahrain|      2.51302390797513|
|   San Marino|      2.05963816371030|
|        Chile|      1.91648102282847|
|       Panama|      1.65270398923282|
|       Kuwait|      1.63784431675388|
|         Oman|      1.57690439637343|
|          USA|      1.51938629605185|
| Vatican City|      1.49812734082397|
|         Peru|      1.37934516564369|
|       Brazil|      1.37161041251279|
|      Armenia|      1.34350672158245|
|      Andorra|      1.22156370506483|
|   Luxembourg|      1.12815654148962|
|      Mayotte|      1.11257813100041|
|    Singapore|      0.93177854157828|
| South Africa|      0.90631493281939|
|       Israel|      0.86499833108456|
|     Maldives|      0.86434893101461|
+-------------+----------------------+
only showing top 20 rows



## 4. To find out the countries and continents with the highest death counts

In [33]:
Country_Highest_Death = spark.sql("""
    SELECT 
    `Country/Region` AS Country,
    TotalDeaths
    FROM worldometer_data
    ORDER BY TotalDeaths DESC
    LIMIT 5;
"""
)

Total_Continent_Deaths = spark.sql("""
   SELECT
    Continent,
    SUM(TotalDeaths) AS Total_Continent_Deaths
    FROM worldometer_data
    GROUP BY Continent
    ORDER BY Total_Continent_Deaths DESC
    LIMIT 5;
"""
)

Country_Highest_Death.show()
Total_Continent_Deaths.show()

+-------+-----------+
|Country|TotalDeaths|
+-------+-----------+
|    USA|     162804|
| Brazil|      98644|
| Mexico|      50517|
|     UK|      46413|
|  India|      41638|
+-------+-----------+

+-------------+----------------------+
|    Continent|Total_Continent_Deaths|
+-------------+----------------------+
|North America|                229855|
|       Europe|                205232|
|South America|                154885|
|         Asia|                100627|
|       Africa|                 22114|
+-------------+----------------------+



## 5. Average number of deaths by day (Continents and Countries)

In [39]:
Country_Death = spark.sql("""
    SELECT 
    `Country/Region`,Date,
	AVG(Deaths) AS DEATHS
    FROM covid_19_clean_complete
    GROUP BY `Country/Region`,Date;
"""
)

Continent_Death = spark.sql("""
    SELECT 
    `WHO Region` AS Continent,Date,
	AVG(Deaths) AS DEATHS
    FROM covid_19_clean_complete
    GROUP BY `WHO Region`,Date;
"""
)

Country_Death.show()
Continent_Death.show()

+--------------------+----------+------+
|      Country/Region|      Date|DEATHS|
+--------------------+----------+------+
|                Iraq|2020-01-23|   0.0|
|             Comoros|2020-01-27|   0.0|
|               Niger|2020-01-31|   0.0|
|    Papua New Guinea|2020-01-31|   0.0|
|  Dominican Republic|2020-02-02|   0.0|
|             Albania|2020-02-03|   0.0|
|Central African R...|2020-02-03|   0.0|
|               Ghana|2020-02-03|   0.0|
|              Malawi|2020-02-04|   0.0|
|         Switzerland|2020-02-05|   0.0|
|          Cabo Verde|2020-02-06|   0.0|
|           Lithuania|2020-02-06|   0.0|
|             Comoros|2020-02-09|   0.0|
|             Czechia|2020-02-10|   0.0|
|             Liberia|2020-02-11|   0.0|
|          Azerbaijan|2020-02-12|   0.0|
|          Madagascar|2020-02-13|   0.0|
|         Timor-Leste|2020-02-13|   0.0|
|               Egypt|2020-02-14|   0.0|
|        Sierra Leone|2020-02-14|   0.0|
+--------------------+----------+------+
only showing top

## 6. Average of cases divided by the number of population of each country (TOP 10)

In [45]:
avg = spark.sql("""
    SELECT
    `Country/Region`,
	AVG(CAST(TotalCases AS float)/Population) AS DEATHS
    FROM worldometer_data
    GROUP BY `Country/Region`
    ORDER BY RAND()
    LIMIT 10;
"""
)

avg.show()

+--------------+--------------------+
|Country/Region|              DEATHS|
+--------------+--------------------+
|      S. Korea|2.831664369584020...|
|    Uzbekistan|8.448197037196562E-4|
| Liechtenstein| 0.00233356931225255|
|   Switzerland|0.004169056704159081|
|        Norway|0.001745101945987...|
|       Bahrain| 0.02513023907975126|
|           USA|0.015193862960518527|
|       Denmark|0.002468987081913...|
|       Belgium|0.006137093728457364|
|       Senegal|6.384103029353706E-4|
+--------------+--------------------+



In [3]:
spark.stop()