In [1]:
import numpy as np 
import pandas as pd 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from sklearn import preprocessing
from datetime import datetime

In [2]:
!ls
!java -version

CA_population.CSV                countryLockdowndatesJHUMatch.csv
COVID-19.ipynb                   covid19.csv
airports.csv                     hospital.csv
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)


In [3]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession.builder \
    .master("local") \
    .appName("covid") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
CA_covid_confirmed_case = spark.read.option("header","true").csv("covid19.csv")
CA_covid_confirmed_case = CA_covid_confirmed_case.filter(CA_covid_confirmed_case["prname"] != "Repatriated travellers")
CA_covid_confirmed_case = CA_covid_confirmed_case.filter(CA_covid_confirmed_case["prname"] != "Canada")
x = CA_covid_confirmed_case.columns
columns_to_drop = []
for c in x:
    if c != "prname" and c != "date" and c != "numconf":
        columns_to_drop.append(c)
CA_covid_confirmed_case = CA_covid_confirmed_case.drop(*columns_to_drop)
CA_covid_confirmed_case.show()

+----------------+----------+-------+
|          prname|      date|numconf|
+----------------+----------+-------+
|         Ontario|31-01-2020|      3|
|British Columbia|31-01-2020|      1|
|         Ontario|08-02-2020|      3|
|British Columbia|08-02-2020|      4|
|         Ontario|16-02-2020|      3|
|British Columbia|16-02-2020|      5|
|         Ontario|21-02-2020|      3|
|British Columbia|21-02-2020|      6|
|         Ontario|24-02-2020|      4|
|British Columbia|24-02-2020|      6|
|         Ontario|25-02-2020|      4|
|British Columbia|25-02-2020|      7|
|         Ontario|26-02-2020|      5|
|British Columbia|26-02-2020|      7|
|         Ontario|27-02-2020|      6|
|British Columbia|27-02-2020|      7|
|         Ontario|29-02-2020|      8|
|British Columbia|29-02-2020|      7|
|         Ontario|01-03-2020|     15|
|British Columbia|01-03-2020|      8|
+----------------+----------+-------+
only showing top 20 rows



In [5]:
CA_population = spark.read.option("header","true").csv("CA_population.CSV")
CA_population = CA_population.withColumn("Population density per square kilometre, 2016", col("Population density per square kilometre, 2016").cast("double"))
columns_to_drop = ['Population, 2011','2011 adjusted population flag','Incompletely enumerated Indian reserves and Indian settlements, 2011','Population, % change','Incompletely enumerated Indian reserves and Indian settlements, 2016','Total private dwellings, 2016','Private dwellings occupied by usual residents, 2016','Land area in square kilometres, 2016','Geographic code']
CA_population = CA_population.drop(*columns_to_drop)
CA_population = CA_population.filter(CA_population["Population, 2016"] != "null")
CA_population = CA_population.filter(CA_population["Geographic name"] != "Canada")
CA_population = CA_population.withColumnRenamed('Geographic name','Geographic_name')
CA_population.show()

+--------------------+----------------+---------------------------------------------+
|     Geographic_name|Population, 2016|Population density per square kilometre, 2016|
+--------------------+----------------+---------------------------------------------+
|Newfoundland and ...|          519716|                                          1.4|
|Prince Edward Island|          142907|                                         25.1|
|         Nova Scotia|          923598|                                         17.4|
|       New Brunswick|          747101|                                         10.5|
|              Quebec|         8164361|                                          6.0|
|             Ontario|        13448494|                                         14.8|
|            Manitoba|         1278365|                                          2.3|
|        Saskatchewan|         1098352|                                          1.9|
|             Alberta|         4067175|               

In [6]:
CA_hospitalbeds = spark.read.option("header","true").csv("hospital.csv")
CA_hospitalbeds = CA_hospitalbeds.withColumn("Hospitals with ICUs ", col("Hospitals with ICUs ").cast("double"))
CA_hospitalbeds = CA_hospitalbeds.withColumn("ICU beds with ventilation capacity ", col("Hospitals with ICUs ").cast("double"))
CA_hospitalbeds = CA_hospitalbeds.withColumn("Invasive ventilators", col("Hospitals with ICUs ").cast("double"))
province_map = {"Nfld." : "Newfoundland and Labrador", "N.S." : "Nova Scotia", "PEI" : "Prince Edward Island", "N.B." : "New Brunswick", \
               "Que." : "Quebec", "Ont." : "Ontario", "Sask." : "Saskatchewan", "Man." : "Manitoba", "Alta." : "Alberta", \
                "B.C." : "British Columbia", "Territories" : "Northwest Territories"}
