# COVID-19 data analysis
This notebook contains Python code that performs exploratory data analysis (EDA) using the Apache Spark
data analytics engine and pandas.

**Requirements**
- Python 3.7 or below.
- Apache Spark
- Pandas

**Other tools**
- scipy
- numpy
- matplotlib
- scikit-learn
- seaborn
- scrapy

In [None]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import os
from hdfs3 import HDFileSystem
import subprocess

In [None]:
# Report date (MM-DD-YYYY); it is also the base name of the daily CSV file.
date = '12-28-2020'

# HDFS locations: dataset root, the daily-reports folder under it, and the
# CSV file for the selected date.
covid_data_dir = 'hdfs://localhost:9000/user/student/csse_covid_19_data/'
covid_data_daily_reports_dir = f'{covid_data_dir}csse_covid_19_daily_reports/'
covid_data_daily_file = f'{covid_data_daily_reports_dir}{date}.csv'

In [None]:
# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): "spark.config.option" / "value" looks like a placeholder
# setting rather than a real Spark option — confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("covid-stats")
    .config("spark.config.option", "value")
    .getOrCreate()
)

In [None]:
# Report the Spark version in use and the file this run analyses.
print(
    f'Apache Spark version {spark.version}',
    f'Daily exploratory Data Analysis (EDA) for file {covid_data_daily_file}',
    sep='\n',
)

## Reading daily raw data from a csv file

In [None]:
# Load the daily report; the first CSV line supplies the column names.
df_raw = spark.read.csv(covid_data_daily_file, header=True)
# count() runs a Spark job over the file, so this also verifies it is readable.
print(f'Load csv file successfully. There are {df_raw.count()} entries')

In [None]:
# Group the raw rows by country/region. The key must be spelled
# 'Country_Region' (with an underscore), matching the column name used by the
# select below and by the per-country aggregation later in the notebook.
df_daily_by_country = df_raw.groupby(['Country_Region'])

# Keep only the country key and the four count columns, in the order they
# appear in the file. Filtering df_raw.columns (instead of selecting the names
# directly) means a column missing from the file is skipped here rather than
# raising at this point.
wanted_columns = ['Country_Region', 'Confirmed', 'Deaths', 'Active', 'Recovered']
df_clean = df_raw.select([c for c in df_raw.columns if c in wanted_columns])

## Processing data by countries or regions

In [None]:
def toInt(item):
    """Convert a raw CSV field to an integer count.

    Args:
        item: Field value as read from the CSV (typically a string or
            ``None``).

    Returns:
        int: ``0`` for falsy input (``None``, empty string, ``0``); otherwise
        the integer value of ``item``. Decimal-formatted numbers such as
        ``"12.0"`` are accepted and truncated toward zero.

    Raises:
        ValueError: If ``item`` is not numeric, or is NaN/infinite.
    """
    import math

    if not item:
        return 0
    try:
        return int(item)
    except ValueError:
        # int() rejects decimal notation ("12.0", "1e3"), so parse as a float
        # first; float() still raises ValueError for non-numeric text.
        number = float(item)
        if not math.isfinite(number):
            # Keep the failure type a ValueError (int(inf) would raise
            # OverflowError instead).
            raise ValueError(f'cannot convert {item!r} to an integer count') from None
        return int(number)

In [None]:
def _sum_by_country(metric):
    """Return per-country totals for one count column of ``df_clean``.

    Args:
        metric: Name of the count column to total (e.g. ``'Confirmed'``).

    Returns:
        A Spark DataFrame with two columns: ``Country_Region`` and ``metric``,
        holding one row per country with the summed value.
    """
    totals = (
        df_clean.rdd
        # Missing/empty fields count as 0 (see toInt).
        .map(lambda row: (row['Country_Region'], toInt(row[metric])))
        .reduceByKey(lambda x, y: x + y)
        .toDF()
    )
    # toDF() names the tuple fields _1 and _2; give them meaningful names.
    return totals.selectExpr("_1 as Country_Region", f"_2 as {metric}")


# One totals table per metric; they are joined on Country_Region afterwards.
df_confirmed = _sum_by_country('Confirmed')
df_deaths    = _sum_by_country('Deaths')
df_recovered = _sum_by_country('Recovered')
df_active    = _sum_by_country('Active')


