"""

@Author: Naveen Madev Naik<br>
@Date: 2024-08-08<br>
@Last Modified by: Naveen Madev Naik<br>
@Last Modified time: 2024-08-08<br>
@Title: sql operation on covid dataset using pyspark<br>

"""

In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder\
        .appName("Covid problem using pyspark sql")\
        .getOrCreate()


## 1.To find out the death percentage locally and globally

### Death Percentage Globally

In [19]:
from pyspark.sql.functions import col

In [32]:
file_path = r"file:///E:/Documents/DataSet/covid/country_wise_latest.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.select("Country/Region")
df.printSchema()
df.createOrReplaceTempView("country")
result_globally = spark.sql(''' select `Country/Region`,round(deaths*100.00/confirmed,2) as death_percentage from country ''')
result_globally.show()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)

+-------------------+----------------+
|     Country/Region|death_percentage|
+-------------------+----------------+
|        Afghanistan|            3.50|
|            Albania|            2.95|
|            Algeria|            4.16|
|            Andorra|            5.73|
|             Angola|            4.3

### Death Percentage Locally

In [34]:
result_locally= spark.sql(''' select `Country/Region`,round(deaths*100.00/confirmed,2) as death_percentage from country where `Country/Region`='India' ''')
result_locally.show()

+--------------+----------------+
|Country/Region|death_percentage|
+--------------+----------------+
|         India|            2.26|
+--------------+----------------+



## 2.To find out the infected population percentage locally and globally

### Global infected population percentage

In [38]:
file_path_worldmeter = r"file:///E:/Documents/DataSet/covid/worldometer_data.csv"
df = spark.read.csv(file_path_worldmeter, header=True, inferSchema=True)
df.printSchema()
df.createOrReplaceTempView("Worldometer")
result_globally = spark.sql(''' select `Country/Region`,round((Totalcases*100.0/Population),4)  as infectedPopulationPercentage from Worldometer ''')
result_globally.show()
 


root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- ActiveCases: integer (nullable = true)
 |-- Serious,Critical: integer (nullable = true)
 |-- Tot Cases/1M pop: integer (nullable = true)
 |-- Deaths/1M pop: double (nullable = true)
 |-- TotalTests: integer (nullable = true)
 |-- Tests/1M pop: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

+--------------+----------------------------+
|Country/Region|infectedPopulationPercentage|
+--------------+----------------------------+
|           USA|                      1.5194|
|        Brazil|                      1.3716|
|         India|                      0.1466|
|        

### Local infected population percentage

In [39]:
result_locally = spark.sql(''' select `Country/Region`,round((Totalcases*100.0/Population),4)  as infectedPopulationPercentage from Worldometer where `Country/Region`='India' ''')
result_locally.show()

+--------------+----------------------------+
|Country/Region|infectedPopulationPercentage|
+--------------+----------------------------+
|         India|                      0.1466|
+--------------+----------------------------+



## 3.To find out the countries with the highest infection rates

In [40]:
file_path = r"file:///E:/Documents/DataSet/covid/country_wise_latest.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.printSchema()
df.createOrReplaceTempView("CountryWiseList")
result = spark.sql(''' select `Country/Region`,Confirmed as highest_infection from CountryWiseList where Confirmed in (select max(Confirmed) from CountryWiseList ) ''')
result.show()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)

+--------------+-----------------+
|Country/Region|highest_infection|
+--------------+-----------------+
|            US|          4290259|
+--------------+-----------------+



## 4. To find out the countries and continents with the highest death counts

### Continent with highest death count

In [52]:
result_globally = spark.sql(''' select `Continent`,sum(TotalDeaths) as totaldeath from Worldometer group by `Continent`  order by TotalDeath desc limit 1 ''')
result_globally.show()


+-------------+----------+
|    Continent|totaldeath|
+-------------+----------+
|North America|    229855|
+-------------+----------+



### Country with highest Death count

In [53]:
result_globally = spark.sql(''' select `Country/Region`,TotalDeaths  from Worldometer order by TotalDeaths desc limit 1 ''')
result_globally.show()

+--------------+-----------+
|Country/Region|TotalDeaths|
+--------------+-----------+
|           USA|     162804|
+--------------+-----------+



## 5. Average number of deaths by day (Continents and Countries) 

### Average number of deaths by day in a country

In [57]:
file_path_covid_clean = r"file:///E:/Documents/DataSet/covid/covid_19_clean_complete.csv"
df = spark.read.csv(file_path_covid_clean, header=True, inferSchema=True)
df.printSchema()
df.createOrReplaceTempView("CovidClean")
result = spark.sql(''' select `Country/Region`,sum(deaths)/count(`Country/Region`) as average_deaths_countries from CovidClean group by `Country/Region` ''')
result.show()


