In [0]:
# Install latest version of spark. If error, check the latest and replace "spark-2.4.4"
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
findspark.init()

In [0]:
# Attach Google Drive to the Colab filesystem; the input CSV and the saved
# pipeline model are read from / written to paths under this mount point.
from google.colab import drive

driveMountPoint = '/content/drive'
drive.mount(driveMountPoint)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# Import dependencies
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.feature import *
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch

In [0]:
# Start (or reuse) a Spark session running locally.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Read the CSV from the mounted Drive with a header row and inferred column
# types, keeping at most 50000 rows.
filename = "/content/drive/My Drive/Colab Notebooks/SPARCS-2017_cleaned2.csv"
csvReader = spark.read.options(header=True, inferSchema=True)
wholeData = csvReader.csv(filename).limit(50000)

In [0]:
# Peek at the first three rows to check the columns that were loaded.
wholeData.show(n=3)

+---------------------+---------------+--------------------+---------+-------------------+------+-----+-----------------+--------------+-----------------+--------------------+-------------------------+-------------------------+--------------------+--------------------+-----------------------------------+---------------------+--------------------------------+--------------------+------------------+------------------+------------+------------------------------+-------------+-----------+
|Hospital Service Area|Hospital County|       Facility Name|Age Group|Zip Code - 3 digits|Gender| Race|        Ethnicity|Length of Stay|Type of Admission| Patient Disposition|CCS Diagnosis Description|CCS Procedure Description| APR DRG Description| APR MDC Description|APR Severity of Illness Description|APR Risk of Mortality|APR Medical Surgical Description|  Payment Typology 1|Payment Typology 2|Payment Typology 3|Birth Weight|Emergency Department Indicator|Total Charges|Total Costs|
+-------------------

In [0]:
# Columns used by the notebook, grouped by how they are pre-processed:
# `categorical` columns are indexed and one-hot encoded, `numeric` columns are
# cast to float.
categorical = [
    'Zip Code - 3 digits',
    'Age Group',
    'Gender',
    'Race',
    'Type of Admission',
]
numeric = [
    'Length of Stay',
    'Total Charges',
]

In [0]:
stages = []  # ordered stages of the preprocessing Pipeline

# For every categorical column: index its values with StringIndexer, then
# one-hot encode the index into a binary SparseVector.
# The loop variable is deliberately not named `col`: that name is bound to
# pyspark.sql.functions.col by the star import at the top of the notebook, and
# reusing it here would replace the function with a plain string.
# NOTE(review): OneHotEncoderEstimator is the Spark 2.x class name; it was
# renamed to OneHotEncoder in Spark 3.0 - update if the Spark version changes.
for cat_col in categorical:
    stringIndexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[cat_col + "Index"], outputCols=[cat_col + "classVec"])
    stages += [stringIndexer, encoder]

# Cast the numeric columns to float so they can be used in arithmetic below.
for num_col in numeric:
    wholeData = wholeData.withColumn(num_col, wholeData[num_col].cast('float'))

In [0]:
# Build the regression target (total charges divided by length of stay) and
# keep only the categorical predictor columns plus that target.
# The predictor list is taken from `categorical` so it cannot drift from the
# columns the pipeline stages were built for.
# Spark SQL division yields null when the divisor is 0 or an operand is null
# (e.g. a value the float cast could not parse); a null target cannot be used
# as a regression label, so such rows are dropped here.
trainingDataWithTarget = (
    wholeData
    .withColumn("avg_charges_per_day", wholeData['Total Charges'] / wholeData['Length of Stay'])
    .select(categorical + ['avg_charges_per_day'])
    .na.drop(subset=['avg_charges_per_day'])
)

In [0]:
# Show a single row to confirm the selected columns and the computed target.
trainingDataWithTarget.show(n=1)

+-------------------+---------+------+-----+-----------------+-------------------+
|Zip Code - 3 digits|Age Group|Gender| Race|Type of Admission|avg_charges_per_day|
+-------------------+---------+------+-----+-----------------+-------------------+
|                105| 50 to 69|     M|White|        Emergency|           7314.625|
+-------------------+---------+------+-----+-----------------+-------------------+
only showing top 1 row



In [0]:
# Combine the one-hot vectors of all categorical columns into a single
# "features" vector column using VectorAssembler.
assemblerInputs = [c + "classVec" for c in categorical]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# Replace any assembler left in `stages` by an earlier run of this cell rather
# than appending another one, so that re-running the cell does not leave the
# pipeline with two stages writing the same "features" column.
stages = [stage for stage in stages if not isinstance(stage, VectorAssembler)] + [assembler]

