<a href="https://colab.research.google.com/github/KevinRouille/covid19-pyspark-plotly/blob/main/Data_Visualization_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project: Visualization of Datasets Related to COVID-19 Using PySpark and Plotly
---

# Introduction

### Objective

Build interactive data visualizations to illustrate:
* The spread of COVID-19 cases around the world and the resilience of countries against the pandemic
* The temporal and spatial evolution of mobility in different places since the beginning of the health crisis
* The impact of lockdown measures on hospitalizations and mobility



### Sources

* Google COVID-19 Community Global Mobility Report (227 MB as of Dec. 8, 2020)  
Latest version available at: https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv

* Our World in Data − Complete COVID-19 dataset (14 MB as of Dec. 11, 2020)  
Latest version available at: https://covid.ourworldindata.org/data/owid-covid-data.csv

# Import Spark and Python Libraries

#### Set up Spark

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})

import findspark

# findspark.init() is called before the first pyspark import; keep this order.
findspark.init()
from pyspark import SparkFiles

#### Import Spark SQL and create `SparkSession`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import random

# The Spark UI port is drawn at random from [4000, 5000) each time this cell runs.
ui_port = random.randrange(4000, 5000)

# Local session with two worker threads.
spark = (
    SparkSession.builder
    .appName("YourTest")
    .master("local[2]")
    .config('spark.ui.port', ui_port)
    .getOrCreate()
)

#### Install and import Python libraries

In [None]:
# Install the plotting and country-code libraries used below.
# plotly is pinned to 4.14.1; pycountry is installed unpinned (latest available version).
!pip install plotly==4.14.1
!pip install pycountry

In [None]:
import numpy as np
import pandas as pd
import plotly
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pycountry
from urllib.request import urlopen
import json
from datetime import datetime

# Import Data

#### Import dataset from Google on mobility into Spark DataFrame 'Mobility'

In [None]:
mobility_raw_url = "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv"

# Register the remote CSV with the SparkContext, then read it back through SparkFiles
# by its file name, letting Spark infer the column types from the data.
spark.sparkContext.addFile(mobility_raw_url)
mobility_raw = spark.read.csv(
    SparkFiles.get("Global_Mobility_Report.csv"),
    header=True,
    inferSchema=True,
    sep=',',
)

# Preview the first rows and print the total row count.
mobility_raw.show()
print(mobility_raw.count())

+-------------------+--------------------+------------+------------+----------+---------------+----------------+-------------------+--------------------------------------------------+-------------------------------------------------+----------------------------------+---------------------------------------------+---------------------------------------+----------------------------------------+
|country_region_code|      country_region|sub_region_1|sub_region_2|metro_area|iso_3166_2_code|census_fips_code|               date|retail_and_recreation_percent_change_from_baseline|grocery_and_pharmacy_percent_change_from_baseline|parks_percent_change_from_baseline|transit_stations_percent_change_from_baseline|workplaces_percent_change_from_baseline|residential_percent_change_from_baseline|
+-------------------+--------------------+------------+------------+----------+---------------+----------------+-------------------+--------------------------------------------------+-------------------------

##### Print DataFrame schema

In [None]:
# Print the column names, inferred types and nullability of the raw mobility data.
mobility_raw.printSchema()

root
 |-- country_region_code: string (nullable = true)
 |-- country_region: string (nullable = true)
 |-- sub_region_1: string (nullable = true)
 |-- sub_region_2: string (nullable = true)
 |-- metro_area: string (nullable = true)
 |-- iso_3166_2_code: string (nullable = true)
 |-- census_fips_code: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- retail_and_recreation_percent_change_from_baseline: integer (nullable = true)
 |-- grocery_and_pharmacy_percent_change_from_baseline: integer (nullable = true)
 |-- parks_percent_change_from_baseline: integer (nullable = true)
 |-- transit_stations_percent_change_from_baseline: integer (nullable = true)
 |-- workplaces_percent_change_from_baseline: integer (nullable = true)
 |-- residential_percent_change_from_baseline: integer (nullable = true)



#### Import dataset from Our World in Data on the spread of COVID-19 into Spark DataFrame 'Cases'

