# 1. Data Loading and Overview

In [1]:
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder\
                    .appName('Covid19 Data Mining')\
                    .config('spark.sql.debug.maxToStringFields', 2000)\
                    .getOrCreate()

df = spark.read.csv('owid-covid-data.csv', header=True, inferSchema=True)

22/08/23 14:36:04 WARN Utils: Your hostname, kevin-H resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlo1)
22/08/23 14:36:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/08/23 14:36:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [2]:
print(f"Length: {df.count()} \n Columns: {len(df.columns)} ")

[Stage 2:>                                                          (0 + 4) / 4]

Length: 209763 
 Columns: 67 


                                                                                

Let us check the schema of the dataset

In [3]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million

## Check missing values

In [4]:
from pyspark.sql import functions as func
missing_data = df.select([func.count(func.when(func.isnull(c), c)).alias(c) for c in df.columns])\
                                .collect()[0].asDict() 
missing_data = dict(sorted(missing_data.items(), reverse=True, key=lambda item: item[1]))
import pandas as pd
pd.DataFrame.from_records([missing_data])

                                                                                

Unnamed: 0,weekly_icu_admissions,weekly_icu_admissions_per_million,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality_cumulative_per_million,excess_mortality,weekly_hosp_admissions,weekly_hosp_admissions_per_million,icu_patients,icu_patients_per_million,...,new_cases_smoothed_per_million,new_cases_smoothed,new_cases_per_million,total_cases_per_million,new_cases,total_cases,population,iso_code,location,date
0,202886,202886,202630,202630,202630,202610,196125,196125,182078,182078,...,10883,9964,9711,9468,8787,8544,1241,0,0,0


# Data Preprocessing

In [5]:
df = df.withColumn('date', func.to_date(func.unix_timestamp(func.col('date'), 'yyy-MM-dd')\
                                        .cast('timestamp')))

df.select('date').show(5)
        

+----------+
|      date|
+----------+
|2020-02-24|
|2020-02-25|
|2020-02-26|
|2020-02-27|
|2020-02-28|
+----------+
only showing top 5 rows



## filter the data

In [6]:
dates = ('2021-01-01', '2021-02-28')

df = df.where(func.col('date').between(*dates))

## Handling missing values

In [7]:
df.sort('continent').select('iso_code', 'continent', 'location').show(5)

[Stage 7:>                                                          (0 + 4) / 4]

+--------+---------+-------------+
|iso_code|continent|     location|
+--------+---------+-------------+
|OWID_SAM|     null|South America|
|OWID_SAM|     null|South America|
|OWID_SAM|     null|South America|
|OWID_SAM|     null|South America|
|OWID_SAM|     null|South America|
+--------+---------+-------------+
only showing top 5 rows



                                                                                

In [8]:
df.fillna({'continent': 'OWID'})

