# Assesing the Worldwide Progress of COVID-19 Vaccinations Using Spark
Individual Assignment - Spark

MBD - OCT 2020

Charlotte Dalenbrook

## 1. PySpark Environment Setup

In [2]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [60]:
#all imports
from IPython.display import display, Markdown
from pyspark.sql.functions import when, col, count, countDistinct, desc, first, lit, to_date, min, max, split, collect_set, explode, row_number, udf, last, DataFrame
from pyspark.sql.functions import sum as _sum
import pandas as pd
from pyspark.sql import Window, types
import sys
import numpy as np
from functools import reduce

## 2. Data Source and Spark DataFrame Setup

In [3]:
vaccinationDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("country_vaccinations.csv")

In [4]:
#see what the data frame looks like
head = vaccinationDF.limit(5)
pd.DataFrame(head.collect(), columns = head.columns)

Unnamed: 0,country,iso_code,date,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million,vaccines,source_name,source_website
0,Algeria,DZA,2021-01-29,0.0,,,,,0.0,,,,Sputnik V,Ministry of Health,https://www.aps.dz/regions/116777-blida-covid-...
1,Algeria,DZA,2021-01-30,30.0,,,30.0,30.0,0.0,,,1.0,Sputnik V,Ministry of Health,https://www.aps.dz/regions/116777-blida-covid-...
2,Andorra,AND,2021-01-25,576.0,576.0,,,,0.75,0.75,,,Pfizer/BioNTech,Government of Andorra,https://www.govern.ad/comunicats/item/12379-se...
3,Andorra,AND,2021-01-26,,,,,66.0,,,,854.0,Pfizer/BioNTech,Government of Andorra,https://www.govern.ad/comunicats/item/12379-se...
4,Andorra,AND,2021-01-27,,,,,66.0,,,,854.0,Pfizer/BioNTech,Government of Andorra,https://www.govern.ad/comunicats/item/12379-se...


## 3. Dataset Metadata Analysis

### A. Display schema and size of the DataFrame

In [5]:
vaccinationDF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % vaccinationDF.count()))

