"""<br>
@Author: Prayag Bhoir<br>
@Date: 3-09-2024<br>
@Last Modified by: Prayag Bhoir<br>
@Last Modified time: 3-09-2024<br>
@Title : Covid problems using pyspark sql<br>
"""

In [6]:
from pyspark.sql import SparkSession


In [7]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("WordCount") \
    .master("local[*]") \
    .getOrCreate()

In [20]:
# import csvs using spark
df_country_wise_latest = spark.read.csv("C:/Users/bhoir/OneDrive/Desktop/fwdcoviddataset/covid-kaggle-dataset/country_wise_latest.csv" ,header=True, inferSchema=True)
print(type(df_country_wise_latest))

df_worldometer_data = spark.read.csv("C:/Users/bhoir/OneDrive/Desktop/fwdcoviddataset/covid-kaggle-dataset/worldometer_data.csv" ,header=True, inferSchema=True)

df_covid_19_clean_complete = spark.read.csv("C:/Users/bhoir/OneDrive/Desktop/fwdcoviddataset/covid-kaggle-dataset/covid_19_clean_complete.csv" ,header=True, inferSchema=True)



<class 'pyspark.sql.dataframe.DataFrame'>


## 1. To find out the death percentage locally and globally

In [9]:
# Create view to use 
df_country_wise_latest.createOrReplaceTempView("country_wise_latest")

In [10]:
# Print table
temp = spark.sql("""
	SELECT * FROM country_wise_latest
	LIMIT(5)
                 """)
temp.show()

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|       Albania|     4880|   144|     2745|  1991|      117|         6|           63|              2.95|    

In [11]:
# Calculate local death percentage
local_death_percentage = spark.sql("""
SELECT 
  `Country/Region` AS Country,
  (SUM(CAST(Deaths AS INT)) * 100.0) / SUM(CAST(Confirmed AS INT)) AS local_death_percentage
FROM 
  country_wise_latest
GROUP BY
  `Country/Region`
HAVING
  `Country/Region` = 'India'
""")

# Calculate global death percentage
global_death_percentage = spark.sql("""
SELECT 
  (SUM(CAST(Deaths AS INT)) * 100.0) / SUM(CAST(Confirmed AS INT)) AS global_death_percentage
FROM 
  country_wise_latest
""")

local_death_percentage.show()
global_death_percentage.show()

+-------+----------------------+
|Country|local_death_percentage|
+-------+----------------------+
|  India|      2.25718596312479|
+-------+----------------------+

+-----------------------+
|global_death_percentage|
+-----------------------+
|       3.96854825570971|
+-----------------------+



## 2. To find out the infected population percentage locally and globally

In [12]:
# Create view to use
df_worldometer_data.createOrReplaceTempView("worldometer_data")

In [13]:
# Print table
temp = spark.sql("""
	SELECT * FROM worldometer_data
	LIMIT(5)
                 """)
temp.show()

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|Country/Region|    Continent|Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious,Critical|Tot Cases/1M pop|Deaths/1M pop|TotalTests|Tests/1M pop|    WHO Region|
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|           USA|North America| 331198130|   5032179|    null|     162804|     null|       2576668|        null|    2292707|           18296|           15194|        492.0|  63139605|      190640|      Americas|
|        Brazil|South America| 212710692|   2917562|    null|      98644|     null|       2047660|        null|     771258|            8318|           13716

In [17]:
# Calculate infected population percentage locally (per country)
infected_population_percentage_local = spark.sql(
"""
SELECT 
  `Country/Region`,
  (TotalCases * 100.0 / `Population`) AS Infected_rate
FROM 
  worldometer_data
WHERE
  `Country/Region` = 'India'
""")

# Calculate global infected population percentage
infected_population_percentage_global = spark.sql("""
SELECT
  SUM(TotalCases) AS TotalCase,
  SUM(CAST(`Population` AS BIGINT)) AS Population,
  (SUM(TotalCases) * 100.0 / SUM(CAST(`Population` AS BIGINT))) AS Global_infected_rate
FROM 
  worldometer_data
""")

infected_population_percentage_local.show()
infected_population_percentage_global.show()