In [None]:
cases_raw_url = "https://covid.ourworldindata.org/data/owid-covid-data.csv"

# Register the remote CSV with the SparkContext, then read it back through SparkFiles
# by its file name, letting Spark infer the column types from the data.
spark.sparkContext.addFile(cases_raw_url)
cases_raw = spark.read.csv(
    SparkFiles.get("owid-covid-data.csv"),
    header=True,
    inferSchema=True,
    sep=',',
)

# Preview the first rows and print the total row count.
cases_raw.show()
print(cases_raw.count())

+--------+---------+-----------+-------------------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------

##### Print DataFrame schema

In [None]:
# Print the column names, inferred types and nullability of the raw cases data.
cases_raw.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_mill

# Transform and Join Mobility and Cases DataFrames

### Prepare Mobility DataFrame to join Cases

#### Replace two-letter ISO country codes with three-letter codes

In [None]:
# Map every two-letter country code known to pycountry to its three-letter code.
alpha_2_to_alpha_3 = {country.alpha_2: country.alpha_3 for country in pycountry.countries}

# Replace the codes with a single exact-match lookup instead of chaining one
# regexp_replace projection per country (which builds a very deep query plan).
# Codes that pycountry does not know, and nulls, are left unchanged.
mobility = mobility_raw.replace(alpha_2_to_alpha_3, subset=['country_region_code'])
mobility.show()

+-------------------+--------------------+------------+------------+----------+---------------+----------------+-------------------+--------------------------------------------------+-------------------------------------------------+----------------------------------+---------------------------------------------+---------------------------------------+----------------------------------------+
|country_region_code|      country_region|sub_region_1|sub_region_2|metro_area|iso_3166_2_code|census_fips_code|               date|retail_and_recreation_percent_change_from_baseline|grocery_and_pharmacy_percent_change_from_baseline|parks_percent_change_from_baseline|transit_stations_percent_change_from_baseline|workplaces_percent_change_from_baseline|residential_percent_change_from_baseline|
+-------------------+--------------------+------------+------------+----------+---------------+----------------+-------------------+--------------------------------------------------+-------------------------

In [None]:
# Collapse to a single partition and write the converted mobility data as CSV (with a
# header row) into the "mobility" directory, overwriting any previous output.
mobility.coalesce(1).write.csv("mobility", mode = "overwrite", header = "true") # takes around 10 minutes to complete

In [None]:
# Rebind `mobility` to the CSV stored in the "mobility" directory, re-inferring column types.
# NOTE(review): presumably done so later cells read the saved file instead of recomputing
# the country-code conversion — confirm.
mobility = spark.read.csv("mobility",sep=',',inferSchema=True, header=True)

#### Remove rows on sub-regional data and drop columns about sub-regional levels

In [None]:
# A row is kept only when none of its sub-region or metro-area fields is set.
is_country_level = (
    mobility.sub_region_1.isNull()
    & mobility.sub_region_2.isNull()
    & mobility.metro_area.isNull()
)

# Columns dropped once only those rows remain.
sub_regional_columns = ["sub_region_1", "sub_region_2", "metro_area", "iso_3166_2_code", "census_fips_code"]

mobility_by_country = mobility.filter(is_country_level).drop(*sub_regional_columns)
mobility_by_country.show()

+-------------------+--------------------+-------------------+--------------------------------------------------+-------------------------------------------------+----------------------------------+---------------------------------------------+---------------------------------------+----------------------------------------+
|country_region_code|      country_region|               date|retail_and_recreation_percent_change_from_baseline|grocery_and_pharmacy_percent_change_from_baseline|parks_percent_change_from_baseline|transit_stations_percent_change_from_baseline|workplaces_percent_change_from_baseline|residential_percent_change_from_baseline|
+-------------------+--------------------+-------------------+--------------------------------------------------+-------------------------------------------------+----------------------------------+---------------------------------------------+---------------------------------------+----------------------------------------+
|                ARE|U

#### Create id column and rename duplicate date column

In [None]:
# New column names, in positional order: the date column becomes "date_bis", the
# "_from_baseline" suffix is dropped from the mobility columns, and "id" comes last.
renamed_columns = [
    "country_region_code",
    "country_region",
    "date_bis",
    "retail_and_recreation_percent_change",
    "grocery_and_pharmacy_percent_change",
    "parks_percent_change",
    "transit_stations_percent_change",
    "workplaces_percent_change",
    "residential_percent_change",
    "id",
]

