# Forest of Doom
### By Ryan Dickson
### Edited by Mohinder Dick
> References go here for presentation version. 

In [1]:
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
from dateutil.parser import parse
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import StringIndexer, HashingTF, VectorAssembler, IDF
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

## Set up environment depend variables

In [19]:
# Set up the path of the files and environment va

if sc._conf.get('spark.master') == 'yarn-client' or sc._conf.get('spark.master') == 'yarn-cluster':
    urn = 'hdfs://sparkdl04:8020/palooza/data/visit_train_panda.csv'
    urnTest = 'hdfs://sparkdl04:8020/palooza/data/validate/visit_test_panda.csv'
    
    # Set up the path of the python pyspark should use
    #os.environ['PYSPARK_PYTHON'] = '/opt/anaconda/bin/python'
else:
    urn = 'file:///C:/Users/%s/Documents/Projects/ML/Source/UPMC/Pharmacy/visit_train_panda.csv' % (os.getenv('USERNAME', 'dickm'))
    urnTest = 'file:///C:/Users/%s/Documents/Projects/ML/Source/UPMC/Pharmacy/visit_test_panda.csv' % (os.getenv('USERNAME', 'dickm'))
    #os.environ['PYSPARK_PYTHON'] = 'C:\Users\dickm\AppData\Local\Continuum\Anaconda2\python.exe'

# Used avoid issues in windows when reading text files.
partitionNumber = 500

print('Train file path: %s.\nTest file path: %s.\nPyspark path: %s.' % (urn, urnTest, os.environ.get('PYSPARK_PYTHON')))

Train file path: file:///C:/Users/madick/Documents/Projects/ML/Source/UPMC/Pharmacy/visit_train_panda.csv.
Test file path: file:///C:/Users/madick/Documents/Projects/ML/Source/UPMC/Pharmacy/visit_test_panda.csv.
Pyspark path: None.


## Verify Spark

This launches the notebook and provides the spark context in the variable *sc*. You can use the context to preview the configuration.

In [3]:
print sc._conf.getAll()
test = sc.parallelize(range(1000))
print '\nsum of 1 to 1000: ', test.reduce(lambda a, b: a+b )

[(u'spark.rdd.compress', u'True'), (u'spark.serializer.objectStreamReset', u'100'), (u'spark.master', u'local[*]'), (u'spark.submit.deployMode', u'client'), (u'spark.app.name', u'PySparkShell')]

sum of 1 to 1000:  499500


## Get the training data

I added parallelism due to issue on windows

In [20]:
chargesRDD = sc.textFile(urn, partitionNumber)

#Get a new RDD with map function and lambda keyword. Remove header row.
header = chargesRDD.take(1)[0]
chargesRDD = chargesRDD.filter(lambda line: line!=header)
chargesRDDSplit = chargesRDD.map(lambda line: line.replace('"', '').split(','))
chargesRDDSplit.cache()

PythonRDD[20] at RDD at PythonRDD.scala:43

## Get the test data

Mo knows where it's at!

In [21]:
chargesRDDTEST = sc.textFile(urnTest, partitionNumber)
   
chargesRDDTEST = chargesRDD.filter(lambda line: line!=header)
chargesRDDSplitTEST = chargesRDD.map(lambda line: line.replace('"', '').split(','))
chargesRDDSplitTEST.cache()

PythonRDD[23] at RDD at PythonRDD.scala:43

In [22]:
#look at the data
print ('The number of training records is %s.\nThe number of test records is %s' % 
       (chargesRDD.count(), chargesRDDTEST.count()))
print ('First row of train:\n%s\nThe first row of test:\n%s\n' % (chargesRDD.take(1), chargesRDDTEST.take(1)))

The number of training records is 279808.
The number of test records is 279808
First row of train:
[u'"HdBgCT1YkEl14280","SHY",436,"I",77,"W","M","MS","10/07/2014","10/10/2014",3,"197.7","SECOND MALIG NEO LIVE","1",10028']
The first row of test:
[u'"HdBgCT1YkEl14280","SHY",436,"I",77,"W","M","MS","10/07/2014","10/10/2014",3,"197.7","SECOND MALIG NEO LIVE","1",10028']



### Raw Features

* VisitID - Identifier for patient visit.
* Hospital - Admitting hospital.
* Dept_Code - department code.
* PaymentType - I am guessing a payment type for visit.
* Age - Age of the patient in years.
* Race - De-identified race of the patient.
* Gender - Gender ("M" - male, "F" - female)
* FC - ?
* ArriveDate - Date of admission. 
* DischargeDate - Date of discharge
* LOS - length of patient stay in days.
* DXCODE - Diagnosis code.
* Description - Description of diagnosis
* DispenseID - ?
* DOC - ?