root
 |-- country: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- people_fully_vaccinated: double (nullable = true)
 |-- daily_vaccinations_raw: double (nullable = true)
 |-- daily_vaccinations: double (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: double (nullable = true)
 |-- vaccines: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_website: string (nullable = true)



This DataFrame has **2605 rows**.

### B. Get Two Random Row Samples from the Dataset

In [6]:
vaccinationDF.cache() # optimization to make the processing faster
vaccinationDF.sample(False, 0.2).take(2)

[Row(country='Algeria', iso_code='DZA', date='2021-01-29', total_vaccinations=0.0, people_vaccinated=None, people_fully_vaccinated=None, daily_vaccinations_raw=None, daily_vaccinations=None, total_vaccinations_per_hundred=0.0, people_vaccinated_per_hundred=None, people_fully_vaccinated_per_hundred=None, daily_vaccinations_per_million=None, vaccines='Sputnik V', source_name='Ministry of Health', source_website='https://www.aps.dz/regions/116777-blida-covid-19-trente-vaccines-au-matin-du-1er-jour-de-la-campagne'),
 Row(country='Andorra', iso_code='AND', date='2021-01-30', total_vaccinations=None, people_vaccinated=None, people_fully_vaccinated=None, daily_vaccinations_raw=None, daily_vaccinations=66.0, total_vaccinations_per_hundred=None, people_vaccinated_per_hundred=None, people_fully_vaccinated_per_hundred=None, daily_vaccinations_per_million=854.0, vaccines='Pfizer/BioNTech', source_name='Government of Andorra', source_website='https://www.govern.ad/comunicats/item/12379-se-supera-el

### C. Data Entities, Metrics and Dimensions

These are the main elements of the dataset:

* **Entities:** Country (main one which is measured - the facts), Date (dimension), Total Vaccinations
* **Metrics:** Total Vaccinations, People Vaccinated, People Fully Vaccinated, Daily Vaccinations ... 
* **Dimensions:** Country, Date, Daily Vaccinations, Vaccines (Vaccine type)

### D. Column categorization

The following could be potential column categorizations:

* **Information related columns:** *country*, *iso_code*, *date*, *source_name*, *source_website*
* **Vaccination Count related columns:** *total_vaccinations*, *people_vaccinated*, *people_fully_vaccinated*, *daily_vaccinations_raw*, *daily_vaccinations*, *total_vaccinations_per_hundred*, *people_vaccinated_per_hundred*, *people_fully_vaccinated_per_hundred*, *daily_vaccinations_per_million*
* **Type related columns:** *vaccines*

## 4. Column Groups Basic Profiling

### A. Information Related Columns

In [7]:
print ("Summary of Information Related Columns: ")

#change date column from string to date type
vaccinationDF.withColumn("date", to_date(vaccinationDF["date"], 'yyyy-MM-dd'))

#get the start date of the data
start_date = vaccinationDF.select(min("date")).first()
print("\nFirst Date: {}".format(start_date))

#get the end date of the data
end_date = vaccinationDF.select(max("date")).first()
print("\nMost Recent Date: {}".format(end_date))

#get the distinct value counts for info columns
print("\nChecking amount of distinct values in columns Country, ISO Code, Date, Source Name, and Source Website:")
vaccinationDF.select([countDistinct(c).alias(c) for c in ["country", "iso_code", "date", "source_name", "source_website"]]).show()


#get the most and least frequent occurences for info columns
print ("Most and least frequent occurrences for Country, Date, and Source Name columns:")
CountryDF = vaccinationDF.groupBy("country").agg(count(lit(1)).alias("Total"))
DateDF = vaccinationDF.groupBy("date").agg(count(lit(1)).alias("Total"))
SourceNameDF = vaccinationDF.groupBy("source_name").agg(count(lit(1)).alias("Total"))

leastFreqCountry    = CountryDF.orderBy(col("Total").asc()).first()
mostFreqCountry     = CountryDF.orderBy(col("Total").desc()).first()
leastFreqDate       = DateDF.orderBy(col("Total").asc()).first()
mostFreqDate        = DateDF.orderBy(col("Total").desc()).first()
leastFreqSourceName = SourceNameDF.orderBy(col("Total").asc()).first()
mostFreqSourceName  = SourceNameDF.orderBy(col("Total").desc()).first()

#display results nicely
display(Markdown("""
| %s | %s | %s | %s | %s | %s |
|----|----|----|----|----|----|
| %s | %s | %s | %s | %s | %s |
""" % ("leastFreqCountry", "mostFreqCountry", "leastFreqDate", "mostFreqDate", "leastFreqSourceName", "mostFreqSourceName",\
       "%s (%d occurrences)" % (leastFreqCountry["country"], leastFreqCountry["Total"]), \
       "%s (%d occurrences)" % (mostFreqCountry["country"], mostFreqCountry["Total"]), \
       "%s (%d occurrences)" % (leastFreqDate["date"], leastFreqDate["Total"]), \
       "%s (%d occurrences)" % (mostFreqDate["date"], mostFreqDate["Total"]), \
       "%s (%d occurrences)" % (leastFreqSourceName["source_name"], leastFreqSourceName["Total"]), \
       "%s (%d occurrences)" % (mostFreqSourceName["source_name"], mostFreqSourceName["Total"]))))


#check for nulls in info columns
print("Checking for nulls on columns Country, ISO Code, Date, Source Name, and Source Website:")
vaccinationDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["country", "iso_code", "date", "source_name", "source_website"]]).show()

Summary of Information Related Columns: 

First Date: Row(min(date)='2020-12-13')

Most Recent Date: Row(max(date)='2021-02-09')

Checking amount of distinct values in columns Country, ISO Code, Date, Source Name, and Source Website:
+-------+--------+----+-----------+--------------+
|country|iso_code|date|source_name|source_website|
+-------+--------+----+-----------+--------------+
|     80|      75|  59|         48|            76|
+-------+--------+----+-----------+--------------+

Most and least frequent occurrences for Country, Date, and Source Name columns:



| leastFreqCountry | mostFreqCountry | leastFreqDate | mostFreqDate | leastFreqSourceName | mostFreqSourceName |
|----|----|----|----|----|----|
| Saint Helena (1 occurrences) | Wales (58 occurrences) | 2020-12-14 (5 occurrences) | 2021-01-30 (69 occurrences) | Government of Guernsey (1 occurrences) | Ministry of Health (737 occurrences) |


Checking for nulls on columns Country, ISO Code, Date, Source Name, and Source Website:
+-------+--------+----+-----------+--------------+
|country|iso_code|date|source_name|source_website|
+-------+--------+----+-----------+--------------+
|      0|     241|   0|          0|             0|
+-------+--------+----+-----------+--------------+



### B. Vaccination Count Related Columns

In [8]:
#print summary statistics for vaccination count columns
print("Summary of Vaccination Count Related Columns: ")
summary_vacc_cols = vaccinationDF.select("total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "daily_vaccinations_raw", "daily_vaccinations", "total_vaccinations_per_hundred", "people_vaccinated_per_hundred", "people_fully_vaccinated_per_hundred", "daily_vaccinations_per_million").summary()
display(pd.DataFrame(summary_vacc_cols.collect(), columns = summary_vacc_cols.columns))

#check for count of null values per column
print("\nChecking for nulls Vaccination Count Columns:")
nulls_vacc_counts = vaccinationDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "daily_vaccinations_raw", "daily_vaccinations", "total_vaccinations_per_hundred", "people_vaccinated_per_hundred", "people_fully_vaccinated_per_hundred", "daily_vaccinations_per_million"]])
display(pd.DataFrame(nulls_vacc_counts.collect(), columns = nulls_vacc_counts.columns))

