In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df=spark.read.csv('covid.csv',inferSchema=True,header=True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- sex: integer (nullable = true)
 |-- patient_type: integer (nullable = true)
 |-- entry_date: string (nullable = true)
 |-- date_symptoms: string (nullable = true)
 |-- date_died: string (nullable = true)
 |-- intubed: integer (nullable = true)
 |-- pneumonia: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- pregnancy: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- copd: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- inmsupr: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- other_disease: integer (nullable = true)
 |-- cardiovascular: integer (nullable = true)
 |-- obesity: integer (nullable = true)
 |-- renal_chronic: integer (nullable = true)
 |-- tobacco: integer (nullable = true)
 |-- contact_other_covid: integer (nullable = true)
 |-- covid_res: integer (nullable = true)
 |-- icu: integer (nullable = true)



In [3]:
#shape of dataset
print((df.count(),len(df.columns)))

(566602, 23)


In [4]:
from pyspark.sql.functions import col,isnan, when, count
#counting null values
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---+------------+----------+-------------+---------+-------+---------+---+---------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+-------------------+---------+---+
| id|sex|patient_type|entry_date|date_symptoms|date_died|intubed|pneumonia|age|pregnancy|diabetes|copd|asthma|inmsupr|hypertension|other_disease|cardiovascular|obesity|renal_chronic|tobacco|contact_other_covid|covid_res|icu|
+---+---+------------+----------+-------------+---------+-------+---------+---+---------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+-------------------+---------+---+
|  0|  0|           0|         0|            0|        0|      0|        0|  0|        0|       0|   0|     0|      0|           0|            0|             0|      0|            0|      0|                  0|        0|  0|
+---+---+------------+----------+-------------+---------+-------+---------+---+---------+--------+--

In [5]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- sex: integer (nullable = true)
 |-- patient_type: integer (nullable = true)
 |-- entry_date: string (nullable = true)
 |-- date_symptoms: string (nullable = true)
 |-- date_died: string (nullable = true)
 |-- intubed: integer (nullable = true)
 |-- pneumonia: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- pregnancy: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- copd: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- inmsupr: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- other_disease: integer (nullable = true)
 |-- cardiovascular: integer (nullable = true)
 |-- obesity: integer (nullable = true)
 |-- renal_chronic: integer (nullable = true)
 |-- tobacco: integer (nullable = true)
 |-- contact_other_covid: integer (nullable = true)
 |-- covid_res: integer (nullable = true)
 |-- icu: integer (nullable = true)



In [6]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
sex,566602,1.50672606167998,0.4999551992349244,1,2
patient_type,566602,1.2151651423750711,0.4109372238196795,1,2
intubed,566602,76.56295247810633,39.05867551766631,1,99
pneumonia,566602,1.846262455833195,0.5609386442720459,1,99
age,566602,42.62248280097847,16.659972551727172,0,120
pregnancy,566602,50.40069219663891,47.50157914867205,1,98
diabetes,566602,2.2106328604558403,5.683522656741282,1,98
copd,566602,2.280221389970385,5.327832491046167,1,98
asthma,566602,2.2650290680230567,5.334658337099209,1,98


In [7]:
df.groupby("date_died").count().show()

+----------+-----+
| date_died|count|
+----------+-----+
|06-04-2020|   80|
|05-04-2020|   56|
|11-06-2020|  609|
|10-02-2020|    1|
|21-04-2020|  235|
|17-03-2020|    2|
|03-06-2020|  629|
|02-05-2020|  343|
|21-06-2020|  559|
|01-04-2020|   48|
|24-04-2020|  254|
|09-04-2020|  106|
|14-06-2020|  641|
|26-05-2020|  576|
|10-04-2020|  131|
|12-05-2020|  453|
|02-06-2020|  590|
|12-03-2020|    1|
|15-04-2020|  138|
|28-06-2020|  199|
+----------+-----+
only showing top 20 rows



In [11]:

df = df.withColumn("date_died", when(df.date_died == "9999-99-99","0").when(df.date_died != "9999-99-99","1").otherwise(df.date_died))
df.select("date_died").show()

+---------+
|date_died|
+---------+
|        0|
|        0|
|        0|
|        0|
|        1|
|        1|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
|        0|
+---------+
only showing top 20 rows



In [14]:
from pyspark.sql.types import DoubleType 
df = df.withColumn("died", df["date_died"].cast(DoubleType()))

root
 |-- id: string (nullable = true)
 |-- sex: integer (nullable = true)
 |-- patient_type: integer (nullable = true)
 |-- entry_date: string (nullable = true)
 |-- date_symptoms: string (nullable = true)
 |-- date_died: string (nullable = true)
 |-- intubed: integer (nullable = true)
 |-- pneumonia: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- pregnancy: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- copd: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- inmsupr: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- other_disease: integer (nullable = true)
 |-- cardiovascular: integer (nullable = true)
 |-- obesity: integer (nullable = true)
 |-- renal_chronic: integer (nullable = true)
 |-- tobacco: integer (nullable = true)
 |-- contact_other_covid: integer (nullable = true)
 |-- covid_res: integer (nullable = true)
 |-- icu: integer (nullable = true)
 |-- died: double (nullable = true)



In [17]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
# encoder = OneHotEncoder().setInputCols(["intubed"]).setOutputCols(["intubed_encoded"])

# encoder_model=encoder.fit(Age_udfIndex_df)
# encoder_df=encoder_model.transform(Age_udfIndex_df)

# encoder_df.toPandas().head()

assembler = VectorAssembler().setInputCols(["intubed", "diabetes", "asthma","obesity","tobacco","age", "hypertension", "other_disease"]).setOutputCol("vectorized_features")
        

assembler_df=assembler.transform(df)
assembler_df.head()

Row(id='16169f', sex=2, patient_type=1, entry_date='04-05-2020', date_symptoms='02-05-2020', date_died='0', intubed=97, pneumonia=2, age=27, pregnancy=97, diabetes=2, copd=2, asthma=2, inmsupr=2, hypertension=2, other_disease=2, cardiovascular=2, obesity=2, renal_chronic=2, tobacco=2, contact_other_covid=2, covid_res=1, icu=97, died=0.0, vectorized_features=DenseVector([97.0, 2.0, 2.0, 2.0, 2.0, 27.0, 2.0, 2.0]))

In [18]:
label_indexer = StringIndexer()\
         .setInputCol ("died")\
         .setOutputCol ("label")

label_indexer_model=label_indexer.fit(assembler_df)
label_indexer_df=label_indexer_model.transform(assembler_df)

label_indexer_df.select("died","label").toPandas().head()

Unnamed: 0,died,label
0,0.0,0.0
1,0.0,0.0
2,0.0,0.0
3,0.0,0.0
4,1.0,1.0


In [20]:
import pandas as pd
scaler = StandardScaler()\
         .setInputCol ("vectorized_features")\
         .setOutputCol ("features")
        
scaler_model=scaler.fit(label_indexer_df)
scaler_df=scaler_model.transform(label_indexer_df)
pd.set_option('display.max_colwidth', 40)
scaler_df.select("vectorized_features","features").head(5)

[Row(vectorized_features=DenseVector([97.0, 2.0, 2.0, 2.0, 2.0, 27.0, 2.0, 2.0]), features=DenseVector([2.4834, 0.3519, 0.3749, 0.3707, 0.3589, 1.6207, 0.3663, 0.3082])),
 Row(vectorized_features=DenseVector([97.0, 2.0, 2.0, 2.0, 2.0, 24.0, 2.0, 2.0]), features=DenseVector([2.4834, 0.3519, 0.3749, 0.3707, 0.3589, 1.4406, 0.3663, 0.3082])),
 Row(vectorized_features=DenseVector([2.0, 2.0, 2.0, 1.0, 2.0, 54.0, 2.0, 2.0]), features=DenseVector([0.0512, 0.3519, 0.3749, 0.1853, 0.3589, 3.2413, 0.3663, 0.3082])),
 Row(vectorized_features=DenseVector([2.0, 2.0, 2.0, 2.0, 2.0, 30.0, 2.0, 2.0]), features=DenseVector([0.0512, 0.3519, 0.3749, 0.3707, 0.3589, 1.8007, 0.3663, 0.3082])),
 Row(vectorized_features=DenseVector([2.0, 1.0, 2.0, 2.0, 2.0, 60.0, 1.0, 2.0]), features=DenseVector([0.0512, 0.1759, 0.3749, 0.3707, 0.3589, 3.6014, 0.1832, 0.3082]))]

In [22]:
train, test = scaler_df.randomSplit([0.85, 0.15], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 481341
Test Dataset Count: 85261


In [23]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.select('label', 'features',  'rawPrediction', 'prediction', 'probability').toPandas().head(5)

Unnamed: 0,label,features,rawPrediction,prediction,probability
0,0.0,"[2.483443145841612, 0.35189443603744...","[5.537113583168972, -5.537113583168972]",0.0,"[0.9960775692451396, 0.0039224307548..."
1,0.0,"[2.483443145841612, 0.35189443603744...","[5.404126384454103, -5.404126384454103]",0.0,"[0.9955221590564124, 0.0044778409435..."
2,0.0,"[2.483443145841612, 0.35189443603744...","[5.0279663531554455, -5.027966353155...",0.0,"[0.9934905291131187, 0.0065094708868..."
3,0.0,"[2.483443145841612, 0.35189443603744...","[4.649517357998439, -4.649517357998439]",0.0,"[0.9905244275158027, 0.0094755724841..."
4,1.0,"[0.0512050133163219, 0.1759472180187...","[1.20717836547035, -1.20717836547035]",0.0,"[0.7697993125933869, 0.2302006874066..."


In [24]:
from pyspark.mllib.evaluation import MulticlassMetrics
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ", accuracy)
# Compute raw scores on the test set
from pyspark.mllib.evaluation import MulticlassMetrics

#generate model on splited dataset

results = predictions.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

cm=metrics.confusionMatrix().toArray()
accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[0][0])/(cm[0][0]+cm[1][0])
recall=(cm[0][0])/(cm[0][0]+cm[0][1])
f = (2* precision * recall)/(precision+recall)
print("RandomForestClassifier: accuracy,precision,recall, f",accuracy,precision,recall, f)

Accuracy :  0.934354511441339
RandomForestClassifier: accuracy,precision,recall, f 0.934354511441339 0.9380402437296301 0.99561332046574 0.9659696846252532


In [25]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=20, \
                 regParam=0.1, \
                 featuresCol="features", \
                 labelCol='label')