## Build Random Forest Model
### We want to predict the length of stay (LOS) given the patient demographics, dxcode and deptcode, and day of the week admitted.

Assumptions of feature relevance...
* Dept_code categorical, some departments would have more serious patients than others
* Day of the week, Patients admitted over weekend may require a longer length of stay to be seen by necessary staff
* Dxcode, multiple per patient, may need to use PCA to reduce number (PCA is not always good before random forest, see references)
* Demographics: age, gender and race 


We go through the following pipeline:
* Encode/Extract features
* Train the model
* Evaluate the model on unseen data
* Draw conclusions and make recommendations

We should not need to normalize the features for Random Forest, but bucketing may be needed to help with outliers

In [23]:
dxcodeRDD = chargesRDDSplit.map(lambda line: (line[0], line[11])).groupByKey().distinct().cache()
dxCodes = dxcodeRDD.values().flatMap(list).distinct()
dxCodeCount = dxCodes.count()

In [28]:
def merge(x, y):
    
    if x is not None: 
        x['dxCount'] += 1
        if 'dxcode' in x:
            x['dxcode'] = list(set(x['dxcode'] + y['dxcode']))
        else:
            x = y
    else:
        x = y
    return x


def mapAndFold(rdd):
    return rdd.map(lambda line: (line[0], dict(
             los=float(line[10]),
             age=int(line[4]),
             hospital_visit=line[1],
             dept_code=line[2],
             race=line[5], 
             gender_female=1 if line[6]=='F' else 0,    #Encode gender as boolean
             dxcode=[line[11]],
             admit_day=parse(line[8]).weekday(),
             admit_month=parse(line[8]).month,
             dxCount=1,
             fc=line[7]
            ))).foldByKey(None, merge)


In [29]:
chargesByVisitRDD = mapAndFold(chargesRDDSplit)
#df = chargesByVisitRDD.values().toDF()

In [30]:
chargesByVisitRDD.take(2)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 1 times, most recent failure: Lost task 1.0 in stage 15.0 (TID 3016, localhost): org.apache.spark.SparkException: Python worker did not connect back in time
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker did not connect back in time
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
	... 16 more


In [None]:
df = chargesByVisitRDD.values().toDF()

In [None]:
chargesByVisitRDDTEST = mapAndFold(chargesRDDSplitTEST)
dfTEST = chargesByVisitRDD.values().toDF()

In [None]:
stringIndexers = [
    StringIndexer(inputCol="hospital_visit", outputCol="hospitalIndex"),
    StringIndexer(inputCol="dept_code", outputCol="deptIndex"),
    StringIndexer(inputCol="race", outputCol="raceIndex")
]

hashingTF = HashingTF(numFeatures=2*dxCodeCount, inputCol="dxcode", outputCol="dxCodes")
idf = IDF(inputCol="dxCodes", outputCol="idfDxCodes", minDocFreq=10)

mungePipeline = Pipeline(stages=stringIndexers + [hashingTF, idf])

mungingModel = mungePipeline.fit(df)
trainingData = mungingModel.transform(df)

In [None]:
testingData = mungingModel.transform(dfTEST)

## Munge it!
String Indexers Need to have all values, so will need to fit combined traing and test data if unseen labels are present


In [None]:
assembler = VectorAssembler(
   inputCols=["age", 
              "deptIndex", 
              "gender_female",
              "raceIndex",
              "hospitalIndex",
              "admit_day",
              "admit_month",
              "dxCount",
              "idfDxCodes"
             ],
    outputCol="features")

transformedTrainingDF = assembler.transform(trainingData).select('features','los')

In [None]:
transformedTestingDF = assembler.transform(testingData).select('features','los')

### Train Model
Now we generate the training and test data. We use the ***seed*** function to ensure a repeatable split of the data between runs.

In [None]:
rf = RandomForestRegressor(featuresCol="features", labelCol="los",maxBins=1000, seed=1234)

#Magic Numbers?
rf.setNumTrees(100) 
rf.setMaxDepth(10) # Max of Spark is 30
rf.setMinInstancesPerNode(5)
rf.setFeatureSubsetStrategy('all')

model = rf.fit(transformedTrainingDF)

### Evaluate Model
We evaluate the model on the unseen dataset that was not used to train the model. For reference here is the stats for los in the training data

In [None]:
df.describe('los').show()

In [None]:
# Select (prediction, true label) and compute test error
predictions = model.transform(transformedTestingDF).select('los','prediction')
evaluator = RegressionEvaluator(
    labelCol="los", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)