CA_hospitalbeds = CA_hospitalbeds.filter(CA_hospitalbeds["Region"] != "Canada")
CA_hospitalbeds = CA_hospitalbeds.filter(CA_hospitalbeds["Hospitals with ICUs "] != 2.7)
for k in province_map:
    CA_hospitalbeds = CA_hospitalbeds.withColumn("Region",when(CA_hospitalbeds["Region"] == k, province_map[k]).otherwise(CA_hospitalbeds["Region"]))
newRow = spark.createDataFrame([("Yukon",2.7/3,5.5/3,13.7/3)])
CA_hospitalbeds = CA_hospitalbeds.union(newRow)
newRow = spark.createDataFrame([("Nunavut",2.7/3,5.5/3,13.7/3)])
CA_hospitalbeds = CA_hospitalbeds.union(newRow)
newRow = spark.createDataFrame([("Northwest Territories",2.7/3,5.5/3,13.7/3)])
CA_hospitalbeds = CA_hospitalbeds.union(newRow)
CA_hospitalbeds.show()


+--------------------+--------------------+-----------------------------------+--------------------+
|              Region|Hospitals with ICUs |ICU beds with ventilation capacity |Invasive ventilators|
+--------------------+--------------------+-----------------------------------+--------------------+
|Newfoundland and ...|                 2.8|                                2.8|                 2.8|
|         Nova Scotia|                 1.5|                                1.5|                 1.5|
|Prince Edward Island|                 1.4|                                1.4|                 1.4|
|       New Brunswick|                 1.2|                                1.2|                 1.2|
|              Quebec|                 1.1|                                1.1|                 1.1|
|             Ontario|                 0.6|                                0.6|                 0.6|
|        Saskatchewan|                 1.3|                                1.3|            

In [7]:
global_airports = spark.read.option("header","true").csv("airports.csv")
x = global_airports.columns
columns_to_drop = [x[0],x[1],x[4],x[5],x[6],x[7],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17]]
global_airports = global_airports.drop(*columns_to_drop)
global_airports = global_airports.filter(global_airports["iso_country"] == "CA")
global_airports = global_airports.filter(global_airports["type"] == "large_airport")
global_airports = global_airports.drop(*["iso_country", "type"])
province_map = {"CA-NL" : "Newfoundland and Labrador", "CA-NS" : "Nova Scotia",\
               "CA-QC" : "Quebec", "CA-ON" : "Ontario", "CA-MB" : "Manitoba", "CA-AB" : "Alberta", \
                "CA-BC" : "British Columbia", "Territories" : "Northwest Territories"}
for k in province_map:
    global_airports = global_airports.withColumn("iso_region",when(global_airports["iso_region"] == k, province_map[k]).otherwise(global_airports["iso_region"]))
global_airports = global_airports.withColumn("Airport Count",when(global_airports["iso_region"] == "Ontario",float(2)).otherwise(float(1)))
global_airports = global_airports.withColumn("Airport Count",when(global_airports["iso_region"] == "British Columbia",float(2)).otherwise(float(1)))
global_airports = global_airports.select("iso_region","Airport Count")
newRow = spark.createDataFrame([("Yukon",float(0.0))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("Nunavut",float(0.0))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("Northwest Territories",float(0.0))])
global_airports = global_airports.union(newRow)
global_airports = global_airports.filter(global_airports["iso_region"] != "Ontario")
global_airports = global_airports.filter(global_airports["iso_region"] != "British Columbia")
global_airports = global_airports.filter(global_airports["iso_region"] != "Alberta")
newRow = spark.createDataFrame([("Ontario",float(2))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("British Columbia",float(2))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("Alberta",float(2))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("Saskatchewan",float(0))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("New Brunswick",float(0))])
global_airports = global_airports.union(newRow)
newRow = spark.createDataFrame([("Prince Edward Island",float(0))])
global_airports = global_airports.union(newRow)
global_airports.show()

+--------------------+-------------+
|          iso_region|Airport Count|
+--------------------+-------------+
|         Nova Scotia|          1.0|
|              Quebec|          1.0|
|            Manitoba|          1.0|
|Newfoundland and ...|          1.0|
|               Yukon|          0.0|
|             Nunavut|          0.0|
|Northwest Territo...|          0.0|
|             Ontario|          2.0|
|    British Columbia|          2.0|
|             Alberta|          2.0|
|        Saskatchewan|          0.0|
|       New Brunswick|          0.0|
|Prince Edward Island|          0.0|
+--------------------+-------------+



In [8]:
CA_covid_confirmed_case.registerTempTable("t1")
CA_population.registerTempTable("t2")
CA_hospitalbeds.registerTempTable("t3")
global_airports.registerTempTable("t4")