In [0]:
# Fit the preprocessing stages (indexers, encoders, assembler) on the training
# frame, then apply the fitted pipeline to that same frame to add "features".
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(trainingDataWithTarget)
preppedDataDF = pipelineModel.transform(trainingDataWithTarget)

In [0]:
# Persist the fitted preprocessing pipeline so it can be reloaded later to
# transform custom user input.
# write().overwrite() is used because a plain save() raises when the target
# path already exists, which would break every run after the first one.
pipelineModel.write().overwrite().save("/content/drive/My Drive/Colab Notebooks/pipelineModel")

In [0]:
# Show one row of the assembled feature vector next to its target value.
preppedDataDF.select('features', 'avg_charges_per_day').show(n=1)

+--------------------+-------------------+
|            features|avg_charges_per_day|
+--------------------+-------------------+
|(62,[9,50,54,57],...|           7314.625|
+--------------------+-------------------+
only showing top 1 row



In [0]:
# Reduce the prepared frame to the two columns the regressor needs:
# the label ("avg_charges_per_day") and the assembled "features" vector.
selectedcols = ["avg_charges_per_day", "features"]
vectorizedDataset = preppedDataDF.select(*selectedcols)

In [0]:
# `vectorizedDataset` is the frame the modelling cells below work from;
# preview five rows and confirm which DataFrame class it is.
vectorizedDataset.show(n=5)
print(type(vectorizedDataset))

+-------------------+--------------------+
|avg_charges_per_day|            features|
+-------------------+--------------------+
|           7314.625|(62,[9,50,54,57],...|
|      6607.83984375|(62,[9,51,53,54,5...|
|    9007.7724609375|(62,[9,50,53,54,5...|
|             7633.5|(62,[9,53,54,57],...|
| 16430.026041666668|(62,[9,50,56,57],...|
+-------------------+--------------------+
only showing top 5 rows

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Inspect the feature vector of the first row. first() fetches a single Row,
# whereas collect()[0] would bring every row to the driver just to show one.
vectorizedDataset.select('features').first()

Row(features=SparseVector(62, {9: 1.0, 50: 1.0, 54: 1.0, 57: 1.0}))

## **PySpark ML Gradient-Boosted Tree Regression Model and RMSE**

In [0]:
from pyspark.ml.regression import GBTRegressor

# Fixed seed so the train/test split (and therefore the reported RMSE) is the
# same on every run instead of changing with each execution of the cell.
SEED = 42

# 80/20 train/test split of the vectorized data.
train_data, test_data = vectorizedDataset.randomSplit([0.8, 0.2], seed=SEED)

# Gradient-boosted tree regressor predicting avg_charges_per_day from the
# assembled feature vector.
regressor = GBTRegressor(featuresCol='features', labelCol='avg_charges_per_day',
                         maxDepth=30, maxIter=50, seed=SEED)

model = regressor.fit(train_data)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split with the trained model.
predictions = model.transform(test_data)

# Compare the "prediction" column against the true label using RMSE.
evaluator = RegressionEvaluator(
    labelCol="avg_charges_per_day",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 12301.9


In [0]:
# Schema for a hand-written test record: five nullable string fields.
# The field order here fixes the order in which positional Row values are read.
schema = StructType([
    StructField(fieldName, StringType(), True)
    for fieldName in ('Age Group', 'Zip Code - 3 digits', 'Gender', 'Race', 'Type of Admission')
])

In [0]:
# A single custom record to run through the model. Values are positional and
# follow the schema's field order: Age Group, Zip Code - 3 digits, Gender,
# Race, Type of Admission. The frame carries no target column.
customRecord = Row('0 to 17', '101', 'F', 'White', 'Emergency')
testDataWithTarget = spark.createDataFrame([customRecord], schema)

In [0]:
# Load the saved preprocessing pipeline back from Drive and use it to turn the
# custom record into the "features" vector the regressor expects.
pipelineModelPath = "/content/drive/My Drive/Colab Notebooks/pipelineModel"
loadedModel = PipelineModel.load(pipelineModelPath)
testDataWithTargetDF = loadedModel.transform(testDataWithTarget)
testDataWithTargetDF.select('features').show()

+--------------------+
|            features|
+--------------------+
|(62,[23,52,53,54,...|
+--------------------+



In [0]:
# Run the trained regressor on the transformed custom record.
# Note: this rebinds `predictions`, which previously held the test-split scores.
predictions = model.transform(dataset=testDataWithTargetDF)

In [0]:
# Display the predicted average charge per day for the custom record.
predictions.select(predictions['prediction']).show()

+------------------+
|        prediction|
+------------------+
|12461.040988522816|
+------------------+

