# Score Test Data Through Trained Models

In [2]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, FloatType

## UDF that returns element 1 of its input as a float (declared as a FloatType column).
## Presumably applied to a Spark ML "probability" vector, where index 1 would be the
## positive-class probability -- TODO confirm against the trained models.
getprob = udf(f=lambda vec: float(vec[1]), returnType=FloatType())

import pickle
import xgboost as xgb
import pandas as pd

In [3]:
# Score the "test" rows of the COVID-19 status table: run them through the saved
# feature pipeline, then through the best model kept by the saved cross-validator.
cv19test = spark.table("mldata_covid19_status").where(col("dataset") == "test")

pipelineModel = PipelineModel.load("/mnt/data/ml/covid19status/pipeline")
cvModel = CrossValidatorModel.load("/mnt/data/ml/covid19status/rf")
cv19test = cvModel.bestModel.transform(pipelineModel.transform(cv19test))

# Keep only the patient key, element 1 of the probability column (as a float),
# and the predicted label. Later cells join against this frame by "Patient".
cv19_columns = [
    col("Patient"),
    getprob(col("probability")).alias("probability"),
    col("prediction"),
]
cv19test = cv19test.select(*cv19_columns)

# Export is currently disabled; kept for reference:
# (cv19test.drop("prediction").coalesce(1)
#          .write.format("com.databricks.spark.csv")
#          .option("header", "false")
#          .save("/mnt/data/scored/covid19status/"))

display(cv19test)

Patient,probability,prediction
046495b9-e1cd-47df-bc24-995e9807248f,0.83663505,1.0
076ba84e-6fd5-4024-92cc-28c3f10e42cc,0.103863336,0.0
08170598-af43-40c5-b1f6-f9e2a61261b0,0.1066435,0.0
08b13997-1dd1-4447-9da7-8e831ec12116,0.8529699,1.0
09f29ef9-559d-4d7e-abb2-b112fe29a441,0.7169256,1.0
0a18e6ae-4b9d-4ef9-9906-cdb803c7ca44,0.040954635,0.0
0deb965a-447e-4d2e-a751-ef1bda118122,0.004938915,0.0
1010bd6b-5789-4165-9a1e-e9be0db8373e,0.76617,1.0
123bf5e8-493e-487e-9e99-03f5858ddeca,0.00030590285,0.0
125885b6-2f5e-4ee3-ac59-d6b88afb1e59,0.00047758134,0.0


In [4]:
# Score the "test" rows of the alive-or-deceased table with its saved pipeline
# and the best model from its saved cross-validator.
adtest = spark.table("mldata_alive_or_deceased").where(col("dataset") == "test")

pipelineModel = PipelineModel.load("/mnt/data/ml/aliveordeceased/pipeline")
cvModel = CrossValidatorModel.load("/mnt/data/ml/aliveordeceased/rf")
adtest = cvModel.bestModel.transform(pipelineModel.transform(adtest))

# Ungated scores (Patient + element 1 of the probability column); used by the
# probability-adjusted combination further down.
adtest_all = adtest.select(col("Patient"), getprob(col("probability")).alias("probability"))

# COVID-19 predicted label per patient, renamed so it cannot clash with this
# model's own columns.
cv19_labels = cv19test.drop("probability").withColumnRenamed("prediction", "cv_prediction")

# Gate on the COVID-19 prediction: where cv_prediction is 0 the probability is
# forced to 1; every other row (including rows with no match in the left join,
# whose cv_prediction is null) keeps the model's own probability.
gated_probability = when(col("cv_prediction") == 0, lit(1)).otherwise(col("probability"))
adtest = (
    adtest_all
    .join(cv19_labels, on=["Patient"], how="left")
    .withColumn("probability", gated_probability)
    .drop("cv_prediction")
)

# Export is currently disabled; kept for reference:
# (adtest.coalesce(1)
#        .write.format("com.databricks.spark.csv")
#        .option("header", "false")
#        .save("/mnt/data/scored/aliveordeceased/"))

display(adtest)

