In [1]:
import findspark
findspark.init('/usr/local/spark-3.1.2-bin-hadoop3.2')
from pyspark.sql import SparkSession

In [2]:
import pycountry
from resources.update_local_data import get_covid_data, rearrenge_data

In [3]:
spark = SparkSession.builder.appName("covid_deaths").getOrCreate()

21/07/08 13:28:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Update covid local data and convert the data to parquet and jsonl

In [4]:
get_covid_data()
rearrenge_data()

[INFO] url read -> https://covid.ourworldindata.org/data/owid-covid-data.csv
[INFO] data/covid_deaths.csv saved
[INFO] data/covid_vaccinations.csv saved


21/07/08 13:29:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , iso_code, continent, location, date, total_cases, new_cases, new_cases_smoothed, total_deaths, new_deaths, new_deaths_smoothed, total_cases_per_million, new_cases_per_million, new_cases_smoothed_per_million, total_deaths_per_million, new_deaths_per_million, new_deaths_smoothed_per_million, reproduction_rate, icu_patients, icu_patients_per_million, hosp_patients, hosp_patients_per_million, weekly_icu_admissions, weekly_icu_admissions_per_million, weekly_hosp_admissions, weekly_hosp_admissions_per_million, population
 Schema: _c0, iso_code, continent, location, date, total_cases, new_cases, new_cases_smoothed, total_deaths, new_deaths, new_deaths_smoothed, total_cases_per_million, new_cases_per_million, new_cases_smoothed_per_million, total_deaths_per_million, new_deaths_per_million, new_deaths_smoothed_per_million, reproduction_rate, icu_patients, icu_patients_per_million, hosp_patients, hosp_

[INFO] jsonl.gz files created:
        data/covid_deaths.jsonl.gz
        data/covid_vaccinations.jsonl.gz
        data/world_population_density.jsonl.gz


21/07/08 13:29:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/07/08 13:29:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , iso_code, continent, location, date, total_cases, new_cases, new_cases_smoothed, total_deaths, new_deaths, new_deaths_smoothed, total_cases_per_million, new_cases_per_million, new_cases_smoothed_per_million, total_deaths_per_million, new_deaths_per_million, new_deaths_smoothed_per_million, reproduction_rate, icu_patients, icu_patients_per_million, hosp_patients, hosp_patients_per_million, weekly_icu_admissions, weekly_icu_admissions_per_million, weekly_hosp_admissions, weekly_hosp_admissions_per_million, population
 Schema: _c0, iso_code, continent, location, date, total_cases, new_cases, new_cases_smoothed, total_deaths, new_deaths, new_deaths_smoothed, total_cases_per_million, new_cases_per_million, new_cases_smoo

[INFO] parquet files created: 
        data/covid_deaths.parquet
        data/covid_vaccinations.parquet
        data/world_population_density.parquet


# Covid-19 Dataset from Our World in Data
- main dataset source: https://ourworldindata.org/covid-deaths
- complement dataset #1, population density per country: https://worldpopulationreview.com/country-rankings/countries-by-density
- complement dataset #2,GDP evolution per country https://data.worldbank.org/indicator/NY.GDP.PCAP.CD

## Overview:
The present notebooks tends to explore and analyze covid deaths dataset from Our World in Data. It's going to take special attention to three countries, Soain, Italy and USA, while looking for some global numbers and doing a general analysis.

It can be noted that PySpark is going to be used for this analysis, actually it's going to be taken advantage of SQL syntax thank to `spark.sql()` for complex queries.

## Steps:
1. Load dataset, and check its schema, structure and scope,
2. Look at the most infected countries, where are they located (which continent),
3. Look at the countries with more deaths, where are they located (which continent),
4. Compare the previous results with Spain, Italy and USA,

### 1. Load dataset, and check its schema, structure and scope
- covid_deaths.parquet
- world_population_density.parquet
- gdp_evol_per_country.parquet

In [5]:
df_deaths = spark.read.parquet('../data/covid_deaths.parquet', inferSchema=True, header=True)
df_pop_density = spark.read.parquet('../data/world_population_density.parquet', inferSchema=True, header=True)