In [None]:
# Start from the confirmed totals and attach the other metrics one at a time,
# keeping every country present in df_confirmed (left outer join).
df_new = df_confirmed
for df_metric in (df_deaths, df_recovered, df_active):
    df_new = df_new.join(df_metric, on=['Country_Region'], how='left_outer')
df_by_country = df_new

# Summary statistics for the combined table.
df_by_country.describe().show()

# Spot check: display the combined row(s) for the US.
df_by_country.where(df_by_country.Country_Region == 'US').collect()


In [None]:
# Print the largest and smallest value of every metric column
# (every column except the country key).
metric_columns = [name for name in df_by_country.columns if name != 'Country_Region']
for metric in metric_columns:
    metric_rdd = df_by_country.select(metric).rdd
    print(metric_rdd.max(), metric_rdd.min())

In [None]:
# Write the per-country totals, sorted by country name, as CSV with a header
# row. coalesce(1) reduces the output to a single partition.
df_output_filename = '/tmp/' + date

# mode('overwrite') makes Spark replace the output of a previous run itself.
# This replaces the earlier unchecked `hdfs dfs -rm -r` subprocess call, which
# needed the hdfs CLI on PATH and could target a different filesystem than the
# one Spark writes to.
df_by_country.orderBy('Country_Region')\
             .coalesce(1)\
             .write.format('csv')\
             .mode('overwrite')\
             .option('header','true')\
             .save(df_output_filename)

In [None]:
# Display the first row of the per-country table when sorted by country name.
df_by_country.orderBy('Country_Region').take(1)

### Convert the Spark DataFrame to a pandas DataFrame

In [None]:
# Collect the per-country Spark table to the driver as a pandas DataFrame.
pd_by_country = df_by_country.toPandas()

# Display the countries whose confirmed total is exactly 4.
pd_by_country.loc[pd_by_country['Confirmed'] == 4]

In [None]:
# Display the column-wise maximum of every column in pd_by_country.
pd_by_country.max()

In [None]:
# Display the column-wise minimum of every column in pd_by_country.
pd_by_country.min()

In [None]:
# Display the median of each numeric column. numeric_only=True skips the
# Country_Region string column explicitly; without it, newer pandas releases
# raise a TypeError instead of silently dropping non-numeric columns.
pd_by_country.median(numeric_only=True)

In [None]:
# Display the sample standard deviation of each numeric column.
# numeric_only=True skips the Country_Region string column explicitly; without
# it, newer pandas releases raise a TypeError instead of silently dropping
# non-numeric columns.
pd_by_country.std(numeric_only=True)

In [None]:
# Display the sample variance of each numeric column. numeric_only=True skips
# the Country_Region string column explicitly; without it, newer pandas
# releases raise a TypeError instead of silently dropping non-numeric columns.
pd_by_country.var(numeric_only=True)

In [None]:
# Display, for each numeric column, the row label of its largest value.
# The method must be called (the bare attribute only shows the bound method),
# and it is restricted to numeric columns because idxmax is not defined for
# the Country_Region string column.
pd_by_country.select_dtypes(include='number').idxmax()

In [None]:
# Dump every column of pd_by_country: its name, then its values.
for column_name, column_values in pd_by_country.items():
    print(f'label: {column_name}', f'content: {column_values}', sep='\n')

In [None]:
# Display the row(s) for the US.
pd_by_country.loc[pd_by_country['Country_Region'] == 'US']

In [None]:
# Display pandas' summary statistics for pd_by_country.
pd_by_country.describe()

In [None]:
# Save the per-country table as JSON in the current working directory.
pd_by_country.to_json('./country_by_day.json')


In [None]:
# Descriptive statistics of the per-country confirmed totals, printed as
# "<label> <value>" in a fixed order. Each entry pairs the printed label with
# the Series method that computes the value.
confirmed = pd_by_country['Confirmed']
confirmed_stats = (
    ('mean:', 'mean'),
    ('median:', 'median'),
    ('min:', 'min'),
    ('stddev:', 'std'),
    ('kurtosis:', 'kurtosis'),
    ('skew:', 'skew'),
    ('idxmax:', 'idxmax'),
    ('idxmin:', 'idxmin'),
    ('var:', 'var'),
)
for stat_label, stat_method in confirmed_stats:
    print(stat_label, getattr(confirmed, stat_method)())

# Plot the columns of the table against the row index.
pd_by_country.plot(figsize=(15, 20))

In [None]:
# Display the full per-country pandas table.
pd_by_country