### load covid data from excel and save it as csv

In [63]:
import pandas as pd
df=pd.read_excel('COVID-19.xlsx')
df.to_csv("covid.csv",sep=",",index=False,header=True)

### Spark

In [73]:
#create sparksession object
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('log_reg').getOrCreate()

#Load the dataset
df=spark.read.csv('covid.csv',inferSchema=True,header=True)

In [74]:
import pyspark.sql.functions as f
df=df.where(f.col("geoId").isin({"FR", "IT", "ES"})).select('geoId','month','cases','deaths')
df=df.orderBy("geoId", ascending=True)

In [80]:
#Exploratory Data Analysis
df.describe().show()

+-------+-----+-----------------+------------------+------------------+
|summary|geoId|            month|             cases|            deaths|
+-------+-----+-----------------+------------------+------------------+
|  count|  707|              707|               707|               707|
|   mean| null| 4.41018387553041|1241.1867043847242|134.04243281471005|
| stddev| null|2.283406913644765|1914.2710420198846| 261.6102585438426|
|    min|   ES|                1|              -766|             -1918|
|    max|   IT|               12|             16269|              2004|
+-------+-----+-----------------+------------------+------------------+



In [82]:
df.show(3)

+-----+-----+-----+------+
|geoId|month|cases|deaths|
+-----+-----+-----+------+
|   ES|    8| 8148|    25|
|   ES|    8| 5114|    24|
|   ES|    8| 7039|    16|
+-----+-----+-----+------+
only showing top 3 rows



#### filter with multiple conditions

In [108]:
df.filter((df['geoId']=='FR')&(df['month']==4)).show(5)
df.filter(df['geoId']=='FR').filter(df['deaths'] >1000).show()

+-----+-----+-----+------+
|geoId|month|cases|deaths|
+-----+-----+-----+------+
|   FR|    4| 1607|   427|
|   FR|    4| 1065|   367|
|   FR|    4| 1195|   437|
|   FR|    4|  461|   242|
|   FR|    4| 1537|   369|
+-----+-----+-----+------+
only showing top 5 rows

+-----+-----+-----+------+
|geoId|month|cases|deaths|
+-----+-----+-----+------+
|   FR|    4| 2633|  1438|
|   FR|    4| 4286|  1341|
|   FR|    4| 3777|  1417|
|   FR|    4| 4267|  1053|
|   FR|    4| 5233|  2004|
+-----+-----+-----+------+



#### sum months and exculde december 2019

In [170]:
df2=df.groupBy(['geoId','month']).sum('deaths','cases').orderBy('month',ascending=True)
# Alternative
# df2=df.groupBy(['geoId','month']).agg({'deaths':'sum','cases':'sum'}).orderBy('month',ascending=True)
# df.groupBy(["geoId","month"]).sum("deaths").withColumnRenamed("sum(deaths)", "deaths_total")\
#     .sort(("month")).limit(5).show()
df2=df2.filter(df2['month']!=12)
df2.show(5)

+-----+-----+-----------+----------+
|geoId|month|sum(deaths)|sum(cases)|
+-----+-----+-----------+----------+
|   IT|    1|          0|         3|
|   FR|    1|          0|         6|
|   ES|    1|          0|         0|
|   FR|    2|          2|        51|
|   ES|    2|          0|        54|
+-----+-----+-----------+----------+
only showing top 5 rows



### converting categorical data to numerical form

In [171]:
#import required libraries
from pyspark.ml.feature import StringIndexer
geoId_indexer = StringIndexer(inputCol="geoId", outputCol="geoId_Num").fit(df2)
df3 = geoId_indexer.transform(df2)
df3.show(5)

+-----+-----+-----------+----------+---------+
|geoId|month|sum(deaths)|sum(cases)|geoId_Num|
+-----+-----+-----------+----------+---------+
|   FR|    1|          0|         6|      1.0|
|   ES|    1|          0|         0|      0.0|
|   IT|    1|          0|         3|      2.0|
|   FR|    2|          2|        51|      1.0|
|   ES|    2|          0|        54|      0.0|
+-----+-----+-----------+----------+---------+
only showing top 5 rows



In [172]:
from pyspark.ml.feature import OneHotEncoder
geoId_encoder = OneHotEncoder(inputCol="geoId_Num", outputCol="geoId_Vector")
df4 = geoId_encoder.fit(df3)
df4=df4.transform(df3)
df4.show(5)

+-----+-----+-----------+----------+---------+-------------+
|geoId|month|sum(deaths)|sum(cases)|geoId_Num| geoId_Vector|
+-----+-----+-----------+----------+---------+-------------+
|   FR|    1|          0|         6|      1.0|(2,[1],[1.0])|
|   ES|    1|          0|         0|      0.0|(2,[0],[1.0])|
|   IT|    1|          0|         3|      2.0|    (2,[],[])|
|   FR|    2|          2|        51|      1.0|(2,[1],[1.0])|
|   ES|    2|          0|        54|      0.0|(2,[0],[1.0])|
+-----+-----+-----------+----------+---------+-------------+
only showing top 5 rows