In [9]:
sqlContext = SQLContext(sc)
df = sqlContext.sql('select * from t1 full outer join t2 on t1.prname=t2.Geographic_name full outer join t3 on t2.Geographic_name=t3.Region full outer join t4 on t3.Region=t4.iso_region')
df=df.drop(*['name','iso_region','Geographic_name','Region'])
df = df.withColumn("numconf", col("numconf").cast("double"))
df = df.withColumn("Population, 2016", col("Population, 2016").cast("double"))
df = df.withColumn("Population density per square kilometre, 2016", col("Population density per square kilometre, 2016").cast("double"))
df = df.withColumn("Hospitals with ICUs ", col("Hospitals with ICUs ").cast("double"))
df = df.withColumn("ICU beds with ventilation capacity ", col("ICU beds with ventilation capacity ").cast("double"))
df = df.withColumn("Invasive ventilators", col("Invasive ventilators").cast("double"))
df.show()

+--------+----------+-------+----------------+---------------------------------------------+--------------------+-----------------------------------+--------------------+-------------+
|  prname|      date|numconf|Population, 2016|Population density per square kilometre, 2016|Hospitals with ICUs |ICU beds with ventilation capacity |Invasive ventilators|Airport Count|
+--------+----------+-------+----------------+---------------------------------------------+--------------------+-----------------------------------+--------------------+-------------+
|Manitoba|11-03-2020|    0.0|       1278365.0|                                          2.3|                 1.0|                                1.0|                 1.0|          1.0|
|Manitoba|12-03-2020|    0.0|       1278365.0|                                          2.3|                 1.0|                                1.0|                 1.0|          1.0|
|Manitoba|13-03-2020|    1.0|       1278365.0|                             

In [10]:
init_date = pd.to_datetime('31-01-2020',format='%d-%m-%Y').toordinal()
pandas_df = df.toPandas()
pandas_df['date'] = pd.to_datetime(pandas_df['date'],format='%d-%m-%Y')
pandas_df['date'] = pandas_df['date'].apply(lambda x: x.toordinal()-init_date)
pandas_df.head()

Unnamed: 0,prname,date,numconf,"Population, 2016","Population density per square kilometre, 2016",Hospitals with ICUs,ICU beds with ventilation capacity,Invasive ventilators,Airport Count
0,Manitoba,40,0.0,1278365.0,2.3,1.0,1.0,1.0,1.0
1,Manitoba,41,0.0,1278365.0,2.3,1.0,1.0,1.0,1.0
2,Manitoba,42,1.0,1278365.0,2.3,1.0,1.0,1.0,1.0
3,Manitoba,43,1.0,1278365.0,2.3,1.0,1.0,1.0,1.0
4,Manitoba,44,4.0,1278365.0,2.3,1.0,1.0,1.0,1.0


In [11]:
p_schema = StructType([StructField('prname',StringType(),True),StructField('date',IntegerType(),True),StructField('numconf',DoubleType(),True),StructField('Population, 2016',DoubleType(),True),StructField('Population density per square kilometre, 2016',DoubleType(),True),StructField('Hospitals with ICUs ',DoubleType(),True),StructField('ICU beds with ventilation capacity ',DoubleType(),True),StructField('Invasive ventilators',DoubleType(),True),StructField('Airport Count',DoubleType(),True)])
df2 = sqlContext.createDataFrame(pandas_df, p_schema)
df2.show(900)

+--------------------+----+-------+----------------+---------------------------------------------+--------------------+-----------------------------------+--------------------+-------------+
|              prname|date|numconf|Population, 2016|Population density per square kilometre, 2016|Hospitals with ICUs |ICU beds with ventilation capacity |Invasive ventilators|Airport Count|
+--------------------+----+-------+----------------+---------------------------------------------+--------------------+-----------------------------------+--------------------+-------------+
|            Manitoba|  40|    0.0|       1278365.0|                                          2.3|                 1.0|                                1.0|                 1.0|          1.0|
|            Manitoba|  41|    0.0|       1278365.0|                                          2.3|                 1.0|                                1.0|                 1.0|          1.0|
|            Manitoba|  42|    1.0|       127

In [12]:
provinces = list(set(df2.select("prname").rdd.flatMap(lambda x: x).collect()))