#### 1. covid_deaths.csv

In [6]:
df_deaths.createOrReplaceTempView("covid_deaths")
df_deaths.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = 

Continents that appear in the dataset

In [7]:
spark.sql("""
    SELECT DISTINCT continent
    FROM covid_deaths
""").show()

+-------------+
|    continent|
+-------------+
|       Europe|
|       Africa|
|         null|
|North America|
|South America|
|      Oceania|
|         Asia|
+-------------+



There are 231 countries from 6 different continents

In [8]:
spark.sql("""
    SELECT DISTINCT location
    FROM covid_deaths
""").count()

                                                                                

231

This is the range of the dataset. The records start on january 2020 and should end today since it's udtated everytime the function `update_local_data` run at the beginning of this notebook

In [9]:
spark.sql("""
    SELECT min(date(date)) as Start, max(date(date)) as End
    FROM covid_deaths
""").show()

+----------+----------+
|     Start|       End|
+----------+----------+
|2020-01-01|2021-07-07|
+----------+----------+



#### 2. world_population_density.csv

In [10]:
df_pop_density.createOrReplaceTempView("world_population_density")
df_pop_density.printSchema()

root
 |-- rank: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- density: double (nullable = true)
 |-- densityMi: double (nullable = true)
 |-- pop2021: double (nullable = true)
 |-- area: integer (nullable = true)



world_population_density.csv has 232 countries contemplated, one more than covid_deaths.csv

In [11]:
df_pop_density.select('country').count()

232

Now, let's compare these two tables or datasets:
- `covid_deaths.csv`
- `world_population_density.csv`

The idea is to enhance covid_deaths dataset by adding the population density per country. So let's compare which countries in `covid_deaths.csv` appear in `world_population_density.csv`.

For better result, it can be taken advantage of the iso_code standard, so that the language or any local convention can't affect when two countries name are compared (e.g. Czechia and Czech Republic refer to the same country).

In the case of `covid_deaths` table, it already have the iso_code as the second column, but the `world_population_density` just have the country name. Therefore, for this second case, it's going to be used `pycountry` (a pyhotn package available here: https://pypi.org/project/pycountry/) to decode the country name to effectively compare the two datasets.

More specifically, it's going to be used map transformation of `PySpark` that recieve a function as parameters to later applied it to each row in a dataframe. 

The following is a encoding function (`encode_country`) that recieve a row as parameter, take the two most important component (country and population density, the rest of elements are not going to be used), and return the iso_code of that country and the density as a row. Noted that has a try/except block to ensure that if a country can't be read by `pycountry` don't break, and instead return just `None`.

After checked what countries were not encode because `pycountry.country.search_fuzzy` didn't recognize them, it was tried to manually do it. To do that, it was created a map or python Dict to asign these countries with their respective iso code, and then update the `encode_country` function.

In [12]:
ISO_MAP = {
    'Macau': 'MAC',
    'South Korea': 'KOR',
    'United States Virgin Islands': 'VIR',
    'North Korea': 'PRK',
    'Cape Verde': 'CPV',
    'Ivory Coast': 'CIV',
    'DR Congo': 'COD',
    'Laos': 'LAO',
    'Sint Maarten': 'SXM',
    'Curacao': 'CUW'
}


# overwrited function 
def encode_country(row):
    country, iso_code, density = (row[1], ISO_MAP[row[1]], row[2]) \
                                    if row[1] in ISO_MAP.keys() \
                                    else (row[1], None, row[2])
    try:
        iso_code = pycountry.countries.search_fuzzy(country)[0].alpha_3 \
                    if iso_code is None else None
    except LookupError: 
        pass
    return (country, iso_code, density)

In [13]:
rdd = df_pop_density.rdd.map(encode_country)
df_pop_density = rdd.toDF(['country', 'iso_code', 'density'])

# Create a new tempView for the new table with encode countries
df_pop_density.createOrReplaceTempView("population_density_encoded")

                                                                                

