# Prediction de cas de Covid 19 en Canada

- Felipe Gonzalez

Le but de cette projet est de predire le numero de cas des personnes contagiées par jour dans le provinces de Canada.

Le projet a 3 notebooks:

1. Collecte et integration de données
2. Traitement
3. Developpement du modele

## 2. Traitement

la variable cible est "numtoday" qui est le nombre de cas covid par jour

#### Chargement du donees

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

data_df = spark.read.csv("/FileStore/tables/Covid_DB_08Mar/Total_Result", header = True, inferSchema = True)
data_df_forPrediction = spark.read.csv("/FileStore/tables/Covid_DB_08Mar/ForPrediction", header = True, inferSchema = True)

### 2. Traitement (Data Wrangling)

converte le numéro d'identification de la province en fonction de la population  
|  1 | Ontario  |  
|  2 | Quebec   |  
|  3 | BC       |  
|  4 | Alberta  |  
|  5 | Manitoba |  
|  6 | Saskatch |  
|  7 | NS       |  
|  8 | New Brun |  
|  9 | Newfound |  
| 10 | Prince E |  
| 11 | NW Terr  |  
| 12 | Yukon    |  
| 13 | Nunavut  |

In [0]:
from pyspark.sql.functions import when, lit, col

pruid_dicc = {35:1 , 24:2, 59:3, 48:4, 46:5, 47:6, 12:7, 13:8, 10:9, 11:10, 61:11, 60:12, 62:13}
for k,v in pruid_dicc.items():
  data_df = data_df.withColumn("pruid",when((col("pruid")==k),v).otherwise(col("pruid")))
  data_df_forPrediction = data_df_forPrediction.withColumn("pruid",when((col("pruid")==k),v).otherwise(col("pruid")))

Conversion d'une variable de date en une variable numérique à utiliser pour le modèle de régression

In [0]:
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType

# Convesrion de String a Date
func =  udf (lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())
df = data_df.withColumn('date', func(col('date')))
df_predict_tmp = data_df_forPrediction.withColumn('date', func(col('date')))
# convesrion de Date a Numerique
func_2 =  udf (lambda x: datetime.toordinal(x))
df = df.withColumn('date_num', func_2(col('date')))
df_predict_tmp = df_predict_tmp.withColumn('date_num', func_2(col('date')))

date_min_value = df.agg({"date_num": "min"}).collect()[0][0]
date_max_value = df.agg({"date_num": "max"}).collect()[0][0]
ndays = int(date_max_value) - int(date_min_value) 
print("Min date num: ", 0)
print("n days: ", ndays)
df = df.withColumn('date_num', col('date_num') - date_min_value)
df_predict_tmp = df_predict_tmp.withColumn('date_num', col('date_num') - date_min_value)

- Filtrage pour n'utiliser que les provinces avec le plus grand nombre de cas de covid (Ontario, Québec, Alberta)

In [0]:
df = df.filter((df.prname == 'Ontario') | (df.prname == 'Quebec') |  (df.prname == 'Alberta'))
df_predict_tmp = df_predict_tmp.filter((df_predict_tmp.prname == 'Ontario') | (df_predict_tmp.prname == 'Quebec') |  (df_predict_tmp.prname == 'Alberta'))

- Filtrage du numtoday (variable cible) positif

In [0]:
df = df.filter((df.numtoday > 0))
df_predict_tmp = df_predict_tmp.filter((df_predict_tmp.numtoday > 0))

- Filtrage pour les jours supérieurs à 40 (mi-mars) car le nombre de cas et le nombre de testés sont assez faibles ou ne sont pas fiables

In [0]:
df = df.filter((df.date_num > 40))
df_predict_tmp = df_predict_tmp.filter((df_predict_tmp.date_num > 40))

- Remplacer des valeurs manquantes des variables liées au covid par la médiane de les attributs dans deux semaines

In [0]:
cols_to_replace = [ 'pruid', 'prname', 'date_num', 'numrecover', 'ratetested', 'ratetotal', 'ratedeaths', 
       'numtestedtoday', 'numrecoveredtoday', 'numactive', 'rateactive', 'ratetotal_last7',            
       'avgtotal_last7', 'avgdeaths_last7']

    
