In [1]:
# import libraries
import boto3, re, sys, math, json, os, sagemaker, urllib.request
from sagemaker import get_execution_role
import numpy as np                                
import pandas as pd                               
import matplotlib.pyplot as plt                   
from IPython.display import Image                 
from IPython.display import display               
from time import gmtime, strftime                 
from sagemaker.predictor import csv_serializer


# Define IAM role
role = get_execution_role()
prefix = 'sagemaker/DEMO-xgboost-dm'
containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest',
              'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
              'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest',
              'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest'} # each region has its XGBoost container
my_region = boto3.session.Session().region_name # set the region of the instance
print("Success - the MySageMakerInstance is in the " + my_region + " region. You will use the " + containers[my_region] + " container for your SageMaker endpoint.")

Success - the MySageMakerInstance is in the us-west-2 region. You will use the 433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest container for your SageMaker endpoint.


In [2]:
bucket_name = 'projectdatasetcovid' # <--- CHANGE THIS VARIABLE TO A UNIQUE NAME FOR YOUR BUCKET
s3 = boto3.resource('s3')
try:
    if  my_region == 'us-east-1':
      s3.create_bucket(Bucket=bucket_name)
    else: 
      s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': my_region })
    print('S3 bucket created successfully')
except Exception as e:
    print('S3 error: ',e)

S3 error:  An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.


In [3]:
try:
  urllib.request.urlretrieve ("https://projectdatasetcovid.s3-us-west-2.amazonaws.com/201029COVID19MEXICO.csv","201029COVID19MEXICO.csv")
  print('Success: 201029COVID19MEXICO.csv')
except Exception as e:
  print('Data load error: ',e)

Success: 201029COVID19MEXICO.csv


In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [5]:
#from pyspark.sql import SQLContext
url = "201029COVID19MEXICO.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

In [6]:
df = sqlContext.read.csv(SparkFiles.get("201029COVID19MEXICO.csv"), header=True, inferSchema= True)

In [7]:
df.printSchema()