Patient,probability
046495b9-e1cd-47df-bc24-995e9807248f,0.97505933
076ba84e-6fd5-4024-92cc-28c3f10e42cc,1.0
08170598-af43-40c5-b1f6-f9e2a61261b0,1.0
08b13997-1dd1-4447-9da7-8e831ec12116,0.9904291
09f29ef9-559d-4d7e-abb2-b112fe29a441,0.93722504
0a18e6ae-4b9d-4ef9-9906-cdb803c7ca44,1.0
0deb965a-447e-4d2e-a751-ef1bda118122,1.0
1010bd6b-5789-4165-9a1e-e9be0db8373e,0.8611076
123bf5e8-493e-487e-9e99-03f5858ddeca,1.0
125885b6-2f5e-4ee3-ac59-d6b88afb1e59,1.0


In [5]:
# Score the "test" rows of the ventilation-status table with its saved pipeline
# and the best model from its saved cross-validator.
vstest = spark.table("mldata_ventilation_status").where(col("dataset") == "test")

pipelineModel = PipelineModel.load("/mnt/data/ml/ventilationstatus/pipeline")
cvModel = CrossValidatorModel.load("/mnt/data/ml/ventilationstatus/dt")
vstest = cvModel.bestModel.transform(pipelineModel.transform(vstest))

# Ungated scores (Patient + element 1 of the probability column); used by the
# probability-adjusted combination further down.
vstest_all = vstest.select(col("Patient"), getprob(col("probability")).alias("probability"))

# COVID-19 predicted label per patient, renamed so it cannot clash with this
# model's own columns.
cv19_labels = cv19test.drop("probability").withColumnRenamed("prediction", "cv_prediction")

# Gate on the COVID-19 prediction: where cv_prediction is 0 the probability is
# forced to 0; every other row (including unmatched rows from the left join,
# whose cv_prediction is null) keeps the model's own probability.
gated_probability = when(col("cv_prediction") == 0, lit(0)).otherwise(col("probability"))
vstest = (
    vstest_all
    .join(cv19_labels, on=["Patient"], how="left")
    .withColumn("probability", gated_probability)
    .drop("cv_prediction")
)

# Export is currently disabled; kept for reference:
# (vstest.coalesce(1)
#        .write.format("com.databricks.spark.csv")
#        .option("header", "false")
#        .save("/mnt/data/scored/ventilationstatus/"))

display(vstest)

Patient,probability
046495b9-e1cd-47df-bc24-995e9807248f,0.014859659
076ba84e-6fd5-4024-92cc-28c3f10e42cc,0.0
08170598-af43-40c5-b1f6-f9e2a61261b0,0.0
08b13997-1dd1-4447-9da7-8e831ec12116,0.014859659
09f29ef9-559d-4d7e-abb2-b112fe29a441,0.024038462
0a18e6ae-4b9d-4ef9-9906-cdb803c7ca44,0.0
0deb965a-447e-4d2e-a751-ef1bda118122,0.0
1010bd6b-5789-4165-9a1e-e9be0db8373e,0.120836206
123bf5e8-493e-487e-9e99-03f5858ddeca,0.0
125885b6-2f5e-4ee3-ac59-d6b88afb1e59,0.0


In [6]:
# Score the "test" rows of the days-hospitalized table with its saved pipeline
# and the best model from its saved cross-validator.
dhtest = spark.table("mldata_days_hospitalized").where(col("dataset") == "test")

pipelineModel = PipelineModel.load("/mnt/data/ml/dayshospitalized/pipeline")
cvModel = CrossValidatorModel.load("/mnt/data/ml/dayshospitalized/rfr")
dhtest = cvModel.bestModel.transform(pipelineModel.transform(dhtest))

# Ungated predictions (Patient + prediction); used by the probability-adjusted
# combination further down.
dhtest_all = dhtest.select(col("Patient"), col("prediction"))

# COVID-19 predicted label per patient, renamed so it does not collide with
# this model's own "prediction" column.
cv19_labels = cv19test.drop("probability").withColumnRenamed("prediction", "cv_prediction")

# Gate on the COVID-19 prediction: where cv_prediction is 0 the prediction is
# forced to 0; every other row (including unmatched rows from the left join,
# whose cv_prediction is null) keeps the model's own prediction.
gated_prediction = when(col("cv_prediction") == 0, lit(0)).otherwise(col("prediction"))
dhtest = (
    dhtest_all
    .join(cv19_labels, on=["Patient"], how="left")
    .withColumn("prediction", gated_prediction)
    .drop("cv_prediction")
)