In [175]:
from pyspark.ml.feature import VectorAssembler

In [174]:
df5=df4.filter((df4['geoId_Num']!=2))
df5.show(3)

+-----+-----+-----------+----------+---------+-------------+
|geoId|month|sum(deaths)|sum(cases)|geoId_Num| geoId_Vector|
+-----+-----+-----------+----------+---------+-------------+
|   FR|    1|          0|         6|      1.0|(2,[1],[1.0])|
|   ES|    1|          0|         0|      0.0|(2,[0],[1.0])|
|   FR|    2|          2|        51|      1.0|(2,[1],[1.0])|
+-----+-----+-----------+----------+---------+-------------+
only showing top 3 rows



In [178]:
df_assembler = VectorAssembler(inputCols=['month','sum(deaths)','sum(cases)'], outputCol="features")
df5 = df_assembler.transform(df5)

In [179]:
df5.printSchema()

root
 |-- geoId: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- sum(deaths): long (nullable = true)
 |-- sum(cases): long (nullable = true)
 |-- geoId_Num: double (nullable = false)
 |-- geoId_Vector: vector (nullable = true)
 |-- features: vector (nullable = true)



In [181]:
df5.select(['features','geoId_Num']).show(5)

+--------------------+---------+
|            features|geoId_Num|
+--------------------+---------+
|       [1.0,0.0,6.0]|      1.0|
|       [1.0,0.0,0.0]|      0.0|
|      [2.0,2.0,51.0]|      1.0|
|      [2.0,0.0,54.0]|      0.0|
|[3.0,7340.0,10421...|      0.0|
+--------------------+---------+
only showing top 5 rows



### select data for building model

In [183]:
model_df=df5.select(['features','geoId_Num'])

In [189]:
from pyspark.ml.classification import LogisticRegression
#split the data 
training_df,test_df=model_df.randomSplit([0.7,0.3])

In [190]:
training_df.groupBy('geoId_Num').count().show()

+---------+-----+
|geoId_Num|count|
+---------+-----+
|      0.0|    6|
|      1.0|    6|
+---------+-----+



In [191]:
test_df.groupBy('geoId_Num').count().show()

+---------+-----+
|geoId_Num|count|
+---------+-----+
|      0.0|    2|
|      1.0|    2|
+---------+-----+



### The model

In [192]:
log_reg=LogisticRegression(labelCol='geoId_Num').fit(training_df)

### Training Results

In [193]:
train_results=log_reg.evaluate(training_df).predictions

In [195]:
train_results.filter(train_results['geoId_Num']==1).filter(train_results['prediction']==1).select(['geoId_Num','prediction','probability']).show(10,False)

+---------+----------+---------------------------------------------------------------+
|geoId_Num|prediction|probability                                                    |
+---------+----------+---------------------------------------------------------------+
|1.0      |1.0       |[0.016423423002759775,0.9835765769972403,1.923337892192834E-20]|
|1.0      |1.0       |[0.2320380887258323,0.7679619112741677,1.9996900337363204E-19] |
|1.0      |1.0       |[0.27731177676685415,0.7226882232331459,1.93404227556266E-21]  |
|1.0      |1.0       |[0.4628936820935944,0.5371063179064055,5.63269649559246E-24]   |
+---------+----------+---------------------------------------------------------------+



In [196]:
correct_preds=train_results.filter(train_results['geoId_Num']==1).filter(train_results['prediction']==1).count()

In [197]:
#accuracy on training dataset 
float(correct_preds)/(training_df.filter(training_df['geoId_Num']==1).count())

0.6666666666666666

### Test Set results

In [198]:
results=log_reg.evaluate(test_df).predictions

In [199]:
results.select(['geoId_Num','prediction']).show()

+---------+----------+
|geoId_Num|prediction|
+---------+----------+
|      1.0|       0.0|
|      0.0|       1.0|
|      1.0|       1.0|
|      0.0|       1.0|
+---------+----------+



In [201]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#confusion matrix
true_postives = results[(results.geoId_Num == 1) & (results.prediction == 1)].count()
true_negatives = results[(results.geoId_Num == 0) & (results.prediction == 0)].count()
false_positives = results[(results.geoId_Num == 0) & (results.prediction == 1)].count()
false_negatives = results[(results.geoId_Num == 1) & (results.prediction == 0)].count()
print (true_postives)
print (true_negatives)
print (false_positives)
print (false_negatives)
print(true_postives+true_negatives+false_positives+false_negatives)
print (results.count())

1
0
2
1
4
4


In [202]:
recall = float(true_postives)/(true_postives + false_negatives)
print(recall)

0.5


In [203]:
precision = float(true_postives) / (true_postives + false_positives)
print(precision)

0.3333333333333333


In [204]:
accuracy=float((true_postives+true_negatives) /(results.count()))
print(accuracy)

0.25