# The id is "<country code> <date>".
mobility_with_id = (
    mobility_by_country
    .withColumn("id", concat(col("country_region_code"), lit(" "), col("date")))
    .toDF(*renamed_columns)
)
mobility_with_id.show()

+-------------------+--------------------+-------------------+------------------------------------+-----------------------------------+--------------------+-------------------------------+-------------------------+--------------------------+--------------------+
|country_region_code|      country_region|           date_bis|retail_and_recreation_percent_change|grocery_and_pharmacy_percent_change|parks_percent_change|transit_stations_percent_change|workplaces_percent_change|residential_percent_change|                  id|
+-------------------+--------------------+-------------------+------------------------------------+-----------------------------------+--------------------+-------------------------------+-------------------------+--------------------------+--------------------+
|                ARE|United Arab Emirates|2020-02-15 00:00:00|                                   0|                                  4|                   5|                              0|                       

### Prepare Cases DataFrame to join Mobility

#### Replace null values with zeros

In [None]:
# Substitute 0 for missing values in the raw cases data.
cases = cases_raw.na.fill(0)
cases.show()

+--------+---------+-----------+-------------------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------

#### Create id column

In [None]:
# The id is "<ISO code> <date>", the same layout as the id built for the mobility data.
case_id = concat(col("iso_code"), lit(" "), col("date"))
cases_with_id = cases.withColumn("id", case_id)
cases_with_id.show()

+--------+---------+-----------+-------------------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------

### Join Cases and Mobility DataFrames on id column

In [None]:
# Inner-join cases and mobility on the shared id, drop the mobility-side country and
# date columns, rename the remaining country columns and sort by id.
cases_mobility = (
    cases_with_id
    .join(mobility_with_id, on=["id"], how="inner")
    .drop("country_region_code", "country_region", "date_bis")
    .withColumnRenamed("iso_code", "country_code")
    .withColumnRenamed("location", "country")
    .orderBy("id", ascending=True)
)
cases_mobility.show()

+--------------------+------------+---------+-----------+-------------------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-----------

In [None]:
# Collapse to a single partition and write the joined data as CSV (with a header row)
# into the "cases_mobility" directory, overwriting any previous output.
cases_mobility.coalesce(1).write.csv("cases_mobility", mode = "overwrite", header = "true")

In [None]:
# Rebind `cases_mobility` to the CSV stored in the "cases_mobility" directory,
# re-inferring column types.
cases_mobility = spark.read.csv("cases_mobility",sep=',',inferSchema=True, header=True)

# Build Interactive Data Visualizations

### Visualize data from Cases DataFrame

#### Visualization 1: Map of the evolution of new cases per million people and cumulative deaths per million people

##### Get relevant data from Spark DataFrame

In [None]:
# Keep rows dated from 2020-03-01 through 2020-12-01 (both bounds included by BETWEEN),
# then collect the result into a pandas DataFrame for plotting.
cases_world_query = cases.where("date between '2020-03-01' and '2020-12-01'")
cases_world_query_pd = cases_world_query.toPandas()

##### Build interactive visualization

In [None]:
# Plotly Express creates animation frames in the order in which the frame values first
# appear in the data, so sort by date (stable sort keeps the per-date row order) to
# guarantee that the animation plays chronologically whatever the row order of the query.
df = cases_world_query_pd.sort_values("date", kind="mergesort")

# One shared frame key so both figures produce identical, aligned frame sequences.
frame_dates = df.date.astype(str)
new_cases_label = "7-day rolling average of new cases per million people"

# Base layer: countries coloured by smoothed new cases per million.
fig = px.choropleth(df, locations="iso_code", range_color=[0,500],
                    labels={"new_cases_smoothed_per_million": new_cases_label, "animation_frame":"date"},
                    color="new_cases_smoothed_per_million", hover_name="location", projection="natural earth",
                    color_continuous_scale="reds", animation_frame=frame_dates)