Now, the two tables can be easily compared with each other by looking at their iso_codes. Noted that there are 10 iso_code in `covid_deaths` that are not in `population_density_encode`, and that the `WHERE continent IS NOT NULL` clause is added in the SQL query beacuse when it's not passed, continents as a whole are considered as country in the dataset.

In [14]:
spark.sql("""
    SELECT iso_code 
    FROM covid_deaths
    WHERE continent IS NOT NULL
    EXCEPT
    SELECT iso_code FROM population_density_encoded
""").count()

                                                                                

16

Here are the countries and the iso_code from `covid_deaths` that didn't find a iso_code in the `population_density_encoded` table (the new one). There are 10 countries as it was commented before. Therefore, it can be affirmed that for these 10 countries are not population density information available in the dataset consulted.

In [15]:
spark.sql("""
    SELECT DISTINCT location, iso_code
    FROM covid_deaths
    WHERE iso_code IN (
        SELECT iso_code FROM covid_deaths
        EXCEPT
        SELECT iso_code FROM population_density_encoded) AND
        continent IS NOT NULL 
""").show()

                                                                                

+--------------------+--------+
|            location|iso_code|
+--------------------+--------+
|               Macao|     MAC|
|     Northern Cyprus|OWID_CYN|
|             Curacao|     CUW|
|Democratic Republ...|     COD|
|               Niger|     NER|
|         South Korea|     KOR|
|       Cote d'Ivoire|     CIV|
|            Pitcairn|     PCN|
|Sint Maarten (Dut...|     SXM|
|              Jersey|     JEY|
|          Cape Verde|     CPV|
|Bonaire Sint Eust...|     BES|
|        Saint Helena|     SHN|
|              Kosovo|OWID_KOS|
|                Laos|     LAO|
|            Guernsey|     GGY|
+--------------------+--------+



### 2. Look at the most infected countries and where they are located, in which continent

The following are the top 20 countries that have the higher rate of infected people; 
noted that neither Spain nor Italy, two of the most affected european countries at the beginning of the pandemic, appear.

In [16]:
df_top_infected = spark.sql("""
    SELECT iso_code, location, continent, population,
        MAX(total_cases) as TotalCases, 
        (MAX(total_cases)/MAX(population))*100 as PercentPopulationInfected
    FROM covid_deaths
    GROUP BY iso_code, location, population, continent
    ORDER BY PercentPopulationInfected desc
    LIMIT 20
""")
df_top_infected.createOrReplaceTempView("top_infected_countries")
df_top_infected.show()



+--------+-------------+-------------+------------+-----------+-------------------------+
|iso_code|     location|    continent|  population| TotalCases|PercentPopulationInfected|
+--------+-------------+-------------+------------+-----------+-------------------------+
|     AND|      Andorra|       Europe|     77265.0|    14021.0|       18.146638193231087|
|     SYC|   Seychelles|       Africa|     98340.0|    16304.0|       16.579214968476712|
|     MNE|   Montenegro|       Europe|    628062.0|   100392.0|       15.984409182532936|
|     BHR|      Bahrain|         Asia|   1701583.0|   266797.0|       15.679340943110034|
|     CZE|      Czechia|       Europe| 1.0708982E7|  1668277.0|       15.578296797958945|
|     SMR|   San Marino|       Europe|     33938.0|     5092.0|       15.003830514467559|
|     MDV|     Maldives|         Asia|    540542.0|    74724.0|       13.823902675462776|
|     SVN|     Slovenia|       Europe|   2078932.0|   257550.0|       12.388572594004998|
|     LUX|

In [17]:
spark.sql("""
    SELECT tic.location, tic.PercentPopulationInfected, pde.density
    FROM top_infected_countries as tic
    INNER JOIN population_density_encoded as pde
        ON tic.iso_code = pde.iso_code
    ORDER BY 3 DESC
""").show()

[Stage 58:>                                                         (0 + 1) / 1]

+-------------+-------------------------+---------+
|     location|PercentPopulationInfected|  density|
+-------------+-------------------------+---------+
|      Bahrain|       15.679340943110034|2285.3542|
|     Maldives|       13.823902675462776|1812.0567|
|   San Marino|       15.003830514467559| 557.6557|
|       Israel|         9.75534631515234| 423.1957|
|  Netherlands|       10.062881703295963| 410.3488|
|      Belgium|         9.41441890740815| 381.0379|
|   Luxembourg|       11.461781282349483| 245.4811|
|   Seychelles|       16.579214968476712|  218.823|
|      Andorra|       18.146638193231087| 165.2885|
|      Czechia|       15.578296797958945| 135.9862|
|     Slovenia|       12.388572594004998| 102.5366|
|       Serbia|       10.539876283617719|   98.432|
|       Panama|        9.530663062301379|   58.098|
|   Montenegro|       15.984409182532936|  45.4715|
|    Lithuania|       10.248132914519426|  41.1924|
|United States|       10.202469468469236|    35.52|
|      Eston

                                                                                

The following table shows how many countries are per continent, in order to detect the most affected ones.

In [18]:
spark.sql("""
    SELECT continent, COUNT(continent), COUNT(continent)/20*100 as Percentage
    FROM top_infected_countries
    GROUP BY continent
    ORDER BY 2 desc
""").show()

+-------------+----------------+----------+
|    continent|count(continent)|Percentage|
+-------------+----------------+----------+
|       Europe|              12|      60.0|
|         Asia|               3|      15.0|
|South America|               2|      10.0|
|North America|               2|      10.0|
|       Africa|               1|       5.0|
+-------------+----------------+----------+





### 3. Look at the countries with more deaths and where they are located, in which continent

Now, let's have a look on the top 20 countries with the higher death rate

In [19]:
df_top_deaths = spark.sql("""
    SELECT location, continent, population, 
        MAX(total_cases) as TotalCases,
        MAX(total_deaths) as TotalDeaths,
        (MAX(total_deaths)/MAX(total_cases))*100 as PercentOfDeaths
    FROM covid_deaths
    GROUP BY location, population, continent
    ORDER BY PercentOfDeaths desc
    LIMIT 20
""")
df_top_deaths.createOrReplaceTempView("top_deaths_countries")
df_top_deaths.show()



+--------------------+-------------+-------------+----------+-----------+------------------+
|            location|    continent|   population|TotalCases|TotalDeaths|   PercentOfDeaths|
+--------------------+-------------+-------------+----------+-----------+------------------+
|             Vanuatu|      Oceania|     307150.0|       4.0|        1.0|              25.0|
|               Yemen|         Asia|  2.9825968E7|    6934.0|     1364.0|19.671185462936254|
|                Peru|South America|  3.2971846E7| 2071637.0|   193743.0| 9.352169323100524|
|              Mexico|North America| 1.28932753E8| 2558369.0|   234192.0| 9.153957071868835|
|               Sudan|       Africa|  4.3849269E7|   36805.0|     2760.0|7.4989811166961005|
|               Syria|         Asia|  1.7500657E7|   25735.0|     1893.0|7.3557412084709535|
|               Egypt|       Africa| 1.02334403E8|  282582.0|    16332.0| 5.779561330870331|
|             Somalia|       Africa|  1.5893219E7|   14995.0|      775



In [20]:
spark.sql("""
    SELECT continent, COUNT(continent), COUNT(continent)/20*100 as Percentage
    FROM top_deaths_countries
    WHERE TotalDeaths/TotalCases*100 >= 3.6423
    GROUP BY continent
    ORDER BY 2 desc
""").show()

+-------------+----------------+----------+
|    continent|count(continent)|Percentage|
+-------------+----------------+----------+
|       Africa|               6|      30.0|
|         Asia|               5|      25.0|
|South America|               3|      15.0|
|       Europe|               3|      15.0|
|      Oceania|               1|       5.0|
|North America|               1|       5.0|
+-------------+----------------+----------+





### 4. Compare the previous results with Spain, Italy and USA,

Just to verify that Spain and Italy are contemplated in the dataset, let's query them. Additionally, it can be interesting to compare their infection and death rate with the US rates.

- It can be noted that US have the higher infected rate (10.12%), but the lower death rate (1.79%).
- In contrats, Italy have the lower infected rate (7.03%) and the higher death rate (2.99%).
- Finally, Spain is in the middle of the two.

In [21]:
spark.sql("""
    SELECT DISTINCT location, 
        MAX(population) as Population,
        MAX(total_cases) as TotalCases, 
        MAX(total_deaths) as TotalDeaths,
        (MAX(total_cases)/MAX(population))*100 as PercentPopulationInfected,
        (MAX(total_deaths)/MAX(total_cases))*100 as PercentOfDeaths
    FROM covid_deaths
    WHERE location LIKE 'Spain' or 
        location LIKE 'Italy' or 
        location LIKE '%tates%'
    GROUP BY location
""").show()

+-------------+------------+-----------+-----------+-------------------------+------------------+
|     location|  Population| TotalCases|TotalDeaths|PercentPopulationInfected|   PercentOfDeaths|
+-------------+------------+-----------+-----------+-------------------------+------------------+
|United States|3.31002647E8|3.3770444E7|   606218.0|       10.202469468469236| 1.795114094443058|
|        Italy| 6.0461828E7|  4265714.0|   127718.0|       7.0552183767913865|2.9940591422678597|
|        Spain| 4.6754783E7|  3897996.0|    80969.0|        8.337106387596751| 2.077195564079594|
+-------------+------------+-----------+-----------+-------------------------+------------------+



Using spark.sql to query a spain dataframe

In [22]:
df_spain = spark.sql("""
    SELECT DATE(date),
        population, total_cases, new_cases, total_deaths, 
        (total_deaths/total_cases*100) as death_rate,
        (new_cases/population*100) as infected_rate
    FROM covid_deaths
    WHERE location = 'Spain' AND
        total_deaths IS NOT NULL
    ORDER by 1
""")
df_spain.createOrReplaceTempView("covid_deaths_spain")

This is how it looks like the spain dataframe, the first 20 days since is was reported the first death.

The following are some notes realted:
- The first covid death was on march (03-march-2020), there were 165 cases in total and 45 new cases.
- It's easy to see how the number of new cases and death rate are increasing.

In [23]:
df_spain.show()

+----------+-----------+-----------+---------+------------+------------------+--------------------+
|      date| population|total_cases|new_cases|total_deaths|        death_rate|       infected_rate|
+----------+-----------+-----------+---------+------------+------------------+--------------------+
|2020-03-03|4.6754783E7|      165.0|     45.0|         1.0|0.6060606060606061|9.624683746259715E-5|
|2020-03-04|4.6754783E7|      222.0|     57.0|         2.0|0.9009009009009009|1.219126607859563...|
|2020-03-05|4.6754783E7|      259.0|     37.0|         3.0|1.1583011583011582|7.913628858035766E-5|
|2020-03-06|4.6754783E7|      400.0|    141.0|         5.0|              1.25| 3.01573424049471E-4|
|2020-03-07|4.6754783E7|      500.0|    100.0|        10.0|               2.0|2.138818610279936...|
|2020-03-08|4.6754783E7|      673.0|    173.0|        17.0| 2.526002971768202| 3.70015619578429E-4|
|2020-03-09|4.6754783E7|     1073.0|    400.0|        28.0|  2.60950605778192|8.555274441119745E-4|


In [24]:
# spark.sql("""
#     SELECT MONTH(date) AS month, YEAR(date) AS year,
#         MAX(population) as population,
#         sum(total_cases) as total_cases, 
#         sum(new_cases) as new_cases, 
#         sum(total_deaths) as total_deaths,
#         (sum(total_deaths)/sum(total_cases)*100) as death_rate,
#         (sum(total_cases)/MAX(population)*100) as infected_rate
#     FROM covid_deaths_spain
#     GROUP BY month, year
#     ORDER by year, month
# """).show()