# Export is currently disabled; kept for reference:
# (dhtest.coalesce(1)
#        .write.format("com.databricks.spark.csv")
#        .option("header", "false")
#        .save("/mnt/data/scored/dayshospitalized/"))

display(dhtest)

Patient,prediction
00f594e1-6b73-40cb-a028-cffe86b12e94,14.771829969011783
01812846-a928-49f5-ad25-494db0eb5205,10.124187030736566
046c66fb-1f52-4f4d-9b6d-2cfa7a2a5605,0.0
04a71615-ff6f-4a9f-a824-126b9173f2f6,14.825453815663332
050e5edf-e422-473c-8f61-2f9ea2049d8d,0.0
05c2dcc7-ef23-4f90-b676-55f227b6b08b,14.69946812127413
06405f69-b67f-451f-aa96-4c686baef513,14.125929476422591
08461ddc-12df-458e-9fe7-2a97e8d325ae,14.710779569306764
090f97f6-8f7a-4fdc-b4e3-48a4fb0c298a,0.0
09a804cc-7874-4626-8d6b-ea3b15bebacf,14.771829969011783


In [7]:
# Score the "test" rows of the days-in-ICU table with its saved pipeline and
# the best model from its saved cross-validator.
ditest = spark.table("mldata_days_icu").where(col("dataset") == "test")

pipelineModel = PipelineModel.load("/mnt/data/ml/daysicu/pipeline")
cvModel = CrossValidatorModel.load("/mnt/data/ml/daysicu/rfr")
ditest = cvModel.bestModel.transform(pipelineModel.transform(ditest))

# Ungated predictions (Patient + prediction); used by the probability-adjusted
# combination further down.
ditest_all = ditest.select(col("Patient"), col("prediction"))

# COVID-19 predicted label per patient, renamed so it does not collide with
# this model's own "prediction" column.
cv19_labels = cv19test.drop("probability").withColumnRenamed("prediction", "cv_prediction")

# Gate on the COVID-19 prediction: where cv_prediction is 0 the prediction is
# forced to 0; every other row (including unmatched rows from the left join,
# whose cv_prediction is null) keeps the model's own prediction.
gated_prediction = when(col("cv_prediction") == 0, lit(0)).otherwise(col("prediction"))
ditest = (
    ditest_all
    .join(cv19_labels, on=["Patient"], how="left")
    .withColumn("prediction", gated_prediction)
    .drop("cv_prediction")
)

# Export is currently disabled; kept for reference:
# (ditest.coalesce(1)
#        .write.format("com.databricks.spark.csv")
#        .option("header", "false")
#        .save("/mnt/data/scored/daysicu/"))

# NOTE(review): this shows the ungated frame (ditest_all), whereas the other
# scoring cells display their gated frame. Confirm whether `ditest` was meant.
display(ditest_all)

Patient,prediction
00f594e1-6b73-40cb-a028-cffe86b12e94,4.469582216326628
01812846-a928-49f5-ad25-494db0eb5205,6.885782121687572
046c66fb-1f52-4f4d-9b6d-2cfa7a2a5605,5.893094285507658
04a71615-ff6f-4a9f-a824-126b9173f2f6,4.515527665370084
050e5edf-e422-473c-8f61-2f9ea2049d8d,6.117023744762064
05c2dcc7-ef23-4f90-b676-55f227b6b08b,4.566733488923362
06405f69-b67f-451f-aa96-4c686baef513,4.596733466951619
08461ddc-12df-458e-9fe7-2a97e8d325ae,4.516158183886205
090f97f6-8f7a-4fdc-b4e3-48a4fb0c298a,5.624553704492018
09a804cc-7874-4626-8d6b-ea3b15bebacf,4.494400971621473


In [8]:
# Show the full parameter map of the best model held by `cvModel`. In the cells
# visible here, `cvModel` was last assigned from "/mnt/data/ml/daysicu/rfr", so the
# result depends on that cell being the most recently executed loader.
cvModel.bestModel.extractParamMap()