# Overlay layer: bubbles sized by cumulative deaths per million.
fig2 = px.scatter_geo(df, locations="iso_code", size="total_deaths_per_million",
                      hover_data=["new_cases_smoothed_per_million"],
                      labels={"new_cases_smoothed_per_million": new_cases_label},
                      hover_name="location", animation_frame=frame_dates)

# Add the bubble trace to the initial view and to every animation frame.
fig.add_trace(fig2.data[0])

for frame, bubble_frame in zip(fig.frames, fig2.frames):
    frame.data += (bubble_frame.data[0],)

fig.update_layout(margin={"r":0,"t":30,"l":0,"b":0}, title_text="Evolution of 7-Day Rolling Average of New Cases per million people and Cumulative Deaths per million people", height=800)

fig.show()

#### Visualization 2: Bubble chart of the resilience of countries in the face of the COVID-19 pandemic

##### Get relevant data from Spark DataFrame

In [None]:
# Snapshot of the cases data for the single date 2020-12-01, collected into pandas.
cases_resilience_query = cases.where("date == '2020-12-01'")
cases_resilience_query_pd = cases_resilience_query.toPandas()

##### Build interactive visualization

In [None]:
# dropna() with no arguments removes every row that has a missing value in any column.
df = cases_resilience_query_pd.dropna()

# Facet order and human-readable axis/legend labels.
continent_order = ["Africa", "North America", "South America", "Asia", "Europe", "Oceania"]
axis_labels = {
    "hospital_beds_per_thousand": "hospital beds per thousand people",
    "total_deaths_per_million": "cumulative deaths per million people",
    "stringency_index": "government response stringency index",
    "gdp_per_capita": "GDP per capita in USD",
}

# One panel per continent: x = hospital beds, y = deaths, colour = stringency, size = GDP.
fig = px.scatter(
    df,
    x='hospital_beds_per_thousand',
    y='total_deaths_per_million',
    color='stringency_index',
    range_color=[0,100],
    color_continuous_scale="reds",
    size='gdp_per_capita',
    text="iso_code",
    hover_name="location",
    facet_col='continent',
    facet_col_wrap=3,
    category_orders={"continent": continent_order},
    labels=axis_labels,
)

fig.update_traces(textposition='top center')
fig.update_layout(margin={"r":0,"t":40,"l":0,"b":0}, title_text="Resilience of Countries in Face of the COVID-19 Pandemic as of Dec. 1, 2020", height=700)

# Facet titles come out as "continent=<name>"; keep only the part after "=".
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))

fig.show()

### Visualize data from Mobility DataFrame

#### Visualization 1: Line graph of average monthly change in mobility in worst-hit and least-affected countries

##### Get relevant data from Spark DataFrame: add a column for the month and get average mobility change over each month

In [None]:
# Source columns to average, and the display names they receive (same order).
baseline_columns = [
    'retail_and_recreation_percent_change_from_baseline',
    'grocery_and_pharmacy_percent_change_from_baseline',
    'parks_percent_change_from_baseline',
    'transit_stations_percent_change_from_baseline',
    'workplaces_percent_change_from_baseline',
    'residential_percent_change_from_baseline',
]
category_names = ["Retail & Recreation", "Grocery & Pharmacy", "Parks", "Transit Stations", "Workplaces", "Residential"]

# "month" is the two characters starting at position 6 of the date value.
# NOTE(review): the key carries no year, so rows from different years that share a
# month number would be averaged together — confirm the data covers a single year.
mobility_by_month = mobility_by_country.withColumn('month', mobility.date.substr(6, 2)).drop("date")

# Average each mobility column per country and month for the twelve selected countries.
mobility_worst_least_query = (
    mobility_by_month
    .where('country_region_code in ("ARG","ZAF","SWE","USA","KOR","ITA","TWN","DEU","BEL","JPN","NZL","URY")')
    .groupBy('country_region', 'month')
    .agg(*[avg(column_name) for column_name in baseline_columns])
    .toDF("country_region", "month", *category_names)
    .orderBy("month")
)
mobility_worst_least_query_pd = mobility_worst_least_query.toPandas()

##### Build interactive visualization

In [None]:
df = mobility_worst_least_query_pd

