## Random Forest Classifier

Random Forest learning algorithm for classification. It supports both binary and multiclass labels, as well as both continuous and categorical features

In [1]:
import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ConditionFeatures').getOrCreate()

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorSlicer, VectorAssembler, ChiSqSelector, VectorIndexer, UnivariateFeatureSelector, VarianceThresholdSelector
from pyspark.sql.functions import *
import numpy as np
from pyspark.sql.types import IntegerType

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/03 13:42:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/03 13:42:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


import pyspark as pyspark

sc.stop()

conf = pyspark.SparkConf().setAll([('spark.executor.memory', '16g'),('spark.driver.memory','16g')])

sc = pyspark.SparkContext(conf=conf)`

### Reading and Merging Data

In [2]:
df = spark.read.option("header",True).csv('../../synthea-sample-data/data/10k_synthea_covid19_csv/conditions.csv').select('PATIENT','Code', 'Description')
deathDf = spark.read.option("header",True).csv('../../synthea-sample-data/data/10k_synthea_covid19_csv/patients.csv').select('Id', 'DEATHDATE')
deadSet = df.join(deathDf, (df.PATIENT == deathDf.Id)).na.drop().drop('Id', 'Code')
labels = spark.read.option("header",True).csv('../FeatureSelection/dfCovid_DeceasedCovid.csv').select('PATIENT', 'covid-19', 'deceased & covid-19')

merged = df.join(deathDf, (df.PATIENT == deathDf.Id), 'left').drop( 'Id')

merged = merged.withColumn('deceased', when(col('DEATHDATE').isNotNull(), 1)).na.fill(0)
merged = merged.join(labels, ('PATIENT'), 'left').dropDuplicates()

In [3]:
groupedDf = merged.groupBy("PATIENT", 'Code').pivot("Code").agg(count("Code").alias("count")).na.fill(0)
merged =merged.select('PATIENT', 'deceased', 'covid-19', 'deceased & covid-19')
finalDf = groupedDf.join(merged, ['PATIENT'], 'left')


In [4]:
cols = list(set(finalDf.columns) - {'PATIENT', 'deceased', 'Code', 'Description', 'covid-19', 'deceased & covid-19'})
assembler = VectorAssembler().setInputCols(cols).setOutputCol('features')
finalDf = finalDf.withColumn("covid-19", finalDf["covid-19"].cast(IntegerType())).withColumn("deceased & covid-19", finalDf["deceased & covid-19"].cast(IntegerType()))
df = assembler.transform(finalDf)
df.printSchema()

root
 |-- PATIENT: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- 10509002: long (nullable = true)
 |-- 109838007: long (nullable = true)
 |-- 110030002: long (nullable = true)
 |-- 124171000119105: long (nullable = true)
 |-- 126906006: long (nullable = true)
 |-- 127013003: long (nullable = true)
 |-- 127295002: long (nullable = true)
 |-- 128613002: long (nullable = true)
 |-- 132281000119108: long (nullable = true)
 |-- 1501000119109: long (nullable = true)
 |-- 1551000119108: long (nullable = true)
 |-- 156073000: long (nullable = true)
 |-- 157141000119108: long (nullable = true)
 |-- 15777000: long (nullable = true)
 |-- 16114001: long (nullable = true)
 |-- 161622006: long (nullable = true)
 |-- 162573006: long (nullable = true)
 |-- 162864005: long (nullable = true)
 |-- 1734006: long (nullable = true)
 |-- 185086009: long (nullable = true)
 |-- 190905008: long (nullable = true)
 |-- 19169002: long (nullable = true)
 |-- 192127007: long (nullable = true)
 |-

### Chi-Squared Features

Deceased

In [9]:
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="deceased")
chiResult = selector.fit(df).transform(df)

                                                                                

In [10]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=3, maxDepth=5, labelCol="deceased", seed=42,leafCol="leafId")
(train, test) = chiResult.randomSplit([0.8, 0.2])
model = rf.fit(train).setFeaturesCol('selectedFeatures')


23/04/03 14:03:50 WARN MemoryStore: Not enough space to cache rdd_562_6 in memory! (computed 65.2 MiB so far)
23/04/03 14:03:50 WARN MemoryStore: Not enough space to cache rdd_562_9 in memory! (computed 43.5 MiB so far)
23/04/03 14:03:50 WARN MemoryStore: Not enough space to cache rdd_562_3 in memory! (computed 5.4 MiB so far)
23/04/03 14:03:50 WARN MemoryStore: Not enough space to cache rdd_562_10 in memory! (computed 12.1 MiB so far)
23/04/03 14:03:50 WARN BlockManager: Persisting block rdd_562_6 to disk instead.
23/04/03 14:03:50 WARN BlockManager: Persisting block rdd_562_9 to disk instead.
23/04/03 14:03:50 WARN BlockManager: Persisting block rdd_562_10 to disk instead.
23/04/03 14:03:50 WARN BlockManager: Persisting block rdd_562_3 to disk instead.
23/04/03 14:03:52 WARN MemoryStore: Not enough space to cache rdd_562_0 in memory! (computed 8.1 MiB so far)
23/04/03 14:03:52 WARN BlockManager: Persisting block rdd_562_0 to disk instead.
23/04/03 14:03:52 WARN MemoryStore: Not enoug

Covid-19

In [11]:
df = assembler.transform(finalDf)
df = df.na.drop()
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="covid-19")
chiResult = selector.fit(df).transform(df)

                                                                                

In [7]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=3, maxDepth=5, labelCol="deceased", seed=42,leafCol="leafId")
(train, test) = chiResult.randomSplit([0.8, 0.2])
model = rf.fit(train).setFeaturesCol('features')


23/04/03 13:59:44 WARN MemoryStore: Not enough space to cache rdd_225_8 in memory! (computed 43.5 MiB so far)
23/04/03 13:59:44 WARN MemoryStore: Not enough space to cache rdd_225_6 in memory! (computed 43.5 MiB so far)
23/04/03 13:59:44 WARN MemoryStore: Not enough space to cache rdd_225_10 in memory! (computed 18.4 MiB so far)
23/04/03 13:59:44 WARN MemoryStore: Not enough space to cache rdd_225_9 in memory! (computed 18.4 MiB so far)
23/04/03 13:59:44 WARN MemoryStore: Not enough space to cache rdd_225_2 in memory! (computed 8.1 MiB so far)
23/04/03 13:59:45 WARN BlockManager: Persisting block rdd_225_2 to disk instead.
23/04/03 13:59:45 WARN BlockManager: Persisting block rdd_225_8 to disk instead.
23/04/03 13:59:45 WARN BlockManager: Persisting block rdd_225_6 to disk instead.
23/04/03 13:59:45 WARN BlockManager: Persisting block rdd_225_10 to disk instead.
23/04/03 13:59:45 WARN BlockManager: Persisting block rdd_225_9 to disk instead.
23/04/03 13:59:47 WARN MemoryStore: Not enou

In [12]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=3, maxDepth=5, labelCol="deceased", seed=42,leafCol="leafId")
(train, test) = chiResult.randomSplit([0.8, 0.2])
model = rf.fit(train).setFeaturesCol('selectedFeatures')


23/04/03 14:05:22 WARN MemoryStore: Not enough space to cache rdd_848_0 in memory! (computed 43.5 MiB so far)
23/04/03 14:05:22 WARN MemoryStore: Not enough space to cache rdd_848_6 in memory! (computed 43.5 MiB so far)
23/04/03 14:07:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 132649 ms exceeds timeout 120000 ms
23/04/03 14:07:52 WARN SparkContext: Killing executors is not supported by current scheduler.
23/04/03 14:07:53 WARN MemoryStore: Not enough space to cache rdd_848_5 in memory! (computed 43.5 MiB so far)
23/04/03 14:07:53 WARN MemoryStore: Not enough space to cache rdd_848_2 in memory! (computed 8.1 MiB so far)
23/04/03 14:07:53 WARN BlockManager: Block rdd_848_8 could not be removed as it was not found on disk or in memory
23/04/03 14:07:54 WARN BlockManager: Persisting block rdd_848_2 to disk instead.
23/04/03 14:07:54 WARN MemoryStore: Not enough space to cache rdd_848_4 in memory! (computed 28.6 MiB so far)
23/04/03 14:07:54 WARN BlockMan

[Stage 252:>                                                      (0 + 11) / 12]23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_3 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Block rdd_848_3 could not be removed as it was not found on disk or in memory
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_11 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Block rdd_848_11 could not be removed as it was not found on disk or in memory
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_7 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_10 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_1 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_9 failed du

23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_0 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Block rdd_848_0 could not be removed as it was not found on disk or in memory
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_5 failed due to exception org.apache.spark.TaskKilledException.
23/04/03 14:07:57 WARN BlockManager: Block rdd_848_5 could not be removed as it was not found on disk or in memory
23/04/03 14:07:57 WARN BlockManager: Asked to remove block rdd_848_0, which does not exist
23/04/03 14:07:57 WARN TaskSetManager: Lost task 0.0 in stage 252.0 (TID 952) (cs-218992.cs.txstate.edu executor driver): TaskKilled (Stage cancelled)
23/04/03 14:07:57 WARN TaskSetManager: Lost task 5.0 in stage 252.0 (TID 957) (cs-218992.cs.txstate.edu executor driver): TaskKilled (Stage cancelled)
23/04/03 14:07:57 WARN BlockManager: Putting block rdd_848_6 failed due to exception org.apache.spark.TaskKilledException.
23/04/03

ConnectionRefusedError: [Errno 111] Connection refused

Deceased & Covid-19 

In [None]:
df = assembler.transform(finalDf)
df = df.na.drop()
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="deceased & covid-19")
chiResult = selector.fit(df).transform(df)


In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=3, maxDepth=5, labelCol="deceased", seed=42,leafCol="leafId")
(train, test) = chiResult.randomSplit([0.8, 0.2])
model = rf.fit(train).setFeaturesCol('features')


In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=3, maxDepth=5, labelCol="deceased", seed=42,leafCol="leafId")
(train, test) = chiResult.randomSplit([0.8, 0.2])
model = rf.fit(train).setFeaturesCol('selectedFeatures')