root
 |-- FECHA_ACTUALIZACION: string (nullable = true)
 |-- ID_REGISTRO: string (nullable = true)
 |-- ORIGEN: integer (nullable = true)
 |-- SECTOR: integer (nullable = true)
 |-- ENTIDAD_UM: integer (nullable = true)
 |-- SEXO: integer (nullable = true)
 |-- ENTIDAD_NAC: integer (nullable = true)
 |-- ENTIDAD_RES: integer (nullable = true)
 |-- MUNICIPIO_RES: integer (nullable = true)
 |-- TIPO_PACIENTE: integer (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- FECHA_DEF: string (nullable = true)
 |-- INTUBADO: integer (nullable = true)
 |-- NEUMONIA: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- NACIONALIDAD: integer (nullable = true)
 |-- Pregnant: integer (nullable = true)
 |-- HABLA_LENGUA_INDIG: integer (nullable = true)
 |-- Native: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- EPOC: integer (nullable = true)
 |-- ASMA: integer (nullable = true)
 |-- INMUSUPR: integer

In [8]:
import pyspark.sql.functions as F
#df1 = df.withColumnRenamed("TIPO_PACIENTE","Patient_Type")\.withColumnRenamed("SEXO","Sex")

df1 = df.selectExpr("TIPO_PACIENTE as PATIENT_TYPE",
                   "SEXO as SEX",
                   "Age as AGE",
                   "INTUBADO as INTUBED",
                    "NEUMONIA as NEUMONIA",
                    "Pregnant as PREGNANT",
                   "DIABETES as DIABETES",
                  "EPOC as EPOC",
                   "ASMA as ASTHMA",
                   "INMUSUPR as INMUSUPR",
                   "HIPERTENSION as HYPERTENSION",
                   "CARDIOVASCULAR as CARDIOVASCULAR",
                   "OBESIDAD as OBESITY",
                   "RENAL_CRONICA as RENAL_CRONIC",
                   "OTRA_COM as OTHER_DISEASE",
                  "TABAQUISMO as TOBACCO",
                   "OTRO_CASO as CONTACT_OTHER_COVID",
                   "CLASIFICACION_FINAL as CLASSIFICACION_FINAL",
                   "UCI as ICU",
                   "RESULTADO_LAB as LAB_RESULT",
                    "FECHA_DEF as DEATH"
                  )
df1.printSchema()

root
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- INTUBED: integer (nullable = true)
 |-- NEUMONIA: integer (nullable = true)
 |-- PREGNANT: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- EPOC: integer (nullable = true)
 |-- ASTHMA: integer (nullable = true)
 |-- INMUSUPR: integer (nullable = true)
 |-- HYPERTENSION: integer (nullable = true)
 |-- CARDIOVASCULAR: integer (nullable = true)
 |-- OBESITY: integer (nullable = true)
 |-- RENAL_CRONIC: integer (nullable = true)
 |-- OTHER_DISEASE: integer (nullable = true)
 |-- TOBACCO: integer (nullable = true)
 |-- CONTACT_OTHER_COVID: integer (nullable = true)
 |-- CLASSIFICACION_FINAL: integer (nullable = true)
 |-- ICU: integer (nullable = true)
 |-- LAB_RESULT: integer (nullable = true)
 |-- DEATH: string (nullable = true)



In [9]:
from pyspark.sql.functions import col, when

#changed DEATH TO 1 or 0

#If yes ->1, No -> 0 and Missing/NA -> 2
df2 = df1.withColumn("PREGNANT",when(col("PREGNANT") == "2", 0).when(col("PREGNANT") == "1", 1).when(col("SEX") == "2", 0).otherwise(np.nan))
df2 = df2.withColumn("NEUMONIA",when(col("NEUMONIA") == "2", 0).when(col("NEUMONIA") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("INTUBED",when(col("INTUBED") == "2", 0).when(col("INTUBED") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("DIABETES",when(col("DIABETES") == "2", 0).when(col("DIABETES") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("EPOC",when(col("EPOC") == "2", 0).when(col("EPOC") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("ASTHMA",when(col("ASTHMA") == "2", 0).when(col("ASTHMA") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("INMUSUPR",when(col("INMUSUPR") == "2", 0).when(col("INMUSUPR") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("HYPERTENSION",when(col("HYPERTENSION") == "2", 0).when(col("HYPERTENSION") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("OTHER_DISEASE",when(col("OTHER_DISEASE") == "2", 0).when(col("OTHER_DISEASE") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("CARDIOVASCULAR",when(col("CARDIOVASCULAR") == "2", 0).when(col("CARDIOVASCULAR") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("OBESITY",when(col("OBESITY") == "2", 0).when(col("OBESITY") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("RENAL_CRONIC",when(col("RENAL_CRONIC") == "2", 0).when(col("RENAL_CRONIC") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("TOBACCO",when(col("TOBACCO") == "2", 0).when(col("TOBACCO") == "1", 1).otherwise(np.nan))
#leave out contact_other_covid
df2 = df2.withColumn("CONTACT_OTHER_COVID",when(col("CONTACT_OTHER_COVID") == "2", 0).when(col("CONTACT_OTHER_COVID") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("ICU",when(col("ICU") == "2", 0).when(col("ICU") == "1", 1).otherwise(np.nan))
df2 = df2.withColumn("DEATH",when(col("DEATH") == "9999-99-99", 0).otherwise("1"))
df2 = df2.withColumn("LAB_RESULT",when(col("LAB_RESULT") == "2", 0).when(col("LAB_RESULT") == "1", 1).otherwise(np.nan))

df2.show(10);

+------------+---+---+-------+--------+--------+--------+----+------+--------+------------+--------------+-------+------------+-------------+-------+-------------------+--------------------+---+----------+-----+
|PATIENT_TYPE|SEX|AGE|INTUBED|NEUMONIA|PREGNANT|DIABETES|EPOC|ASTHMA|INMUSUPR|HYPERTENSION|CARDIOVASCULAR|OBESITY|RENAL_CRONIC|OTHER_DISEASE|TOBACCO|CONTACT_OTHER_COVID|CLASSIFICACION_FINAL|ICU|LAB_RESULT|DEATH|
+------------+---+---+-------+--------+--------+--------+----+------+--------+------------+--------------+-------+------------+-------------+-------+-------------------+--------------------+---+----------+-----+
|           2|  2| 94|    0.0|     0.0|     0.0|     0.0| 0.0|   0.0|     0.0|         0.0|           1.0|    0.0|         0.0|          0.0|    0.0|                NaN|                   3|0.0|       1.0|    1|
|           1|  2| 66|    NaN|     0.0|     0.0|     1.0| 0.0|   0.0|     0.0|         1.0|           0.0|    0.0|         0.0|          0.0|    0.0|   

In [10]:
df2 =df2.filter(df2["LAB_RESULT"]==1)

In [11]:
df2.groupBy("LAB_RESULT").count().sort("count",ascending=True).show()

+----------+------+
|LAB_RESULT| count|
+----------+------+
|       1.0|546940|
+----------+------+



In [12]:
df2 = df2.withColumn("PATIENT_TYPE",when(col("PATIENT_TYPE") == "2", 1).when(col("PATIENT_TYPE") == "1", 0))
#converts the nan to unknown
df_unknown_imputed = df2.fillna(0)

df_unknown_imputed.cache()

#number of entries on this dataset
print(df_unknown_imputed.count())

546940


In [13]:
from pyspark.sql.types import *

df_unknown_imputed.printSchema()
df_unknown_imputed = df_unknown_imputed.withColumn("DEATH", df_unknown_imputed["DEATH"].cast(DoubleType()))

root
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- INTUBED: double (nullable = false)
 |-- NEUMONIA: double (nullable = false)
 |-- PREGNANT: double (nullable = false)
 |-- DIABETES: double (nullable = false)
 |-- EPOC: double (nullable = false)
 |-- ASTHMA: double (nullable = false)
 |-- INMUSUPR: double (nullable = false)
 |-- HYPERTENSION: double (nullable = false)
 |-- CARDIOVASCULAR: double (nullable = false)
 |-- OBESITY: double (nullable = false)
 |-- RENAL_CRONIC: double (nullable = false)
 |-- OTHER_DISEASE: double (nullable = false)
 |-- TOBACCO: double (nullable = false)
 |-- CONTACT_OTHER_COVID: double (nullable = false)
 |-- CLASSIFICACION_FINAL: integer (nullable = true)
 |-- ICU: double (nullable = false)
 |-- LAB_RESULT: double (nullable = false)
 |-- DEATH: string (nullable = false)



In [14]:
distinctDF = df_unknown_imputed

In [15]:
#Considering Patient_type is determined we considered if ICU care is needed or not
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['PATIENT_TYPE','SEX','NEUMONIA', 'PREGNANT','DIABETES', 'EPOC', 'ASTHMA', 'INMUSUPR','HYPERTENSION','OTHER_DISEASE',
           'CARDIOVASCULAR','OBESITY','RENAL_CRONIC','TOBACCO']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'ICU', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['AGE']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [16]:
from pyspark.ml import Pipeline


cols = distinctDF.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(distinctDF)
distinctDF = pipelineModel.transform(distinctDF)
selectedCols = ['label', 'features'] + cols
distinctDF = distinctDF.select(selectedCols)
distinctDF.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- INTUBED: double (nullable = false)
 |-- NEUMONIA: double (nullable = false)
 |-- PREGNANT: double (nullable = false)
 |-- DIABETES: double (nullable = false)
 |-- EPOC: double (nullable = false)
 |-- ASTHMA: double (nullable = false)
 |-- INMUSUPR: double (nullable = false)
 |-- HYPERTENSION: double (nullable = false)
 |-- CARDIOVASCULAR: double (nullable = false)
 |-- OBESITY: double (nullable = false)
 |-- RENAL_CRONIC: double (nullable = false)
 |-- OTHER_DISEASE: double (nullable = false)
 |-- TOBACCO: double (nullable = false)
 |-- CONTACT_OTHER_COVID: double (nullable = false)
 |-- CLASSIFICACION_FINAL: integer (nullable = true)
 |-- ICU: double (nullable = false)
 |-- LAB_RESULT: double (nullable = false)
 |-- DEATH: double (nullable = true)



In [17]:
train, test = distinctDF.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 382200
Test Dataset Count: 164740


In [18]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [19]:
#Pateint_type for ICU is also observed and it shoud ideally be 1 meaning hospitlaised if prediction is 1
predictions = lrModel.transform(test)
predictions.select('AGE', 'SEX','PATIENT_TYPE', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+---+------------+-----+--------------------+----------+--------------------+
|AGE|SEX|PATIENT_TYPE|label|       rawPrediction|prediction|         probability|
+---+---+------------+-----+--------------------+----------+--------------------+
| 71|  2|           0|  0.0|[7.0651362537242,...|       0.0|[0.99914635083931...|
| 82|  2|           0|  0.0|[7.72683301273775...|       0.0|[0.99955935621541...|
| 28|  2|           0|  0.0|[8.28170739259796...|       0.0|[0.99974695936392...|
| 43|  2|           0|  0.0|[8.18369905639312...|       0.0|[0.99972091054067...|
| 70|  2|           0|  0.0|[5.25248296482816...|       0.0|[0.99479275220747...|
| 80|  2|           0|  0.0|[5.45554693542275...|       0.0|[0.99574563759130...|
| 54|  1|           0|  0.0|[8.06045945419492...|       0.0|[0.99968431800184...|
| 44|  1|           0|  0.0|[7.06016344728858...|       0.0|[0.99914209888578...|
| 47|  1|           0|  0.0|[7.97262852536261...|       0.0|[0.99965534727341...|
| 56|  1|       

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.9129017943795279


In [21]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('AGE', 'SEX', 'PATIENT_TYPE','label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+---+------------+-----+--------------+----------+-----------+
|AGE|SEX|PATIENT_TYPE|label| rawPrediction|prediction|probability|
+---+---+------------+-----+--------------+----------+-----------+
| 71|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 82|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 28|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 43|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 70|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 80|  2|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 54|  1|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 44|  1|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 47|  1|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
| 56|  1|           0|  0.0|[285535.0,0.0]|       0.0|  [1.0,0.0]|
+---+---+------------+-----+--------------+----------+-----------+
only showing top 10 rows



In [22]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.9063333097751165


In [23]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('AGE', 'SEX','PATIENT_TYPE','label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+---+------------+-----+--------------------+----------+--------------------+
|AGE|SEX|PATIENT_TYPE|label|       rawPrediction|prediction|         probability|
+---+---+------------+-----+--------------------+----------+--------------------+
| 71|  2|           0|  0.0|[19.8948054770550...|       0.0|[0.99474027385275...|
| 82|  2|           0|  0.0|[19.8482550826058...|       0.0|[0.99241275413029...|
| 28|  2|           0|  0.0|[19.9626168224299...|       0.0|[0.99813084112149...|
| 43|  2|           0|  0.0|[19.9164900618665...|       0.0|[0.99582450309332...|
| 70|  2|           0|  0.0|[19.5019972708563...|       0.0|[0.97509986354281...|
| 80|  2|           0|  0.0|[19.6383878740054...|       0.0|[0.98191939370027...|
| 54|  1|           0|  0.0|[19.4359583686047...|       0.0|[0.97179791843023...|
| 44|  1|           0|  0.0|[19.9149517510683...|       0.0|[0.99574758755341...|
| 47|  1|           0|  0.0|[19.8614000174145...|       0.0|[0.99307000087072...|
| 56|  1|       

In [24]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.9159009823121372


In [25]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
predictions.select('AGE', 'SEX', 'PATIENT_TYPE','label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+---+------------+-----+--------------------+----------+--------------------+
|AGE|SEX|PATIENT_TYPE|label|       rawPrediction|prediction|         probability|
+---+---+------------+-----+--------------------+----------+--------------------+
| 71|  2|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 82|  2|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 28|  2|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 43|  2|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 70|  2|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 80|  2|           0|  0.0|[1.32590267922053...|       0.0|[0.93412217565280...|
| 54|  1|           0|  0.0|[1.32590267922055...|       0.0|[0.93412217565281...|
| 44|  1|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 47|  1|           0|  0.0|[1.32590267922052...|       0.0|[0.93412217565280...|
| 56|  1|       

In [26]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.9174592579701456