+--------------+--------------+
|Country/Region| Infected_rate|
+--------------+--------------+
|         India|0.146625861345|
+--------------+--------------+

+---------+----------+--------------------+
|TotalCase|Population|Global_infected_rate|
+---------+----------+--------------------+
| 19169166|6326421290|    0.30300173069884|
+---------+----------+--------------------+



## 3. To find out the countries with the highest infection rates

In [16]:
# Print table
temp = spark.sql("""
	SELECT * FROM worldometer_data
	LIMIT(5)
                 """)
temp.show()

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|Country/Region|    Continent|Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious,Critical|Tot Cases/1M pop|Deaths/1M pop|TotalTests|Tests/1M pop|    WHO Region|
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|           USA|North America| 331198130|   5032179|    null|     162804|     null|       2576668|        null|    2292707|           18296|           15194|        492.0|  63139605|      190640|      Americas|
|        Brazil|South America| 212710692|   2917562|    null|      98644|     null|       2047660|        null|     771258|            8318|           13716

In [15]:
highest_infection_rates = spark.sql("""
SELECT 
  `Country/Region` AS country, 
  (SUM(TotalCases) * 100.0) / Population AS infection_rate
FROM 
  worldometer_data
GROUP BY 
  `Country/Region`, Population
ORDER BY 
  infection_rate DESC
""")

highest_infection_rates.show()

+-------------+--------------+
|      country|infection_rate|
+-------------+--------------+
|        Qatar|3.992157575045|
|French Guiana|2.714564857959|
|      Bahrain|2.513023907975|
|   San Marino|2.059638163710|
|        Chile|1.916481022828|
|       Panama|1.652703989233|
|       Kuwait|1.637844316754|
|         Oman|1.576904396373|
|          USA|1.519386296052|
| Vatican City|1.498127340824|
|         Peru|1.379345165644|
|       Brazil|1.371610412513|
|      Armenia|1.343506721582|
|      Andorra|1.221563705065|
|   Luxembourg|1.128156541490|
|      Mayotte|1.112578131000|
|    Singapore|0.931778541578|
| South Africa|0.906314932819|
|       Israel|0.864998331085|
|     Maldives|0.864348931015|
+-------------+--------------+
only showing top 20 rows



## 4. To find out the countries and continents with the highest death counts

In [18]:
# Print table
temp = spark.sql("""
	SELECT * FROM worldometer_data
	LIMIT(5)
                 """)
temp.show()

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|Country/Region|    Continent|Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious,Critical|Tot Cases/1M pop|Deaths/1M pop|TotalTests|Tests/1M pop|    WHO Region|
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|           USA|North America| 331198130|   5032179|    null|     162804|     null|       2576668|        null|    2292707|           18296|           15194|        492.0|  63139605|      190640|      Americas|
|        Brazil|South America| 212710692|   2917562|    null|      98644|     null|       2047660|        null|     771258|            8318|           13716

In [19]:
# Countries with the highest death counts
highest_death_counts_countries = spark.sql(
"""
SELECT 
	`Country/Region` AS Countries,
  TotalDeaths AS Death_count
FROM 
  worldometer_data
ORDER BY 
  Death_count DESC;
""")

# Continents with the highest death counts
highest_death_counts_continents = spark.sql(
"""
SELECT 
	Continent,
  SUM(TotalDeaths) AS Death_count
FROM 
  worldometer_data
WHERE 
  Continent IS NOT NULL
GROUP BY
  Continent
ORDER BY 
  Death_count DESC;
""")

highest_death_counts_countries.show()
highest_death_counts_continents.show()


+------------+-----------+
|   Countries|Death_count|
+------------+-----------+
|         USA|     162804|
|      Brazil|      98644|
|      Mexico|      50517|
|          UK|      46413|
|       India|      41638|
|       Italy|      35187|
|      France|      30312|
|       Spain|      28500|
|        Peru|      20424|
|        Iran|      17976|
|      Russia|      14606|
|    Colombia|      11939|
|       Chile|       9889|
|     Belgium|       9859|
|South Africa|       9604|
|     Germany|       9252|
|      Canada|       8966|
| Netherlands|       6153|
|    Pakistan|       6035|
|     Ecuador|       5877|
+------------+-----------+
only showing top 20 rows