# One line per place category, one panel per country.
place_categories = ["Retail & Recreation", "Grocery & Pharmacy", "Parks", "Transit Stations", "Workplaces", "Residential"]
country_order = ["Belgium", "Italy", "Germany", "Sweden", "United States","Argentina","Uruguay","South Africa","Japan","South Korea","Taiwan","New Zealand"]

fig = px.line(
    df,
    x="month",
    y=place_categories,
    facet_col="country_region",
    facet_col_wrap=4,
    title='Mobility Percent Change in Worst-Hit and Least-Affected Countries',
    height=800,
    category_orders={"country_region": country_order},
    hover_name="value",
    labels={"variable":"Place Category", "value":"avg. mobility % change"},
)

# The title and height passed to px.line above are overridden here.
fig.update_layout(margin={"r":0,"t":40,"l":0,"b":0}, title_text="Average Monthly Mobility Percent Change per Place Category in Worst-Hit and Least-Affected Countries", height=700)

# Facet titles come out as "country_region=<name>"; keep only the part after "=".
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))

fig.show()

#### Visualization 2: Map of average change in mobility during the peak of the first wave of COVID-19 in the United States

##### Get relevant data from Spark DataFrame: add column for the code of the U.S. state and get average mobility change in April 2020

In [None]:
# Keep U.S. rows that have a sub_region_1 value but no sub_region_2 value, and start
# the state-code column as a copy of the sub_region_1 name.
mobility_us = (
    mobility
    .withColumn('sub_region_1_code', mobility['sub_region_1'])
    .filter("country_region_code == 'USA'")
    .filter(mobility.sub_region_1.isNotNull() & mobility.sub_region_2.isNull())
)

# Map each U.S. subdivision name known to pycountry to the last two characters of its
# code, and apply the mapping as one exact-match replacement instead of one
# regexp_replace projection per subdivision. Unmatched names are left unchanged.
state_name_to_code = {state.name: state.code[-2:] for state in pycountry.subdivisions.get(country_code='US')}
mobility_us = mobility_us.replace(state_name_to_code, subset=['sub_region_1_code'])

# BETWEEN includes both bounds, so the window must end on 2020-04-30; ending it on
# 2020-05-01 would include May 1 in the April average.
mobility_us_query = (
    mobility_us
    .where("date between '2020-04-01' and '2020-04-30'")
    .groupBy('sub_region_1_code')
    .agg(
        avg('retail_and_recreation_percent_change_from_baseline'),
        avg('grocery_and_pharmacy_percent_change_from_baseline'),
        avg('parks_percent_change_from_baseline'),
        avg('transit_stations_percent_change_from_baseline'),
        avg('workplaces_percent_change_from_baseline'),
        avg('residential_percent_change_from_baseline'),
    )
    .toDF("sub_region_1_code","avgRR", "avgGP", "avgP", "avgT", "avgW", "avgR")
)
mobility_us_query_pd = mobility_us_query.toPandas()

##### Build interactive visualization

In [None]:
# Swap the short aggregate column names for the display names used as subplot titles.
df = mobility_us_query_pd.rename(columns={
    "avgRR": "Retail & Recreation",
    "avgGP": "Grocery & Pharmacy",
    "avgP": "Parks",
    "avgT": "Transit Stations",
    "avgW": "Workplaces",
    "avgR": "Residential",
})

mobility_categories = ["Retail & Recreation","Grocery & Pharmacy","Parks","Transit Stations","Workplaces","Residential"]
rows, cols = 2, 3

# A rows x cols grid in which every cell hosts a choropleth.
fig = make_subplots(
    rows=rows, cols=cols, shared_xaxes=True,
    subplot_titles=mobility_categories,
    specs=[[{'type': 'choropleth'} for _ in range(cols)] for _ in range(rows)],
)

# Fill the grid row by row, one place category per cell.
for position, category in enumerate(mobility_categories):
    grid_row, grid_col = divmod(position, cols)
    state_map = go.Choropleth(
        locationmode='USA-states',
        locations=df['sub_region_1_code'],
        z=df[category],
        colorbar_title="Percent change from baseline",
    )
    fig.add_trace(state_map, row=grid_row + 1, col=grid_col + 1)