root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

+--------------+------------------------+
|Country/Region|average_deaths_countries|
+--------------+------------------------+
|          Chad|      29.377659574468087|
|      Paraguay|       8.845744680851064|
|        Russia|       3294.601063829787|
|         Yemen|       94.18617021276596|
|       Senegal|       38.17553191489362|
|    Cabo Verde|       4.542553191489362|
|        Sweden|      2387.8351063829787|
|        Guyana|       7.159574468085107|
|       Eritrea|                     0.0|
|   Philippines|       589.8510638297872|
|         Burma|       3.398936170212766|
|    

### Average number of deaths by day in a Continent

In [58]:
result = spark.sql(''' select `WHO Region`,sum(deaths)/count(`WHO Region`) as average_deaths_continent from CovidClean group by `WHO Region` ''')
result.show()

+--------------------+------------------------+
|          WHO Region|average_deaths_continent|
+--------------------+------------------------+
|              Europe|      1281.3191489361702|
|     Western Pacific|       90.17698259187621|
|              Africa|       48.75642730496454|
|Eastern Mediterra...|       465.1907640232108|
|            Americas|       2238.586031452359|
|     South-East Asia|       775.6031914893617|
+--------------------+------------------------+



## 6. Average of cases divided by the number of population of each country (TOP 10)

In [59]:
result = spark.sql(''' select `Country/Region`,(TotalCases * 1.0 / Population) AS Cases_Per_Population FROM Worldometer order by Cases_Per_Population desc limit 10 ''')
result.show()

+--------------+--------------------+
|Country/Region|Cases_Per_Population|
+--------------+--------------------+
|         Qatar|      0.039921575750|
| French Guiana|      0.027145648580|
|       Bahrain|      0.025130239080|
|    San Marino|      0.020596381637|
|         Chile|      0.019164810228|
|        Panama|      0.016527039892|
|        Kuwait|      0.016378443168|
|          Oman|      0.015769043964|
|           USA|      0.015193862961|
|  Vatican City|      0.014981273408|
+--------------+--------------------+



## 7. Considering the highest value of total cases, which countries have the highest rate of infection in relation to population? 

In [61]:
result = spark.sql(''' select `Country/Region`,TotalDeaths,(TotalCases * 1.0 / Population) AS Infection_Rate FROM Worldometer order by TotalDeaths desc limit 1''')
result.show()

+--------------+-----------+--------------+
|Country/Region|TotalDeaths|Infection_Rate|
+--------------+-----------+--------------+
|           USA|     162804|0.015193862961|
+--------------+-----------+--------------+



## Using JOINS

## 1. To find out the population vs the number of people vaccinated

In [66]:
worldometer_data_df = spark.read.csv('file:///E:/Documents/DataSet/covid/worldometer_data.csv', header=True, inferSchema=True)
covid_vaccine_statewise_df = spark.read.csv('file:///E:/Documents/DataSet/covid/covid_vaccine_statewise.csv', header=True, inferSchema=True)
worldometer_data_df.printSchema()
covid_vaccine_statewise_df.printSchema()
worldometer_data_df.createOrReplaceTempView("worldometer_data")
covid_vaccine_statewise_df.createOrReplaceTempView("covid_vaccine_statewise")

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- ActiveCases: integer (nullable = true)
 |-- Serious,Critical: integer (nullable = true)
 |-- Tot Cases/1M pop: integer (nullable = true)
 |-- Deaths/1M pop: double (nullable = true)
 |-- TotalTests: integer (nullable = true)
 |-- Tests/1M pop: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

root
 |-- Updated On: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total Doses Administered: double (nullable = true)
 |-- Sessions: double (nullable = true)
 |--  Sites : double (nullable = true)
 |-- First Dose Administered: double (nullable = true)
 |-- Second

In [78]:
result = spark.sql('''
    SELECT ROUND(
        (cv.`Total Doses Administered`* 100.0)/wd.MaxPopulation, 
        2
    ) AS percentage
    FROM 
        (SELECT MAX(Population) AS MaxPopulation
         FROM worldometer_data 
         WHERE `Continent`= 'Asia') wd
    JOIN 
        (SELECT `Total Doses Administered`
         FROM covid_vaccine_statewise 
         WHERE `State`= 'India' 
         AND `Updated On` = (
             SELECT MAX(`Updated On`)
             FROM covid_vaccine_statewise 
             WHERE `State`= 'India' 
             AND `Total Doses Administered` IS NOT NULL
         )
         AND `Total Doses Administered` IS NOT NULL) cv
    ON 1=1
''')
result.show()