DataFrame[iso_code: string, continent: string, location: string, date: date, total_cases: double, new_cases: double, new_cases_smoothed: double, total_deaths: double, new_deaths: double, new_deaths_smoothed: double, total_cases_per_million: double, new_cases_per_million: double, new_cases_smoothed_per_million: double, total_deaths_per_million: double, new_deaths_per_million: double, new_deaths_smoothed_per_million: double, reproduction_rate: double, icu_patients: double, icu_patients_per_million: double, hosp_patients: double, hosp_patients_per_million: double, weekly_icu_admissions: double, weekly_icu_admissions_per_million: double, weekly_hosp_admissions: double, weekly_hosp_admissions_per_million: double, total_tests: double, new_tests: double, total_tests_per_thousand: double, new_tests_per_thousand: double, new_tests_smoothed: double, new_tests_smoothed_per_thousand: double, positive_rate: double, tests_per_case: double, tests_units: string, total_vaccinations: double, people_vacc

In [9]:
df.select('tests_units').distinct().show()

                                                                                

+---------------+
|    tests_units|
+---------------+
|           null|
|  people tested|
| samples tested|
|tests performed|
|  units unclear|
+---------------+



                                                                                

tests_units is a variable that idnicates how each country/location reports on the performed tests. For example, in this case, people tested, the reported number of total tests is expected to be lower compared to the same report in the case of tests_performed, since one person can be testedd more than once. This implies that the missing values are due to some countries/locations not providing the relevant information on how they count the total number of daily tests. 

In [10]:
df = df.fillna({'tests_units': 'no info'})

In [11]:
df = df.fillna(0)

## Outlier Detection

In [12]:
def OutlierDetection(dataframe, features, alpha=1.5):
    feat_types = dict(dataframe.dtypes)
    if features == 'all':
        features = dataframe.columns
    
    outliers_cols = []
    for feat in features:
        # quantitative features
        if feat_types[feat] == 'double':
            Q1, Q3 = dataframe.approxQuantile(feat, [0.25, 0.75], 0)
            iqr = Q3 - Q1
            lower_bound = Q1 - (iqr * alpha)
            upper_bound = Q3 + (iqr * alpha)
            outliers_cols.append(func.when(~func.col(feat).between(lower_bound, upper_bound), True)\
                                    .alias(feat + '_outlier'))
    outlier_df = dataframe.select(*outliers_cols)
    outlier_df = outlier_df.fillna(False)
    return outlier_df

In [13]:
out_df = OutlierDetection(dataframe=df, features = ['new_cases'], alpha = 1.5)
out_df.show()

                                                                                

+-----------------+
|new_cases_outlier|
+-----------------+
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
+-----------------+
only showing top 20 rows



## Duplicate Entries


In [15]:
df.dropDuplicates(['location', 'date'])

DataFrame[iso_code: string, continent: string, location: string, date: date, total_cases: double, new_cases: double, new_cases_smoothed: double, total_deaths: double, new_deaths: double, new_deaths_smoothed: double, total_cases_per_million: double, new_cases_per_million: double, new_cases_smoothed_per_million: double, total_deaths_per_million: double, new_deaths_per_million: double, new_deaths_smoothed_per_million: double, reproduction_rate: double, icu_patients: double, icu_patients_per_million: double, hosp_patients: double, hosp_patients_per_million: double, weekly_icu_admissions: double, weekly_icu_admissions_per_million: double, weekly_hosp_admissions: double, weekly_hosp_admissions_per_million: double, total_tests: double, new_tests: double, total_tests_per_thousand: double, new_tests_per_thousand: double, new_tests_smoothed: double, new_tests_smoothed_per_thousand: double, positive_rate: double, tests_per_case: double, tests_units: string, total_vaccinations: double, people_vacc

# Exploratory Data Analysis

In [17]:
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib.colors import ListedColormap, LinearSegmentedColormap, TwoSlopeNorm
from mpl_toolkits.axes_grid1 import make_axes_locatable


def CustomMap(from_rgb, to_rgb):
    # from rgb
    r1, g1, b1 = from_rgb
    # to rgb
    r2, g2, b2 = to_rgb

    cdict = {
        'red': ((0,r1,r1),
                (1, r2, r2)),
        'green': ((0, g1, g1),
                    1, g2, g2),
        'blue': ((0, b1, b1),
                (1, b2, b2))
    }
    cmap = LinearSegmentedColormap('custom_cmap', cdict)
    return cmap


myCmap = CustomMap([1.0, 1.0, 1.0], [72/255, 99/255, 147/255])
myCmap_r = CustomMap([72/255, 99/255, 147/255], [1.0, 1.0, 1.0])

mycol = (72/255, 99/255, 147/255)
mycomplcol = (129/255, 143/255, 163/255)
othercol1 = (135/255, 121/255, 215/255)
othercol2 = (57/255, 119/255, 171/255)
othercol3 = (68/255, 81/255, 91/255)
othercol4 = (73/255, 149/255, 139/255)
 