In [13]:
from pyspark.ml.feature import VectorAssembler
# df3 = df2.withColumn("Population, 2016", col("Population, 2016").cast("double"))
# df3 = df3.withColumn("numconf", col("numconf").cast("double"))
df3 = df2.distinct()
df_temp = df3.drop(*["numconf","prname"])
vectorAssembler = VectorAssembler(inputCols = df_temp.columns, outputCol = 'features')
df3 = vectorAssembler.transform(df3)
df3 = df3.withColumnRenamed('prname','Province') \
.withColumnRenamed('numconf','Cases') \
.withColumnRenamed('Population, 2016','Population') \
.withColumnRenamed('Population density per square kilometre, 2016','Population Density') \
.withColumnRenamed('Hospitals with ICUs ','ICUs') \
.withColumnRenamed('ICU beds with ventilation capacity ', 'ICU ventilation capacity ')
df3.show()

+--------------------+----+------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|            Province|date| Cases| Population|Population Density|ICUs|ICU ventilation capacity |Invasive ventilators|Airport Count|            features|
+--------------------+----+------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|Northwest Territo...|  65|   4.0|    41786.0|               0.0| 0.9|       1.8333333333333333|   4.566666666666666|          0.0|[65.0,41786.0,0.0...|
|Northwest Territo...|  42|   0.0|    41786.0|               0.0| 0.9|       1.8333333333333333|   4.566666666666666|          0.0|[42.0,41786.0,0.0...|
|       New Brunswick|  44|   1.0|   747101.0|              10.5| 1.2|                      1.2|                 1.2|          0.0|[44.0,747101.0,10...|
|              Quebec|  48| 121.0|  8164361.0|               6.0| 1.1|            

In [14]:
test_data = df3.filter(df3["date"] == 80)
train_data = df3.filter(df3["date"] != 80)

In [15]:
test_data.show()

+--------------------+----+-------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|            Province|date|  Cases| Population|Population Density|ICUs|ICU ventilation capacity |Invasive ventilators|Airport Count|            features|
+--------------------+----+-------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|             Alberta|  80| 2562.0|  4067175.0|               6.4| 0.4|                      0.4|                 0.4|          2.0|[80.0,4067175.0,6...|
|             Ontario|  80|11184.0|1.3448494E7|              14.8| 0.6|                      0.6|                 0.6|          2.0|[80.0,1.3448494E7...|
|Northwest Territo...|  80|    5.0|    41786.0|               0.0| 0.9|       1.8333333333333333|   4.566666666666666|          0.0|[80.0,41786.0,0.0...|
|              Quebec|  80|18357.0|  8164361.0|               6.0| 1.1|     

In [16]:
train_data.show()

+--------------------+----+------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|            Province|date| Cases| Population|Population Density|ICUs|ICU ventilation capacity |Invasive ventilators|Airport Count|            features|
+--------------------+----+------+-----------+------------------+----+-------------------------+--------------------+-------------+--------------------+
|Northwest Territo...|  65|   4.0|    41786.0|               0.0| 0.9|       1.8333333333333333|   4.566666666666666|          0.0|[65.0,41786.0,0.0...|
|Northwest Territo...|  42|   0.0|    41786.0|               0.0| 0.9|       1.8333333333333333|   4.566666666666666|          0.0|[42.0,41786.0,0.0...|
|       New Brunswick|  44|   1.0|   747101.0|              10.5| 1.2|                      1.2|                 1.2|          0.0|[44.0,747101.0,10...|
|              Quebec|  48| 121.0|  8164361.0|               6.0| 1.1|            

In [17]:
# X
train_df_map = {}
for pro in provinces:
    temp_df = train_data.filter(train_data["Province"]==pro)
    train_df_map[pro] = temp_df
test_df_map = {}
for pro in provinces:
    temp_df = test_data.filter(test_data["Province"]==pro)
    test_df_map[pro] = temp_df
# df_mt=df.filter(df.prname=='Manitoba')
# df_yk=df.filter(df.prname=='Yukon')
# df_ns=df.filter(df.prname=='Nova Scotia')
# df_nt=df.filter(df.prname=='Northwest Territories')
# df_nl=df.filter(df.prname=='Newfoundland and Labrador')
# df_ab=df.filter(df.prname=='Alberta')
# df_nn=df.filter(df.prname=='Nunavut')
# df_nb=df.filter(df.prname=='New Brunswick')
# df_sa=df.filter(df.prname=='Saskatchewan')
# df_pei=df.filter(df.prname=='Prince Edward Island')
# df_on=df.filter(df.prname=='Ontario')
# df_bc=df.filter(df.prname=='British Columbia')
# df_qc=df.filter(df.prname=='Quebec')
# df_qc.show()

# # y
# df_mt_y=df_y.filter(df_y.prname=='Manitoba')
# df_mt_y.show()
# df_yk_y=df_y.filter(df_y.prname=='Yukon')
# df_ns_y=df_y.filter(df_y.prname=='Nova Scotia')
# df_nt_y=df_y.filter(df_y.prname=='Northwest Territories')
# df_nl_y=df_y.filter(df_y.prname=='Newfoundland and Labrador')
# df_ab_y=df_y.filter(df_y.prname=='Alberta')
# df_nn_y=df_y.filter(df_y.prname=='Nunavut')
# df_nb_y=df_y.filter(df_y.prname=='New Brunswick')
# df_sa_y=df_y.filter(df_y.prname=='Saskatchewan')
# df_pei_y=df_y.filter(df_y.prname=='Prince Edward Island')
# df_on_y=df_y.filter(df_y.prname=='Ontario')
# df_bc_y=df_y.filter(df_y.prname=='British Columbia')
# df_qc_y=df_y.filter(df_y.prname=='Quebec')


### Linear Regression

In [18]:
from pyspark.ml.regression import LinearRegression
model_map = {}
for k in train_df_map:
    lr = LinearRegression(featuresCol = 'features', labelCol='Cases',regParam=0.0, elasticNetParam=0.7)
    lr_model = lr.fit(train_df_map[k])
    model_map[k] = lr_model

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
sum = 0
for k in model_map:
    print("------------------------------------------------")
    print("Prediction for Province: " + k)
    print("Coefficients: " + str(model_map[k].coefficients))
    print("Intercept: " + str(model_map[k].intercept))
    lr_predictions = model_map[k].transform(test_df_map[k])
    lr_predictions.select("prediction","Cases").show()
    sum += lr_predictions.select("prediction").collect()[0][0]
    print("------------------------------------------------")

------------------------------------------------
Prediction for Province: Alberta
Coefficients: [61.81436475870289,0.0,-212.27620282525297,-3396.4192452040475,-3396.4192452040475,-3396.4192452040475,0.0]
Intercept: 2692.7882808356285
+-----------------+------+
|       prediction| Cases|
+-----------------+------+
|2203.666669205384|2562.0|
+-----------------+------+

------------------------------------------------
------------------------------------------------
Prediction for Province: Nunavut
Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 0.0
+----------+-----+
|prediction|Cases|
+----------+-----+
|       0.0|  0.0|
+----------+-----+

------------------------------------------------
------------------------------------------------
Prediction for Province: Manitoba
Coefficients: [8.073921200750494,0.0,-191.03624076534985,0.0,0.0,0.0,0.0]
Intercept: 67.23504231565038
+------------------+-----+
|        prediction|Cases|
+------------------+-----+
|273.76538461538524|245.0|
+

In [22]:
real_case = 35383
print(real_case)
print(sum)

35383
26405.738302979047


In [23]:
real_case = 35383
last_closest_sum = -1
params = [0,0]
for regP in np.arange(0,1,0.1):
    for elasP in np.arange(0,1,0.1):
        temp_sum = 0
        for k in train_df_map:
            lr = LinearRegression(featuresCol = 'features', labelCol='Cases',regParam=regP, elasticNetParam=elasP)
            lr_model = lr.fit(train_df_map[k])
            lr_predictions = lr_model.transform(test_df_map[k])
            lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Cases",metricName="r2")
            temp_sum += lr_predictions.select("prediction").collect()[0][0]
        if last_closest_sum == -1:
                last_closest_sum = temp_sum
        else:
            if  __builtins__.abs(real_case - temp_sum) <=  __builtins__.abs(real_case - last_closest_sum):
                last_closest_sum = float(temp_sum)
                params[0] = regP
                params[1] = elasP

In [24]:
print(last_closest_sum)
print(params[0])
print(params[1])

26405.738302979033
0.0
0.7000000000000001


### alternative

In [None]:
# DO NOT DELETE
t1 = CA_covid_confirmed_case.toPandas()
t1 = data.pivot(index='prname', columns='date')['numconf']
t1=t1.reset_index()
t2=CA_population.toPandas()
t2['prname']=t2['Geographic_name']
t2=t2.drop('Geographic_name',axis=1)
t3=CA_hospitalbeds.toPandas()
t3['prname']=t3['Region']
t3=t3.drop('Region',axis=1)
t4=global_airports.toPandas()
t4=t4.rename(columns={"iso_region": "prname", "name": "airport_count"})
t4=t4.groupby('prname').count().reset_index()
t=pd.concat([t1, t2,t3,t4], axis=1, join='outer').drop('prname',axis=1)
t=t.fillna({'airport_count':0})

In [None]:
t.columns

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
X = t.drop('19-04-2020',axis=1)
y = t['19-04-2020']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

reg = LinearRegression().fit(X_train, y_train)