+----------+
|percentage|
+----------+
|     33.53|
+----------+



## 2. To find out the percentage of different vaccine taken by people in a country

In [84]:
covid_vaccine_statewise_df = spark.read.csv('file:///E:/Documents/DataSet/covid/covid_vaccine_statewise.csv', header=True, inferSchema=True)
covid_vaccine_statewise_df.printSchema()
covid_vaccine_statewise_df.createOrReplaceTempView("covid_vaccine_statewise")

root
 |-- Updated On: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total Doses Administered: double (nullable = true)
 |-- Sessions: double (nullable = true)
 |--  Sites : double (nullable = true)
 |-- First Dose Administered: double (nullable = true)
 |-- Second Dose Administered: double (nullable = true)
 |-- Male (Doses Administered): double (nullable = true)
 |-- Female (Doses Administered): double (nullable = true)
 |-- Transgender (Doses Administered): double (nullable = true)
 |--  Covaxin (Doses Administered): double (nullable = true)
 |-- CoviShield (Doses Administered): double (nullable = true)
 |-- Sputnik V (Doses Administered): double (nullable = true)
 |-- AEFI: double (nullable = true)
 |-- 18-44 Years (Doses Administered): double (nullable = true)
 |-- 45-60 Years (Doses Administered): double (nullable = true)
 |-- 60+ Years (Doses Administered): double (nullable = true)
 |-- 18-44 Years(Individuals Vaccinated): double (nullable = true)
 |-- 45-60 

In [88]:
result = spark.sql('''
    SELECT
        `Updated On` AS Date,
        `State`,
        ` Covaxin (Doses Administered)`* 100.0 /`Total Doses Administered` AS Covaxin_Percentage,
        `CoviShield (Doses Administered)`* 100.0 /`Total Doses Administered` AS CoviShield_Percentage,
        `Sputnik V (Doses Administered)`* 100.0 /`Total Doses Administered` AS Sputnik_V_Percentage
    FROM 
        covid_vaccine_statewise
    ORDER BY Date ASC
''')
result.show()

+----------+--------------------+-------------------+---------------------+--------------------+
|      Date|               State| Covaxin_Percentage|CoviShield_Percentage|Sputnik_V_Percentage|
+----------+--------------------+-------------------+---------------------+--------------------+
|01/02/2021|Dadra and Nagar H...|                0.0|                100.0|                NULL|
|01/02/2021|              Ladakh|                0.0|                100.0|                NULL|
|01/02/2021|Andaman and Nicob...|                0.0|                100.0|                NULL|
|01/02/2021|      Andhra Pradesh|  5.930021999072148|    94.06997800092785|                NULL|
|01/02/2021|   Arunachal Pradesh|                0.0|                100.0|                NULL|
|01/02/2021|               India|  2.070642173081831|    97.92935782691816|                NULL|
|01/02/2021|               Bihar| 2.3935931512996653|    97.60640684870033|                NULL|
|01/02/2021|          Chandiga

## 3. To find out percentage of people who took both the doses

In [93]:
result = spark.sql('''
    SELECT
        covid_1.`Updated On` AS Date,
        covid_1.State,
        COALESCE(covid_2.`Second Dose Administered`, 0) * 100.0 / COALESCE(covid_1.`First Dose Administered`, 1) AS Percentage_Both_Doses
    FROM 
        covid_vaccine_statewise covid_1
    JOIN 
        covid_vaccine_statewise covid_2
    ON 
        covid_1.State = covid_2.State
        AND covid_1.`Updated On` = covid_2.`Updated On` 
    ORDER BY 
        covid_1.`Updated On` ASC
''')

result.show()

+----------+--------------------+---------------------+
|      Date|               State|Percentage_Both_Doses|
+----------+--------------------+---------------------+
|01/02/2021|Dadra and Nagar H...|                  0.0|
|01/02/2021|              Ladakh|                  0.0|
|01/02/2021|Andaman and Nicob...|                  0.0|
|01/02/2021|      Andhra Pradesh|                  0.0|
|01/02/2021|   Arunachal Pradesh|                  0.0|
|01/02/2021|               India|                  0.0|
|01/02/2021|               Bihar|                  0.0|
|01/02/2021|          Chandigarh|                  0.0|
|01/02/2021|        Chhattisgarh|                  0.0|
|01/02/2021|               Assam|                  0.0|
|01/02/2021|               Delhi|                  0.0|
|01/02/2021|                 Goa|                  0.0|
|01/02/2021|             Gujarat|                  0.0|
|01/02/2021|             Haryana|                  0.0|
|01/02/2021|    Himachal Pradesh|               