In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BDAS_ZZHO612').getOrCreate()

In [2]:
# Importing data which has a header. Schema is automatically configured.
cov19_patient_df = spark.read.csv('COVID19_open_line_list.csv', header=True, inferSchema=True)

# Let's see the data. You'll notice nulls.
cov19_patient_df.count()
print(cov19_patient_df.columns)

['ID', 'age', 'sex', 'city', 'province', 'country', 'wuhan(0)_not_wuhan(1)', 'latitude', 'longitude', 'geo_resolution', 'date_onset_symptoms', 'date_admission_hospital', 'date_confirmation', 'symptoms', 'lives_in_Wuhan', 'travel_history_dates', 'travel_history_location', 'reported_market_exposure', 'additional_information', 'chronic_disease_binary', 'chronic_disease', 'source', 'sequence_available', 'outcome', 'date_death_or_discharge', 'notes_for_discussion', 'location', 'admin3', 'admin2', 'admin1', 'country_new', 'admin_id', 'data_moderator_initials', '_c33', '_c34', '_c35', '_c36', '_c37', '_c38', '_c39', '_c40', '_c41', '_c42', '_c43', '_c44']


In [3]:
print(cov19_patient_df.dtypes)

[('ID', 'string'), ('age', 'string'), ('sex', 'string'), ('city', 'string'), ('province', 'string'), ('country', 'string'), ('wuhan(0)_not_wuhan(1)', 'int'), ('latitude', 'string'), ('longitude', 'string'), ('geo_resolution', 'string'), ('date_onset_symptoms', 'string'), ('date_admission_hospital', 'string'), ('date_confirmation', 'string'), ('symptoms', 'string'), ('lives_in_Wuhan', 'string'), ('travel_history_dates', 'string'), ('travel_history_location', 'string'), ('reported_market_exposure', 'string'), ('additional_information', 'string'), ('chronic_disease_binary', 'string'), ('chronic_disease', 'string'), ('source', 'string'), ('sequence_available', 'string'), ('outcome', 'string'), ('date_death_or_discharge', 'string'), ('notes_for_discussion', 'string'), ('location', 'string'), ('admin3', 'string'), ('admin2', 'string'), ('admin1', 'string'), ('country_new', 'string'), ('admin_id', 'string'), ('data_moderator_initials', 'string'), ('_c33', 'string'), ('_c34', 'string'), ('_c35

In [4]:
print(cov19_patient_df[["ID"]].describe().show())

+-------+--------------------+
|summary|                  ID|
+-------+--------------------+
|  count|               13198|
|   mean|   6769.250436498899|
| stddev|   3920.886020268424|
|    min|                   1|
|    max|https://www.thela...|
+-------+--------------------+

None


In [5]:
print(cov19_patient_df[["age"]].describe().show())

+-------+--------------------+
|summary|                 age|
+-------+--------------------+
|  count|                1591|
|   mean|  43.840644725822536|
| stddev|  17.016446953410178|
|    min| Ankang City	Shaa...|
|    max|                  NA|
+-------+--------------------+

None


In [6]:
print(cov19_patient_df[["sex"]].describe().show())

+-------+------+
|summary|   sex|
+-------+------+
|  count|  1470|
|   mean|4000.0|
| stddev|   NaN|
|    min|  4000|
|    max|stable|
+-------+------+

None


In [7]:
import pandas
from pyspark.ml.classification import (RandomForestClassifier, GBTClassifier, DecisionTreeClassifier)

pandas.set_option('display.max_rows', None)
cov19_patient_df = pandas.read_csv("COVID19_open_line_list.csv", index_col="ID")
# print("The dimension of the nCoV 19 patient dataset is: ")
# print(str(cov19_patient_df.shape[0]) + " rows and " + str(cov19_patient_df.shape[1]) + " columns.")
print("Columns in this data sets are: ")
print(cov19_patient_df.columns)
# print("Columns data type: ")
# print(cov19_patient_df.dtypes)

# for c in cov19_patient_df.columns:
#     print(cov19_patient_df[c].describe())


# print(cov19_patient_df['country'].value_counts())
# print(cov19_patient_df['age'].value_counts())
# print(cov19_patient_df['date_confirmation'].value_counts())
# print(cov19_patient_df['sex'].value_counts())

# print(cov19_patient_df.isnull().sum())

cov19_current_patient_df = cov19_patient_df[ \
    cov19_patient_df['date_death_or_discharge'].notnull() & \
    (cov19_patient_df['date_death_or_discharge'] != "discharge")]
print("The dimension of the nCoV 19 current patient dataset is: ")
print(str(cov19_current_patient_df.shape[0]) + " rows and " + \
      str(cov19_current_patient_df.shape[1]) + " columns.")

# print(cov19_current_patient_df['date_death_or_discharge'].value_counts())
patient_valid_columns = ['age', 'sex', 'city', 'province', 'country', 'wuhan(0)_not_wuhan(1)',
                        'date_onset_symptoms',
                         'date_admission_hospital', 'date_confirmation', 'date_death_or_discharge']
cov19_current_sub_df = cov19_patient_df[patient_valid_columns]
print(cov19_current_sub_df.columns)

patients_percent_missing = cov19_current_sub_df.isnull().sum() * 100 / len(cov19_current_sub_df)
patients_missing_value_df = pandas.DataFrame({'column_name': cov19_current_sub_df.columns,'percent_missing': patients_percent_missing})
print(patients_missing_value_df)

cov19_current_sub_df = cov19_current_sub_df[ \
    cov19_current_sub_df['sex'].notnull() & \
    (cov19_current_sub_df['sex'] != 4000) & \
    (cov19_current_sub_df['sex'] != "Female") & \
    (cov19_current_sub_df['sex'] != "Male")]

cov19_current_sub_df = cov19_current_sub_df[ \
    cov19_current_sub_df['age'].notnull() & \
    (cov19_current_sub_df['age'] != "Belgium") & \
    (cov19_current_sub_df['age'].astype("str").str.contains("-") == False)]

# print(cov19_current_sub_df['age'].value_counts())

cov19_current_sub_df["alive"] = (cov19_current_sub_df['date_death_or_discharge'].isnull() | (cov19_current_sub_df['date_death_or_discharge'] == "discharge")).astype(int)
cov19_current_sub_df.drop(['date_death_or_discharge'], axis=1, inplace=True)
# print(cov19_current_sub_df['alive'].value_counts())


cov19_current_sub_df_imputed = cov19_current_sub_df.fillna("Unknown")

patients_percent_missing = cov19_current_sub_df_imputed.isnull().sum() * 100 / len(cov19_current_sub_df_imputed)
patients_missing_value_df = pandas.DataFrame({'column_name': cov19_current_sub_df_imputed.columns,
                                 'percent_missing': patients_percent_missing})
print(patients_missing_value_df)

Columns in this data sets are: 
Index(['age', 'sex', 'city', 'province', 'country', 'wuhan(0)_not_wuhan(1)',
       'latitude', 'longitude', 'geo_resolution', 'date_onset_symptoms',
       'date_admission_hospital', 'date_confirmation', 'symptoms',
       'lives_in_Wuhan', 'travel_history_dates', 'travel_history_location',
       'reported_market_exposure', 'additional_information',
       'chronic_disease_binary', 'chronic_disease', 'source',
       'sequence_available', 'outcome', 'date_death_or_discharge',
       'notes_for_discussion', 'location', 'admin3', 'admin2', 'admin1',
       'country_new', 'admin_id', 'data_moderator_initials', 'Unnamed: 33',
       'Unnamed: 34', 'Unnamed: 35', 'Unnamed: 36', 'Unnamed: 37',
       'Unnamed: 38', 'Unnamed: 39', 'Unnamed: 40', 'Unnamed: 41',
       'Unnamed: 42', 'Unnamed: 43', 'Unnamed: 44'],
      dtype='object')
The dimension of the nCoV 19 current patient dataset is: 
91 rows and 44 columns.
Index(['age', 'sex', 'city', 'province', 'cou

In [8]:
s_df = spark.createDataFrame(cov19_current_sub_df_imputed)
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

s_df = s_df.na.drop()
print(s_df.dtypes)

[('age', 'string'), ('sex', 'string'), ('city', 'string'), ('province', 'string'), ('country', 'string'), ('wuhan(0)_not_wuhan(1)', 'double'), ('date_onset_symptoms', 'string'), ('date_admission_hospital', 'string'), ('date_confirmation', 'string'), ('alive', 'bigint')]


In [9]:
print(pyspark.__version__)

2.1.1+hadoop2.7


In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Transformer

inputCols = ["age", "sex", "city", "province", "country", "date_onset_symptoms", "date_admission_hospital"]
#Deal with Categorical Columns
#Transform string type columns to string indexer 
ageIndexer = StringIndexer().setInputCol("age").setOutputCol("ageIndex")
sexIndexer = StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
cityIndexer = StringIndexer().setInputCol("city").setOutputCol("cityIndex")
provinceIndexer = StringIndexer().setInputCol("province").setOutputCol("provinceIndex")
countryIndexer = StringIndexer().setInputCol("country").setOutputCol("countryIndex")
symptomsIndexer = StringIndexer().setInputCol("date_onset_symptoms").setOutputCol("symptomsIndex")
hospitalIndexer = StringIndexer().setInputCol("date_admission_hospital").setOutputCol("hospitalIndex")

#Transform string type columns to string indexer 
ageEncoder = OneHotEncoder().setInputCol("ageIndex").setOutputCol("ageVec")
sexEncoder = OneHotEncoder().setInputCol("sexIndex").setOutputCol("sexVec")
cityEncoder = OneHotEncoder().setInputCol("cityIndex").setOutputCol("cityVec")
provinceEncoder = OneHotEncoder().setInputCol("provinceIndex").setOutputCol("provinceVec")
countryEncoder = OneHotEncoder().setInputCol("countryIndex").setOutputCol("countryVec")
symptomsEncoder = OneHotEncoder().setInputCol("symptomsIndex").setOutputCol("symptomsVec")
hospitalEncoder = OneHotEncoder().setInputCol("hospitalIndex").setOutputCol("hospitalVec")

#Assemble everything together to be ("label","features") format
assembler = VectorAssembler().setInputCols(["ageVec", "sexVec", "cityVec", "provinceVec", "countryVec", "symptomsVec", "hospitalVec"]).setOutputCol("features")


In [11]:
# Create all three models. Note the number of trees. 
# The more trees you have, the more computation time. But this could also significantly increase accuracy. So there's a tradeoff. 
dt = DecisionTreeClassifier(labelCol='alive')
rf = RandomForestClassifier(labelCol="alive", numTrees=20)
pipeline = Pipeline().setStages([ageIndexer, sexIndexer, cityIndexer, provinceIndexer, countryIndexer, symptomsIndexer, hospitalIndexer, ageEncoder, sexEncoder, cityEncoder, provinceEncoder, countryEncoder, symptomsEncoder, hospitalEncoder, assembler])

In [12]:
# Train model. 
model_pip = pipeline.fit(s_df)
t_df = model_pip.transform(s_df)
selectedCols = ['alive', 'features']
df = t_df.select(selectedCols)
df.printSchema()
(trainingData, testData) = df.randomSplit([0.7, 0.3])
#model_dt = dt.fit(trainingData)

root
 |-- alive: long (nullable = true)
 |-- features: vector (nullable = true)



In [13]:

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'alive', maxDepth = 3)
dtModel = dt.fit(trainingData)
dt_predictions = dtModel.transform(testData)

In [14]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'alive', maxDepth = 3)
rfModel = rf.fit(trainingData)
rf_predictions = rfModel.transform(testData)

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol = 'alive')
print("Test Area Under ROC: " + str(evaluator.evaluate(dt_predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8491975650249031


In [16]:
print("Test Area Under ROC: " + str(evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8879358052019922


In [17]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol = 'alive')

# Now we're fitting the model on a subset of data.
lrModel = lr.fit(trainingData)

# And evaluating it against the test data.
lr_predictions = lrModel.evaluate(testData)

lr_predictions.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|alive|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|(660,[0,88,147,45...|[31.9120914900087...|[0.99999999999998...|       0.0|
|    0|(660,[3,88,90,436...|[-54.580110486962...|[1.97769444573408...|       1.0|
|    0|(660,[5,88,100,45...|[-60.332951897200...|[6.27670783946199...|       1.0|
|    0|(660,[6,206,436,5...|[-10.959342057468...|[1.73944485401331...|       1.0|
|    0|(660,[12,209,478,...|[63.5376068986614...|[1.0,2.5466421950...|       0.0|
|    0|(660,[13,88,90,43...|[-26.250126156450...|[3.97846059569519...|       1.0|
|    0|(660,[13,147,453,...|[54.8252868624185...|[1.0,1.5476774215...|       0.0|
|    0|(660,[15,88,90,43...|[-29.484343754163...|[1.56715591075685...|       1.0|
|    0|(660,[15,88,133,4...|[-89.557363273348...|[1.27564936318685...|       1.0|
|    0|(660,[16,

In [18]:
print("Test Area Under ROC: " + str(evaluator.evaluate(lr_predictions.predictions, {evaluator.metricName: "areaUnderROC"})))

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(lrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(lrModel.interceptVector))

trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

Test Area Under ROC: 0.7864554510237962
Multinomial coefficients: DenseMatrix([[ 1.06001374e+01, -9.20466168e+00,  3.40729884e+01,
               3.52030292e+01, -1.93770816e+01,  1.91363264e+01,
               8.14853988e+00, -1.15747892e+01,  1.12339151e+01,
               1.26669853e+01,  1.53020816e+01,  7.94074020e+00,
              -5.12830742e+00,  6.87304490e+00,  1.28585407e+01,
               7.94584249e+00,  6.56079956e+00,  1.83464854e+01,
              -1.51015319e+01,  1.16684345e+00,  1.47832209e+01,
               1.42864358e+01,  5.08731915e+00, -3.63986534e+01,
              -5.70423476e+00,  1.40510377e+01,  6.08065862e+00,
               1.19124540e+01,  9.91029490e+00,  8.29616287e+00,
               1.17879539e+01,  5.88054181e+00, -3.68732567e+01,
               5.13307940e+00,  6.19189198e+00,  3.44124333e+00,
               2.54452107e+01,  1.01861205e+01,  1.49643218e+01,
              -9.24434228e-02, -5.18614634e+01,  1.00508178e+01,
               5.4705447

+---+------+
|FPR|   TPR|
+---+------+
|0.0|   0.0|
|0.0|0.9232|
|0.0| 0.928|
|0.0|0.9312|
|0.0|0.9328|
|0.0|0.9376|
|0.0|0.9392|
|0.0|0.9408|
|0.0|0.9424|
|0.0|0.9456|
|0.0|0.9472|
|0.0|0.9488|
|0.0|0.9504|
|0.0| 0.952|
|0.0|0.9536|
|0.0|0.9552|
|0.0|0.9568|
|0.0|0.9584|
|0.0|  0.96|
|0.0|0.9616|
+---+------+
only showing top 20 rows

areaUnderROC: 0.9999809523809523


LogisticRegression_4841b5872990cbb44df4

In [19]:
dtModel.featureImportances

SparseVector(660, {25: 0.1143, 27: 0.1272, 100: 0.0938, 521: 0.1687, 524: 0.496})

In [20]:
rfModel.featureImportances

SparseVector(660, {1: 0.0021, 7: 0.0015, 25: 0.0011, 27: 0.0029, 43: 0.0105, 79: 0.025, 87: 0.0139, 90: 0.0051, 98: 0.0001, 117: 0.0002, 139: 0.0496, 147: 0.0745, 167: 0.0089, 171: 0.0169, 190: 0.0369, 201: 0.0084, 207: 0.0429, 234: 0.0299, 282: 0.0056, 297: 0.0034, 343: 0.0082, 370: 0.0396, 392: 0.0217, 401: 0.0125, 433: 0.004, 436: 0.0178, 438: 0.0016, 442: 0.0001, 448: 0.0004, 451: 0.0038, 453: 0.065, 456: 0.0074, 462: 0.0027, 464: 0.034, 475: 0.0174, 502: 0.0338, 504: 0.0082, 507: 0.0213, 521: 0.1164, 522: 0.029, 524: 0.0702, 525: 0.0051, 526: 0.0017, 527: 0.0173, 529: 0.0337, 536: 0.0233, 568: 0.0051, 572: 0.0057, 583: 0.0136, 608: 0.0064, 610: 0.0139, 612: 0.0031, 614: 0.0002, 615: 0.008, 620: 0.0, 641: 0.0082})

In [26]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt = GBTClassifier(labelCol='alive',featuresCol='features')
gbt_model = gbt.fit(trainingData)

gbt_predictions = gbt_model.transform(testData)

my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='alive', rawPredictionCol='prediction')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))


GBT
0.6045932484781406


In [23]:
print(gbt_model.schema)

AttributeError: 'GBTClassificationModel' object has no attribute 'schema'