from pyspark.ml import Pipeline
pipeline_lsvc = Pipeline(stages=[lsvc])
pipelineModel_lsvc = pipeline_lsvc.fit(train)
predDF_lsvc = pipelineModel_lsvc.transform(test)
results = predDF_lsvc.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

cm=metrics.confusionMatrix().toArray()
accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[0][0])/(cm[0][0]+cm[1][0])
recall=(cm[0][0])/(cm[0][0]+cm[0][1])
f = (2* precision * recall)/(precision+recall)
print("RandomForestClassifier: accuracy,precision,recall, f",accuracy,precision,recall, f)

RandomForestClassifier: accuracy,precision,recall, f 0.935797140544915 0.935797140544915 1.0 0.9668338907469343


In [26]:
import matplotlib.pyplot as plt
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

<Figure size 640x480 with 1 Axes>

Training set areaUnderROC: 0.9191311349341746


In [27]:
predictions.select("intubed","pregnancy","diabetes", "asthma","obesity","tobacco","age", 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+-------+---------+--------+------+-------+-------+---+-----+--------------------+----------+--------------------+
|intubed|pregnancy|diabetes|asthma|obesity|tobacco|age|label|       rawPrediction|prediction|         probability|
+-------+---------+--------+------+-------+-------+---+-----+--------------------+----------+--------------------+
|     97|        2|       2|     2|      2|      2| 23|  0.0|[5.53711358316897...|       0.0|[0.99607756924513...|
|     97|        2|       2|     2|      1|      2| 27|  0.0|[5.40412638445410...|       0.0|[0.99552215905641...|
|     97|        2|       2|     2|      1|      1| 38|  0.0|[5.02796635315544...|       0.0|[0.99349052911311...|
|     97|        2|       2|     2|      2|      2| 49|  0.0|[4.64951735799843...|       0.0|[0.99052442751580...|
|      2|        2|       1|     2|      1|      2| 49|  1.0|[1.20717836547035...|       0.0|[0.76979931259338...|
|      2|        2|       2|     2|      2|      2| 34|  0.0|[1.71970358729868..