## Probability Adjusted Days Hospitalized, in ICU, and Ventilation Status

Adjusts the two continuous measures (predicted days hospitalized and predicted days in ICU) and the ventilation probability by multiplying each estimate by the likelihood that the patient is COVID-19 positive.

In [10]:
# Build one row per patient holding the COVID-19 probability plus the ungated
# output of each of the other four models. Each join uses the default join type
# on "Patient"; the incoming column is renamed straight after its join so the
# next frame's identically named column cannot collide with it.
combo = cv19test.drop('prediction').withColumnRenamed('probability', 'cv19_prob')
for scored, incoming_name, combined_name in [
    (adtest_all, 'probability', 'ad_prob'),
    (vstest_all, 'probability', 'vs_prob'),
    (dhtest_all, 'prediction', 'dh_pred'),
    (ditest_all, 'prediction', 'di_pred'),
]:
    combo = combo.join(scored, on='Patient').withColumnRenamed(incoming_name, combined_name)

# Scale every model output by cv19_prob into a "<name>_adj" column, then drop
# the unscaled inputs so only Patient, cv19_prob and the *_adj columns remain.
unadjusted = ['ad_prob', 'vs_prob', 'dh_pred', 'di_pred']
for raw_name in unadjusted:
    combo = combo.withColumn(raw_name + '_adj', col(raw_name) * col('cv19_prob'))
combo = combo.drop(*unadjusted)

In [11]:
# Preview the combined frame: Patient, cv19_prob and the four *_adj columns.
display(combo)

Patient,cv19_prob,ad_prob_adj,vs_prob_adj,dh_pred_adj,di_pred_adj
046495b9-e1cd-47df-bc24-995e9807248f,0.83663505,0.81576884,0.012432111,12.37364524284933,3.8800188506281184
076ba84e-6fd5-4024-92cc-28c3f10e42cc,0.103863336,0.096108176,0.0087654255,1.4541975759357824,0.5776166841940555
08170598-af43-40c5-b1f6-f9e2a61261b0,0.1066435,0.090302296,0.012886396,1.211579707195269,0.5883114059378197
08b13997-1dd1-4447-9da7-8e831ec12116,0.8529699,0.8448062,0.012674841,12.739556971611476,3.8564176123747194
09f29ef9-559d-4d7e-abb2-b112fe29a441,0.7169256,0.67192066,0.017233789,9.372918613336314,3.971052415258527
0a18e6ae-4b9d-4ef9-9906-cdb803c7ca44,0.040954635,0.03701525,0.0034563188,0.5723789610271637,0.2395533803483747
0deb965a-447e-4d2e-a751-ef1bda118122,0.004938915,0.003198305,0.0043901466,0.0573196976930952,0.0305155269597022
1010bd6b-5789-4165-9a1e-e9be0db8373e,0.76617,0.6597548,0.09258108,7.86833577114885,5.297898392420122
123bf5e8-493e-487e-9e99-03f5858ddeca,0.00030590285,0.00021149257,3.696414e-05,0.0037188914372292,0.0016671380024936
125885b6-2f5e-4ee3-ac59-d6b88afb1e59,0.00047758134,0.00035254323,5.7709116e-05,0.0062303687869993,0.0028842461337714


In [12]:
# Two-column (Patient, adjusted value) frames, one per exported measure.
vstest_adj = combo.select('Patient', 'vs_prob_adj')
dhtest_adj = combo.select('Patient', 'dh_pred_adj')
ditest_adj = combo.select('Patient', 'di_pred_adj')

# Write each frame as header-less CSV. coalesce(1) collapses the frame to a
# single partition before writing. No save mode is set, so Spark's default
# applies and the write fails if the target path already exists.
for adjusted, target_dir in [
    (vstest_adj, "/mnt/data/scored/ventilationstatus/adj/"),
    (dhtest_adj, "/mnt/data/scored/dayshospitalized/adj/"),
    (ditest_adj, "/mnt/data/scored/daysicu/adj/"),
]:
    (adjusted.coalesce(1)
             .write.format("com.databricks.spark.csv")
             .option("header", "false")
             .save(target_dir))