df_tmp = df.select(cols_to_replace)

# création d'une liste avec des intervalles de 2 semaines
date_min_value = df_tmp.agg({"date_num": "min"}).collect()[0][0]
date_max_value = df_tmp.agg({"date_num": "max"}).collect()[0][0]
twoweeks = int((date_max_value - date_min_value) / 14.0)

import numpy as np
date_interval = np.linspace(date_min_value,date_max_value,twoweeks)    

In [0]:
# création d'un dictionnaire avec les valeurs médianes pendant 2 semaines
mean_values_dicc = {}
prov = df.select('prname').distinct().collect()

cols_with_missingvalues = [ 'numrecover', 'ratetested', 'ratetotal', 'ratedeaths', 
       'numtestedtoday', 'numrecoveredtoday', 'numactive', 'rateactive', 'ratetotal_last7',            
       'avgtotal_last7', 'avgdeaths_last7']

#cols_with_missingvalues = [ 'numtested', 'numrecover', 'ratetested',
#       'percentdeath', 'numtestedtoday', 'numrecoveredtoday', 'percentactive',
#       'numactive', 'numtotal_last14', 'ratetotal_last14',
#       'numdeaths_last14', 'ratedeaths_last14', 'numtotal_last7',
#       'ratetotal_last7', 'numdeaths_last7', 'ratedeaths_last7',
#       'avgtotal_last7', 'avgincidence_last7', 'avgdeaths_last7',
#       'avgratedeaths_last7']

# provinces loop
for p in prov:
  print(p[0])
  # date interval loop
  tmp = {}
  for i, j in zip(date_interval, date_interval[1:]):
    df_tmp_1 = df_tmp.filter((df_tmp.prname == p[0]) & (df_tmp.date_num > i) & (df_tmp.date_num < j))
    df_tmp_2 = df_tmp_1.groupBy("prname").mean()  
    # features loop
    feature_tmp = {}
    for feature in cols_with_missingvalues:
      meanvalue = df_tmp_2.select("avg("+feature+")").collect()[0][0]
      feature_tmp[feature] = meanvalue
    interval = (i,j)
    tmp[interval] = feature_tmp
  mean_values_dicc[p[0]] = tmp
  

In [0]:
# traitement pour remplacer des valeurs manquantes des variables liées au covid par la médiane de les attributs dans deux semaines
for prov, interval_temp in mean_values_dicc.items():
  print(prov)
  for interval, features_tmp in interval_temp.items():
    for feature, value in features_tmp.items():
      df = df.withColumn(feature, when((col(feature).isNull()) & (col("prname")==prov) & (col("date_num") <= interval[1]) & (col("date_num") >= interval[0]), value).otherwise(col(feature)))
      df_predict_tmp = df_predict_tmp.withColumn(feature, when((col(feature).isNull()) & (col("prname")==prov) & (col("date_num") <= interval[1]) & (col("date_num") >= interval[0]), value).otherwise(col(feature)))

Les valeurs nulles des colonnes météo sont mises à zéro (la plupart d'entre elles sont, par exemple, pluie mm ou neige mm)

In [0]:
df = df.fillna(0.0)
df_predict_tmp = df_predict_tmp.fillna(0.0)

In [0]:
def SaveDF_tofile(df,filename):
  path_to_save = "dbfs:/FileStore/tables/Covid_DB_08Mar/ForAnalysis/"
  path_toSave_Results = path_to_save + filename
  df.write.format("com.databricks.spark.csv").option("header", "true").save(path_toSave_Results)

#dbutils.fs.rm("/FileStore/tables/Covid_DB/ForAnalysis/", True)
#SaveDF_tofile(df, "CovidAnalyzed")
#SaveDF_tofile(df_predict_tmp, "CovidForPrediction")

In [0]:
dbutils.fs.rm("/FileStore/tables/Covid_DB_08Mar/ForAnalysis/", True)

In [0]:
SaveDF_tofile(df, "CovidAnalyzed")

In [0]:
SaveDF_tofile(df_predict_tmp, "CovidForPrediction")