+-----------------+-----------+
|        Continent|Death_count|
+-----------------+-----------+
|    North America|     229855|
|           Europe|     205232|
|    South America|     154885|
|             Asia|     100627|
|           Africa|      22114|
|Australia/Oceania|        281|
+-----------------+-----------+



## 5. Average number of deaths by day (Continents and Countries)

In [21]:
# Create view to use
df_covid_19_clean_complete.createOrReplaceTempView("covid_19_clean_complete")

In [22]:
# Print table
temp = spark.sql("""
	SELECT * FROM covid_19_clean_complete
	LIMIT(5)
                 """)
temp.show()

+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|Province/State|Country/Region|     Lat|     Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|
+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|          null|   Afghanistan|33.93911|67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|          null|       Albania| 41.1533|  20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|          null|       Algeria| 28.0339|   1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|          null|       Andorra| 42.5063|   1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|          null|        Angola|-11.2027|  17.8739|2020-01-22|        0|     0|        0|     0|              Africa|
+--------------+--------------+--------+---------+----------+---

In [28]:
# Average deaths per day by country
average_deaths_per_day_country = spark.sql(
"""
SELECT 
	`Date`,
  AVG(Deaths) AS Average_Deaths
FROM 
  covid_19_clean_complete
GROUP BY
  `Date`, `Country/Region`
ORDER BY 
  Average_Deaths DESC;
""")

# Average deaths per day by continent
average_deaths_per_day_continent = spark.sql(
"""
SELECT 
	`WHO Region`,
  `Date`,
  AVG(Deaths) AS Average_Deaths
FROM 
  covid_19_clean_complete
GROUP BY
  `Date`, `WHO Region`
ORDER BY 
  Average_Deaths DESC;
""")

average_deaths_per_day_country.show()
average_deaths_per_day_continent.show()


+----------+--------------+
|      Date|Average_Deaths|
+----------+--------------+
|2020-07-27|      148011.0|
|2020-07-26|      146935.0|
|2020-07-25|      146465.0|
|2020-07-24|      145560.0|
|2020-07-23|      144430.0|
|2020-07-22|      143316.0|
|2020-07-21|      142121.0|
|2020-07-20|      141025.0|
|2020-07-19|      140534.0|
|2020-07-18|      140119.0|
|2020-07-17|      139266.0|
|2020-07-16|      138358.0|
|2020-07-15|      137415.0|
|2020-07-14|      136466.0|
|2020-07-13|      135566.0|
|2020-07-12|      135205.0|
|2020-07-11|      134777.0|
|2020-07-10|      134101.0|
|2020-07-09|      133290.0|
|2020-07-08|      132300.0|
+----------+--------------+
only showing top 20 rows

+----------+----------+-----------------+
|WHO Region|      Date|   Average_Deaths|
+----------+----------+-----------------+
|  Americas|2020-07-27|7450.695652173913|
|  Americas|2020-07-26|7373.413043478261|
|  Americas|2020-07-25|7322.108695652174|
|  Americas|2020-07-24|7260.173913043478|
|  Ameri

## 6. Average of cases divided by the number of population of each country (TOP 10)

In [34]:
average_cases_per_population = spark.sql(
"""
SELECT 
	`Country/Region` AS Country,	
  AVG((TotalCases * 1.0) / `Population`) AS Avg_Cases_By_Population
FROM 
  worldometer_data
GROUP BY
  `Country/Region`
LIMIT 10
""")

average_cases_per_population.show()


+----------------+-----------------------+
|         Country|Avg_Cases_By_Population|
+----------------+-----------------------+
|            Chad|     0.0000572019680000|
|          Russia|     0.0059742940920000|
|        Paraguay|     0.0008927207340000|
|           Macao|     0.0000707482240000|
|             CAR|     0.0009549890110000|
|           Yemen|     0.0000591563590000|
|Turks and Caicos|     0.0033274865870000|
|         Senegal|     0.0006384103030000|
|          Sweden|     0.0081110505510000|
|      Cabo Verde|     0.0049121331850000|
+----------------+-----------------------+