#check for count of distinct values per column
print("\nChecking for amount of distinct values on Vaccination Count Columns:")
distinct_vacc_counts = vaccinationDF.select([countDistinct(c).alias(c) for c in ["total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "daily_vaccinations_raw", "daily_vaccinations", "total_vaccinations_per_hundred", "people_vaccinated_per_hundred", "people_fully_vaccinated_per_hundred", "daily_vaccinations_per_million"]])
display(pd.DataFrame(distinct_vacc_counts.collect(), columns = distinct_vacc_counts.columns))

Summary of Vaccination Count Related Columns: 


Unnamed: 0,summary,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million
0,count,1712.0,1388.0,845.0,1409.0,2502.0,1712.0,1388.0,845.0,2502.0
1,mean,1217798.1028037383,1035908.0489913544,277274.8852071006,68777.90276792052,57449.12909672262,4.732587616822436,4.26482708933717,1.1469349112426035,2018.8077537969623
2,stddev,3997054.566313126,3339963.526705656,957876.7330884596,191047.7842161158,175258.24304659406,9.006691234200698,7.307836608007226,3.2438301189404397,3575.173054841261
3,min,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0
4,25%,26758.0,24998.0,5237.0,2082.0,1328.0,0.48,0.49,0.07,346.0
5,50%,150752.0,138436.0,22472.0,10577.0,6467.0,1.82,1.92,0.37,947.0
6,75%,645105.0,556997.0,106259.0,54445.0,31246.0,4.03,3.34,0.88,1696.0
7,max,43206190.0,32867213.0,9840429.0,2218752.0,1916190.0,67.38,41.68,25.69,30869.0



Checking for nulls Vaccination Count Columns:


Unnamed: 0,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million
0,893,1217,1760,1196,103,893,1217,1760,103



Checking for amount of distinct values on Vaccination Count Columns:


Unnamed: 0,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million
0,1674,1361,800,1353,2065,735,617,220,1560


In [9]:
# create new df with ordered row numbers (according to date) per country
windowSpec = Window.partitionBy("country").orderBy("date")
vaccinationDF = vaccinationDF.withColumn("row_number",row_number().over(windowSpec))

In [10]:
# replace null with 0 for first row of country
df = vaccinationDF
newDf = df.withColumn("total_vaccinations", \
              when(((df["row_number"] == 1) & (df["total_vaccinations"].isNull())), 0)\
                      .otherwise(df["total_vaccinations"]))

# otherwise replace null with the previous good value
vaccinationDF_processed = newDf.withColumn("total_vaccinations", last('total_vaccinations', True)\
                                           .over(Window.partitionBy('country').orderBy('date')\
                                                 .rowsBetween(-sys.maxsize, 0)))

In [12]:
# replace nulls of people_fully_vaccinated 
# replace first row nulls with 0 
df = vaccinationDF_processed
newDf = df.withColumn("people_fully_vaccinated", \
              when(((df["row_number"] == 1) & (df["people_fully_vaccinated"].isNull())), 0)\
                      .otherwise(df["people_fully_vaccinated"]))

# otherwise replace null with the previous good value
vaccinationDF_processed = newDf.withColumn("people_fully_vaccinated", last("people_fully_vaccinated", True)\
                                           .over(Window.partitionBy('country').orderBy('date')\
                                                 .rowsBetween(-sys.maxsize, 0)))

In [13]:
# replace nulls of people_fully_vaccinated_per_hundred
df = vaccinationDF_processed
newDf = df.withColumn("people_fully_vaccinated_per_hundred", \
              when(((df["row_number"] == 1) & (df["people_fully_vaccinated_per_hundred"].isNull())), 0)\
                      .otherwise(df["people_fully_vaccinated_per_hundred"]))

# otherwise replace null with the previous good value
vaccinationDF_processed = newDf.withColumn("people_fully_vaccinated_per_hundred", last("people_fully_vaccinated_per_hundred", True)\
                                           .over(Window.partitionBy('country').orderBy('date')\
                                                 .rowsBetween(-sys.maxsize, 0)))

In [14]:
# impute nulls in daily vaccination column with 0's 
vaccinationDF_processed = vaccinationDF_processed\
                            .withColumn("daily_vaccinations", when(df["daily_vaccinations"].isNull(), 0)\
                            .otherwise(df["daily_vaccinations"]))

In [15]:
#check for count of null values per column that was manipulated
print("\nChecking for nulls Vaccination Count Columns:")
nulls_vacc_counts = vaccinationDF_processed.select([count(when(col(c).isNull(), c)).alias(c) for c in ["total_vaccinations", "people_fully_vaccinated", "daily_vaccinations", "people_fully_vaccinated_per_hundred"]])
display(pd.DataFrame(nulls_vacc_counts.collect(), columns = nulls_vacc_counts.columns))



Checking for nulls Vaccination Count Columns:


Unnamed: 0,total_vaccinations,people_fully_vaccinated,daily_vaccinations,people_fully_vaccinated_per_hundred
0,0,0,0,0


In [16]:
# remove other columns because there are too many nulls / can be computed again rather than imputing all null values
reduce(DataFrame.drop, ["daily_vaccinations_raw", "total_vaccinations_per_hundred", "people_vaccinated_per_hundred", "daily_vaccinations_per_million"], vaccinationDF_processed)


DataFrame[country: string, iso_code: string, date: string, total_vaccinations: double, people_vaccinated: double, people_fully_vaccinated: double, daily_vaccinations: double, people_fully_vaccinated_per_hundred: double, vaccines: string, source_name: string, source_website: string, row_number: int]

### C. Type Related Columns

In [17]:
#find unique combinations of vaccines
print("\nUnique Vaccine Combinations: ")
vaccines = vaccinationDF.select("vaccines").distinct().show(truncate = False)

#get most and least frequently used vaccines
vaccineDF = vaccinationDF.groupBy("vaccines").agg(count(lit(1)).alias("Total"))
leastFreqVacc = vaccineDF.orderBy(col("Total").asc()).first()
mostFreqVacc = vaccineDF.orderBy(col("Total").desc()).first()

#print results of most & least frequently used vaccines
print("Least Frequently Used Vaccine / Vaccine Combo: ")
print(leastFreqVacc["vaccines"])
print("Total Uses: {}".format(leastFreqVacc["Total"]))

print("\nMost Frequently Used Vaccine / Vaccine Combo: ")
print(mostFreqVacc["vaccines"])
print("Total Uses: {}".format(mostFreqVacc["Total"]))

#check for nulls
print("\nChecking for nulls Vaccination Count Columns:")
nulls_vacc_type = vaccinationDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["vaccines"]])
display(pd.DataFrame(nulls_vacc_type.collect(), columns = nulls_vacc_type.columns))


Unique Vaccine Combinations: 
+----------------------------------------------------------------------------------+
|vaccines                                                                          |
+----------------------------------------------------------------------------------+
|Oxford/AstraZeneca                                                                |
|Pfizer/BioNTech, Sinopharm/Beijing, Sputnik V                                     |
|Sinovac                                                                           |
|Oxford/AstraZeneca, Sinopharm/Beijing                                             |
|Sinopharm/Beijing, Sinopharm/Wuhan, Sinovac                                       |
|Moderna, Oxford/AstraZeneca, Pfizer/BioNTech                                      |
|Oxford/AstraZeneca, Pfizer/BioNTech, Sinopharm/Beijing, Sinopharm/Wuhan, Sputnik V|
|Sputnik V                                                                         |
|Moderna, Pfizer/BioNTech         

Unnamed: 0,vaccines
0,0


# Business Questions

1. In which countries has the vaccination program the most effective so far? And which countries are performing worst in vaccinating their population?

2. What is the per country and worldwide progress of the vaccinations?

3. Which vaccine type has been used the most?

4.	At this rate, when will 70% of the population be immune (herd immunity)?

Before beginning to answer the business questions, we will enrich the dataset with up to date information on the population of each country:

In [48]:
#import population dataset
populationDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("population_by_country_2020.csv")

In [49]:
#keep only 2 necessary columns
populationDF = populationDF.select(["Country (or dependency)", "Population (2020)"]).withColumnRenamed("Country (or dependency)", "country")
populationDF = populationDF.withColumnRenamed("Population (2020)", "population")

populationDF.limit(5).show()

+-------------+-------------+
|      country|   population|
+-------------+-------------+
|        China|1.440297825E9|
|        India|1.382345085E9|
|United States|  3.3134105E8|
|    Indonesia| 2.74021604E8|
|     Pakistan| 2.21612785E8|
+-------------+-------------+



In [50]:
#join population with vaccination df 
vaccinationDF_processed_with_pop = vaccinationDF_processed.join(populationDF, on=['country'], how='left')

In [51]:
vaccinationDF_processed_with_pop.where(col("population").isNull()).count()

0

As we can see, no population values have nulls, so we have good data to create more columns of information!

In [81]:
# create UDF to calculate percentage of people vaccinated 
def vac_percentage(people_fully_vaccinated, population):
    vaccinated_percentage = ((people_fully_vaccinated/population)*100)
    return vaccinated_percentage

In [78]:
# divide people fully vaccinated by population
vaccinationDF_percent_pop = vaccinationDF_processed_with_pop.withColumn("vaccinated_percentage", vac_percentage_udf(col("people_fully_vaccinated"), col("population")).cast("double"))

vaccinationDF_percent_pop.limit(5).show()

+-------+--------+----------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------+--------------------+--------------------+----------+------------+---------------------+
|country|iso_code|      date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million| vaccines|         source_name|      source_website|row_number|  population|vaccinated_percentage|
+-------+--------+----------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------+------------

### Question 1

In [82]:
# countries which have given the highest total number of vaccines (and lowest)
print("Top 10 Countries with Highest Total Vaccinations: ")
vaccinationDF_processed.groupBy("country").agg(max("total_vaccinations").alias("curr_total_vacc")).sort(desc("curr_total_vacc")).limit(10).show()

print("Top 10 Countries with Lowest Total Vaccinations: ")
vaccinationDF_processed.groupBy("country").agg(max("total_vaccinations").alias("curr_total_vacc")).sort(col("curr_total_vacc").asc()).limit(10).show()


# countries with highest amount of people fully vaccinated 
print("Top 10 Countries with Highest Amount of People Fully Vaccinated: ")
vaccinationDF_processed.groupBy("country").agg(max("people_fully_vaccinated").alias("curr_full_vacc")).sort(desc("curr_full_vacc")).limit(10).show()

print("Top 10 Countries with Lowest Amount of People Fully Vaccinationed: ")
vaccinationDF_processed.groupBy("country").agg(max("people_fully_vaccinated").alias("curr_full_vacc")).sort(col("curr_full_vacc").asc()).limit(10).show()


# country where number of vaccinations per hundred is highest (and lowest)
print("Top 10 Countries with Highest Total Vaccinations per 100: ")
vaccinationDF_processed.groupBy("country").agg(max("people_fully_vaccinated_per_hundred").alias("curr_total_vacc_per_100")).sort(desc("curr_total_vacc_per_100")).limit(10).show()

print("Top 10 Countries with Lowest Total Vaccinations per 100: ")
vaccinationDF_processed.groupBy("country").agg(max("people_fully_vaccinated_per_hundred").alias("curr_total_vacc_per_100")).sort(col("curr_total_vacc_per_100").asc()).limit(10).show()


# check that the vaccination % is similar to vaccinations per 100 
print("Top 10 Countries with Highest Total Vaccinations per from Combo with Population Dataset: ")
vaccinationDF_percent_pop.groupBy("country").agg(max("vaccinated_percentage").alias("curr_total_vacc_per_100")).sort(desc("curr_total_vacc_per_100")).limit(10).show()

print("Top 10 Countries with Lowest Total Vaccinations per from Combo with Population Dataset: ")
vaccinationDF_percent_pop.groupBy("country").agg(max("vaccinated_percentage").alias("curr_total_vacc_per_100")).sort(col("curr_total_vacc_per_100").asc()).limit(10).show()

Top 10 Countries with Highest Total Vaccinations: 
+--------------------+---------------+
|             country|curr_total_vacc|
+--------------------+---------------+
|       United States|     4.320619E7|
|               China|        4.052E7|
|      United Kingdom|    1.3162878E7|
|             England|    1.1245053E7|
|               India|      6611561.0|
|              Israel|      5831820.0|
|United Arab Emirates|      4527144.0|
|              Brazil|      3820207.0|
|             Germany|      3369433.0|
|              Turkey|      2732709.0|
+--------------------+---------------+

Top 10 Countries with Lowest Total Vaccinations: 
+--------------+---------------+
|       country|curr_total_vacc|
+--------------+---------------+
|       Algeria|           30.0|
|  Saint Helena|          107.0|
| Liechtenstein|          833.0|
|       Andorra|         1036.0|
|         Egypt|         1315.0|
|        Monaco|         2400.0|
|     Greenland|         2584.0|
|       Bolivia|      

### Question 2

In order to assess the worldwide progress of vaccinations we can divide the total number of fully vaccinated people by the worldwide population!

In [55]:
# total vaccinations (not fully)
total_vacc_by_country = vaccinationDF_processed.groupBy("country").agg(max("total_vaccinations").alias("total_vaccinations"))
print("Total Number of Vaccinations Administered Worldwide: ")
print(total_vacc_by_country.groupBy().sum().collect()[0][0])

# total fully vaccinated people 
total_full_vacc_by_country = vaccinationDF_processed.groupBy("country").agg(max("people_fully_vaccinated").alias("total_full_vaccinations"))
fully_vaccinated = total_full_vacc_by_country.groupBy().sum().collect()[0][0]
print("\nNumber of People Fully Vaccinated Worldwide: ")
print(fully_vaccinated)

# world populationn ~7.8 billion 
percent_vaccinated_worldwide = (fully_vaccinated / (7800000000))*100
print("\nPercentage of People Fully Vaccinated Worldwide: ")
print(percent_vaccinated_worldwide)

Total Number of Vaccinations Administered Worldwide: 
160361865.0

Number of People Fully Vaccinated Worldwide: 
19487189.0

Percentage of People Fully Vaccinated Worldwide: 
0.24983575641025643


According to this dataset (which contains nulls for some countries fully vaccinated rows) 0.24%, of the world population has been fully vaccinated by the 09.02.2021

The above numbers are for worldwide progress, we can also look into the progress per country:

In [80]:
# get top 10 countries with highest % of population vaccinated 
max_percent_vacc = vaccinationDF_percent_pop.groupBy("country").agg(max("vaccinated_percentage").alias("curr_vaccination_percentage"))

max_percent_vacc.sort(col("curr_vaccination_percentage").desc()).limit(10).show()

+--------------------+---------------------------+
|             country|curr_vaccination_percentage|
+--------------------+---------------------------+
|              Israel|          25.62534589723106|
|           Gibraltar|         18.225533556947372|
|      Cayman Islands|          7.243295775503386|
|          Seychelles|          4.042538063847724|
|         Isle of Man|          3.004276717736629|
|       United States|          2.969879222631787|
|             Denmark|         2.5430226820203665|
|United Arab Emirates|         2.5224772906414477|
|               Malta|         2.5014148273910584|
|            Slovenia|          2.068697814177949|
+--------------------+---------------------------+



This coincides quite well with the vaccinated per 100 (we have good data!).

### Question 3

In [99]:
# get count of occurences of vaccine groups 
print("\nUnique Vaccine Combinations: ")
vaccination_combos = vaccinationDF_processed.groupBy('vaccines').count().sort(col("count").desc()).show(truncate=False)


Unique Vaccine Combinations: 
+----------------------------------------------------------------------------------+-----+
|vaccines                                                                          |count|
+----------------------------------------------------------------------------------+-----+
|Moderna, Oxford/AstraZeneca, Pfizer/BioNTech                                      |703  |
|Pfizer/BioNTech                                                                   |627  |
|Oxford/AstraZeneca, Pfizer/BioNTech                                               |400  |
|Moderna, Pfizer/BioNTech                                                          |360  |
|Sputnik V                                                                         |82   |
|Sinovac                                                                           |57   |
|Sinopharm/Beijing, Sinopharm/Wuhan, Sinovac                                       |57   |
|Pfizer/BioNTech, Sinovac                                  

1. Moderna, Oxford/AstraZeneca, Pfizer/BioNTech: 703
2. Pfizer/BioNTech: 627
3. Oxford/AstraZeneca, Pfizer/BioNTech: 1139

In [95]:
#create a dummy column to groupBy & in a single line
vaccinationDF_processed = vaccinationDF_processed.withColumn("dummy_col", split("vaccines", ", "))

# create array of vaccine values (without duplicates)
df_grp = vaccinationDF_processed.groupBy("dummy_col").agg(collect_set("dummy_col").alias("array_col"))

# explode the vaccine arrays 
df_grp = df_grp.withColumn("explode_col", explode("array_col"))
df_grp = df_grp.withColumn("vaccine_type", explode("explode_col"))

# get distinct values
distinct_vacc = df_grp.select("vaccine_type").distinct()

#show
distinct_vacc.show(truncate=False)

+------------------+
|vaccine_type      |
+------------------+
|Oxford/AstraZeneca|
|Sinovac           |
|Sinopharm/Wuhan   |
|Sputnik V         |
|Moderna           |
|Pfizer/BioNTech   |
|Covaxin           |
|Sinopharm/Beijing |
+------------------+



In [96]:
#create list of unique vaccine names
vaccine_list = []
for row in distinct_vacc.rdd.collect():
    vaccine_list.append(row.vaccine_type)

#get count of occurences of a vaccine being used
vaccine_dict_all_records = {}
for vaccine in vaccine_list:
    count = vaccinationDF_processed.where(vaccinationDF_processed["vaccines"].contains(vaccine)).count()
    vaccine_dict_all_records[vaccine] = count

print(vaccine_dict_all_records)

{'Oxford/AstraZeneca': 1278, 'Sinovac': 196, 'Sinopharm/Wuhan': 93, 'Sputnik V': 149, 'Moderna': 1063, 'Pfizer/BioNTech': 2263, 'Covaxin': 26, 'Sinopharm/Beijing': 224}


1. Pfizer/BioNTech : 2263 occurences
2. Oxford/AstraZeneca: 1278 occurences
3. Moderna: 1063 occurences

In [105]:
# for each country, get the max vaccinated (= total vaccinated so far) and then aggregate according to vaccine

# create new dataframe with only most recent total value for vaccinations
w = Window.partitionBy("country")
max_vaccs = vaccinationDF_processed.withColumn('curr_total_vacc', max('total_vaccinations').over(w))\
    .where(col('total_vaccinations') == col('curr_total_vacc'))\
    .drop('curr_total_vacc')

# show results
max_vaccs.select("country", "total_vaccinations", "vaccines").show()

#get count of total vaccinations for each combo
max_vaccs.groupBy('vaccines').agg(_sum("total_vaccinations").alias("amount")).sort(col("amount").desc()).show(truncate=False)

+------------+------------------+--------------------+
|     country|total_vaccinations|            vaccines|
+------------+------------------+--------------------+
|      Russia|         1000000.0|           Sputnik V|
|      Sweden|          384725.0|Oxford/AstraZenec...|
|      Jersey|           14838.0|     Pfizer/BioNTech|
|   Singapore|          181000.0|     Pfizer/BioNTech|
|      Turkey|         2732709.0|             Sinovac|
|     Germany|         3369433.0|Moderna, Oxford/A...|
|    Maldives|           16251.0|  Oxford/AstraZeneca|
|Saint Helena|             107.0|  Oxford/AstraZeneca|
|      France|         2216826.0|Moderna, Oxford/A...|
|      Greece|          437872.0|Moderna, Oxford/A...|
|   Sri Lanka|          160148.0|  Oxford/AstraZeneca|
|     Algeria|              30.0|           Sputnik V|
|    Slovakia|          242193.0|     Pfizer/BioNTech|
|   Argentina|          513178.0|           Sputnik V|
|       Wales|          632251.0|Oxford/AstraZenec...|
|     Belg

The problem with this data is that there are mixes of different vaccines in certain countries, but no indicator of how many doses of each vaccine have been administered. It is therefore impossible to determine exactly the number of doses of each vaccine (a mix of 2 vaccines may be 95% one vaccine and 5% the other, but the count is the same).

### Question 4

In [106]:
#get the start date of the data
print(start_date)
print(end_date)

Row(min(date)='2020-12-13')
Row(max(date)='2021-02-09')


In [110]:
current_days = 19+31+9
print("Total Days of Data: {}".format(current_days))
current_percentage = 0.25

Total Days of Data: 59


In [111]:
days_needed = (current_days*70)/(current_percentage)
years_needed = days_needed/365

In [322]:
print("Assuming all of the next days are like the previous 59 (which they won't be) ")
print("Days Needed to get to 70% Worldwide Vaccinations: {}".format(days_needed))
print("Years Needed to get to 70% Worldwide Vaccinations: {}".format(years_needed))

Assuming all of the next days are like the previous 59 (which they won't be) 
Days Needed to get to 70% Worldwide Vaccinations: 16520.0
Years Needed to get to 70% Worldwide Vaccinations: 45.26027397260274


### Attempting to predict with GBT

In [3]:
from pyspark.ml.feature import VectorAssembler,VectorIndexer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [291]:
# create df of total_vaccinations by date 
total_vacc_by_date = vaccinationDF_processed.groupBy("date").agg(_sum("total_vaccinations").alias("vaccinations")).orderBy("date")

total_vacc_by_date = total_vacc_by_date.withColumn("day", row_number().over(Window.orderBy("date")))

total_vacc_by_date.limit(5).show()

+----------+------------+---+
|      date|vaccinations|day|
+----------+------------+---+
|2020-12-13|    172190.0|  1|
|2020-12-14|    172190.0|  2|
|2020-12-15|   1700690.0|  3|
|2020-12-16|   1700690.0|  4|
|2020-12-17|   1700690.0|  5|
+----------+------------+---+



In [292]:
total_vacc_by_date.printSchema()

root
 |-- date: string (nullable = true)
 |-- vaccinations: double (nullable = true)
 |-- day: integer (nullable = true)



In [293]:
# split the data into training and test sets
train, test = total_vacc_by_date.randomSplit([0.7, 0.3])

In [294]:
# have only the day column be the feature
featuresCols = total_vacc_by_date.columns
featuresCols.remove('vaccinations')
featuresCols.remove('date')

In [295]:
# create vector assembler and vector indexer in order to make data correct format for MLlib
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [296]:
# intitalise GBT regressor
gbt = GBTRegressor(labelCol="vaccinations")

In [297]:
# Create param grid of parameters to try 
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()

# Define evaluation metric
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# CrossValidator, which will tune the model 
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)

In [298]:
# creat piple with all previously created steps
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [299]:
pipelineModel = pipeline.fit(train)

In [301]:
predictions = pipelineModel.transform(test)

display(predictions.select("vaccinations", "prediction", *featuresCols))

DataFrame[vaccinations: double, prediction: double, day: int]

In [302]:
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)

RMSE on our test set: 2.05367e+06


In [321]:
# get prediction of the number of people vaccinated 1 year from the start of vaccinations
df_one_year_prog = spark.createDataFrame([("2021-12-13", Vectors.dense(365))], ["date", "day"]) 

prediction = pipelineModel.transform(df_one_year_prog)

prediction.show()

+----------+-------+-----------+--------+--------------------+
|      date|    day|rawFeatures|features|          prediction|
+----------+-------+-----------+--------+--------------------+
|2021-12-13|[365.0]|    [365.0]| [365.0]|1.2123853894493017E8|
+----------+-------+-----------+--------+--------------------+



In [317]:
num_people = 121238538

In [319]:
# prediction of % of pop fully vaccinated in a year 
percent_fully_vaccinated = (num_people/7800000000)*100
print("Predicted % of the world fully vaccinated on 13.12.2021: {}%".format(percent_fully_vaccinated))

Predicted % of the world fully vaccinated on 13.12.2021: 1.5543402307692307%