# The geo layout keys are "geo", "geo2", ..., "geo<rows*cols>"; restrict each to the USA.
geo_keys = ['geo'] + ['geo' + str(number) for number in range(2, rows * cols + 1)]
fig.update_layout(
    title_text="Average Mobility Percent Change per Place Category in the United States in April 2020",
    **{key + '_scope': 'usa' for key in geo_keys}
)

for state_trace in fig.data:
    state_trace.hovertemplate = 'State: %{location}<br>%{z} percent change'

# Same colour range on every map so the panels are comparable.
fig.update_traces(zmin=-100, zmax=100)

fig.show()

### Visualize data from joined Cases & Mobility DataFrame

#### Visualization: Bar and line graph illustrating the impact of lockdowns on hospitalizations and mobility in France

##### Get relevant data from Spark DataFrame

In [None]:
# Rows for France only, then the 2020-02-24 to 2020-12-06 window (bounds included),
# collected into pandas for plotting.
cases_mobility_france_query = cases_mobility.where('country_code == "FRA"')
cases_mobility_france_query_pd = (
    cases_mobility_france_query
    .where("date between '2020-02-24' and '2020-12-06'")
    .toPandas()
)

##### Build interactive visualization

In [None]:
df = cases_mobility_france_query_pd

# Single plot with two y-axes: patient bars on the primary axis, mobility lines on
# the secondary axis.
fig = make_subplots(specs=[[{"secondary_y": True}]])

# (column, legend name) pairs, added in this order.
patient_series = [
    ("hosp_patients_per_million", "Hospitalized patients per million people"),
    ("icu_patients_per_million", "Patients in ICU per million people"),
]
mobility_series = [
    ("retail_and_recreation_percent_change", "Retail & Recreation mobility change"),
    ("grocery_and_pharmacy_percent_change", "Grocery & Pharmacy mobility change"),
    ("parks_percent_change", "Parks mobility change"),
    ("transit_stations_percent_change", "Transit Stations mobility change"),
    ("workplaces_percent_change", "Workplaces mobility change"),
    ("residential_percent_change", "Residential mobility change"),
]

for column, legend_name in patient_series:
    fig.add_trace(go.Bar(x=df["date"], y=df[column], name=legend_name), secondary_y=False)

for column, legend_name in mobility_series:
    fig.add_trace(go.Scatter(x=df["date"], y=df[column], name=legend_name), secondary_y=True)

# Title, date range selector and slider, and a horizontal legend above the plot.
range_buttons = [
    dict(count=7, label="1 week", step="day", stepmode="backward"),
    dict(count=1, label="1 month", step="month", stepmode="backward"),
    dict(count=3, label="3 months", step="month", stepmode="backward"),
    dict(step="all"),
]
fig.update_layout(
    title_text="Impact of Lockdown Measures on Hospitalizations and Mobility in France",
    height=600,
    margin={"r":0,"t":120,"l":0,"b":0},
    xaxis=dict(
        rangeselector=dict(buttons=range_buttons),
        rangeslider=dict(visible=True, thickness=0.1),
        type="date",
    ),
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)

# Annotated events: (date, y position of the arrow head, label, vertical arrow offset).
# The date is also used as the hover text.
events = [
    ("2020-03-17", 0, "Start of<br>1st lockdown", -310),
    ("2020-04-14", 492, "Peak of<br>1st wave", -30),
    ("2020-05-11", 0, "End of<br>1st lockdown", -310),
    ("2020-10-17", 0, "Overnight<br>curfew", -210),
    ("2020-10-30", 0, "Start of<br>2nd lockdown", -310),
    ("2020-11-16", 513, "Peak of<br>2nd wave", -30),
    ("2020-12-01", 0, "2nd lockdown<br>partially lifted", -310),
]
for event_date, arrow_y, label, arrow_offset in events:
    fig.add_annotation(x=event_date, y=arrow_y, text=label, showarrow=True,
                       ax=0, ay=arrow_offset, hovertext=event_date)

# Axis titles.
fig.update_xaxes(title_text="date")
fig.update_yaxes(title_text="patients per million people", secondary_y=False)
fig.update_yaxes(title_text="mobility % change", secondary_y=True)

fig.show()