# Initialize the HDFS and the resource manager
Keep in mind these commands will vary depending on where do you have this notebook and where is installed your hadoop.

In [1]:
# Start the dfs
!./hadoop-3.3.6/sbin/start-dfs.sh

Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [debian]


In [2]:
# Start the resource manager (YARN in our case)
!./hadoop-3.3.6/sbin/start-yarn.sh

Starting resourcemanager
Starting nodemanagers


# Imports and configurations

In [44]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier, FMClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pprint import pprint
import os
import pandas as pd
import seaborn as sns

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("SparkSessionExample") \
    .master("local[*]") \
    .getOrCreate()

24/01/03 13:57:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Get all configuration options
conf = spark.sparkContext.getConf().getAll()
pprint(conf)

[('spark.executor.id', 'driver'),
 ('spark.driver.host', '10.0.2.15'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.app.startTime', '1704308235592'),
 ('spark.app.id', 'local-1704308237957'),
 ('spark.driver.port', '38807'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions '
  '--add-opens=java.base/java.lang=ALL-UNNAMED '
  '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED '
  '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED '
  '--add-opens=java.base/java.io=ALL-UNNAMED '
  '--add-opens=java.base/java.net=ALL-UNNAMED '
  '--add-opens=java.base/java.nio=ALL-UNNAMED '
  '--add-opens=java.base/java.util=ALL-UNNAMED '
  '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED '
  '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED '
  '--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED '
  '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED '
  '--add-opens=java.base/sun.nio.cs=ALL-UNNAMED '
  '--add-opens=jav

# Read the data

In [4]:
# Spark accepts wildcards when reading data. (We load all partitions at once)
df = spark.read.parquet('hdfs:///user/hadoop/WESAD_parquet/S*.parquet')
print(f'Total number of samples {df.count()}')



Total number of samples 60807600


                                                                                

In [5]:
# Show data schema
df.printSchema()

root
 |-- ACC_1: double (nullable = true)
 |-- ACC_2: double (nullable = true)
 |-- ACC_3: double (nullable = true)
 |-- ECG: double (nullable = true)
 |-- EMG: double (nullable = true)
 |-- EDA: double (nullable = true)
 |-- Temp: float (nullable = true)
 |-- Resp: double (nullable = true)
 |-- label: integer (nullable = true)



In [6]:
# Show some features
df.select("ACC_1", "ACC_2", "ACC_3", 'label').show(10)

[Stage 4:>                                                          (0 + 1) / 1]

+-------------------+--------------------+--------------------+-----+
|              ACC_1|               ACC_2|               ACC_3|label|
+-------------------+--------------------+--------------------+-----+
|0.35099995136260986| -0.1905999779701233|  2.2841999530792236|    0|
| 0.4421999454498291|-0.11419999599456787|    2.22160005569458|    0|
| 0.5321999788284302|-0.05820000171661377|  1.9242000579833984|    0|
| 0.6926000118255615|-9.99987125396728...|  1.4021999835968018|    0|
| 0.9018000364303589| 0.04499995708465576|  0.6477999687194824|    0|
| 1.1470000743865967| 0.06540000438690186|-0.26319998502731323|    0|
| 1.4089999198913574|0.056800007820129395| -1.2230000495910645|    0|
| 1.5896000862121582|0.017199993133544922| -2.0999999046325684|    0|
|  1.661600112915039|  -0.053600013256073| -2.8071999549865723|    0|
| 1.6314001083374023|-0.14539998769760132|  -3.272599935531616|    0|
+-------------------+--------------------+--------------------+-----+
only showing top 10 

                                                                                

We see data was loaded correctly and that we can display it succesfully.

# Exploratory Data Analysis (EDA) and Data Cleaning

In [7]:
stats = df.describe(df.columns[0:5])
stats.show()

                                                                                

+-------+-------------------+--------------------+--------------------+--------------------+--------------------+
|summary|              ACC_1|               ACC_2|               ACC_3|                 ECG|                 EMG|
+-------+-------------------+--------------------+--------------------+--------------------+--------------------+
|  count|           60807600|            60807600|            60807600|            60807600|            60807600|
|   mean| 0.8116865284321901|-0.04424838120623882|-0.25900813261880495|0.001064647998794382|-0.00303824747415...|
| stddev|0.13125288401427787| 0.10388864087113163|  0.3321663786069044| 0.26868806452510596|0.017904851872606677|
|    min| -6.599999904632568|  -6.599999904632568|  -6.599999904632568|                -1.5|                -1.5|
|    max| 2.9814000129699707|  1.6089999675750732|   4.508200168609619|  1.4999542236328125|  1.4643402099609375|
+-------+-------------------+--------------------+--------------------+-----------------

In [8]:
stats = df.describe(df.columns[5:])
stats.show()



+-------+------------------+------------------+-------------------+------------------+
|summary|               EDA|              Temp|               Resp|             label|
+-------+------------------+------------------+-------------------+------------------+
|  count|          60807600|          60807600|           60807600|          60807600|
|   mean|  4.88823725419249|33.904942453534574|0.05424957702482945|1.3347263335504114|
| stddev| 3.531246876893006|1.2169364535428147|  4.099115876402935|1.6283486656155444|
|    min|               0.0|           -273.15|              -50.0|                 0|
|    max|22.410964965820312|         35.778046|    38.800048828125|                 7|
+-------+------------------+------------------+-------------------+------------------+



                                                                                

Notice there are not missing values, this will saves us some time since we won't need to deal with them.

---
As said in the dataset, labels from 5 to 7 should be ignored in this dataset. In addition, label 0 means **not defined** which gives as no information, we will also remove that one.

# Data pipeline
Here we define the pipeline followed by the data to be prepared for the training.

Here we define a custom transformer which filters the dataframe using the designed filters. 
In our case we defined two filters:
- Label != 0
- Label < 5

In this way, we will remove the labels [0, 5, 6, 7]

In [30]:
# Extends Transformer class and override _transform method. We define our filters in the constructor.
class DataCleaner(Transformer):
    
    def __init__(self, column_name=None, mean=None):
        super().__init__()
        #self.filter1 = "label <> 0"
        #self.filter2 = "label < 5"
        self.filters = ["label <> 0", "label < 5"]
        
    def _transform(self, dff):
        
        for filt in self.filters:
            dff = dff.filter(filt)
            
        return dff

The ML model accepts dataframes in the shape of [features, label], then we need to assemble all the feature columns in one single column with all the values. Fortunately there is a transformer that does this in spark.

In [31]:
feature_cols = ['ACC_1', 'ACC_2', 'ACC_3', 'ECG', 'EMG', 'EDA', 'Temp', 'Resp'] # Define feature columns

dataCleaner = DataCleaner() # Custom data cleaner
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features') # Assembler for the feature columns

In [32]:
# Create the pipeline with the stages followed by the data.
pipeline = Pipeline(stages=[dataCleaner, assembler])

In [33]:
# Fit pipeline model and transform data
pipeline_mod = pipeline.fit(df)
df_cleaned = pipeline_mod.transform(df)

In [35]:
print(f'Number of samples after removing useless labeled ones: {df_cleaned.count()}')



Number of samples after removing useless labeled ones: 31470603
Structure of the dataframe DataFrame[ACC_1: double, ACC_2: double, ACC_3: double, ECG: double, EMG: double, EDA: double, Temp: float, Resp: double, label: int, features: vector]


                                                                                

# Train the different ML models
For this task we have chosen three classification algorithms
1. Random Forest
2. Multilayer Perceptron
3. Factorization Machines

In [42]:
# Split the data into train and test
train_data, test_data = df_cleaned.randomSplit([0.7, 0.3], seed=42)

In [53]:
# Instantiate models
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=42)
mlp = MultilayerPerceptronClassifier(layers=[8, 64, 4], featuresCol='features', labelCol='label', seed=42)
fmc = FMClassifier(featuresCol='features', labelCol='label', seed=42)

In [48]:
# Fit models
rf = rf.fit(train_data)

[Stage 30:>                                                         (0 + 2) / 8]

24/01/03 14:39:32 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 44.4 MiB so far)
24/01/03 14:39:32 WARN BlockManager: Persisting block rdd_110_0 to disk instead.
24/01/03 14:39:32 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 44.4 MiB so far)
24/01/03 14:39:32 WARN BlockManager: Persisting block rdd_110_1 to disk instead.
24/01/03 14:39:50 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 236.8 MiB so far)
24/01/03 14:39:50 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 149.9 MiB so far)




24/01/03 14:40:18 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 66.6 MiB so far)
24/01/03 14:40:18 WARN BlockManager: Persisting block rdd_110_2 to disk instead.
24/01/03 14:40:18 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 29.6 MiB so far)
24/01/03 14:40:18 WARN BlockManager: Persisting block rdd_110_3 to disk instead.
24/01/03 14:40:33 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 236.8 MiB so far)
24/01/03 14:40:35 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 149.9 MiB so far)




24/01/03 14:40:58 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 44.4 MiB so far)
24/01/03 14:40:58 WARN BlockManager: Persisting block rdd_110_4 to disk instead.
24/01/03 14:41:05 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 66.6 MiB so far)
24/01/03 14:41:05 WARN BlockManager: Persisting block rdd_110_5 to disk instead.
24/01/03 14:41:17 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 236.8 MiB so far)
24/01/03 14:41:25 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 355.2 MiB so far)




24/01/03 14:41:47 WARN MemoryStore: Not enough space to cache rdd_110_7 in memory! (computed 99.9 MiB so far)
24/01/03 14:41:47 WARN BlockManager: Persisting block rdd_110_7 to disk instead.
24/01/03 14:41:48 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 236.8 MiB so far)
24/01/03 14:41:48 WARN BlockManager: Persisting block rdd_110_6 to disk instead.




24/01/03 14:42:03 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 99.9 MiB so far)


[Stage 32:>                                                         (0 + 2) / 8]

24/01/03 14:42:11 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 66.6 MiB so far)
24/01/03 14:42:11 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 66.6 MiB so far)




24/01/03 14:42:23 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 99.9 MiB so far)




24/01/03 14:42:24 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 29.6 MiB so far)




24/01/03 14:42:35 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 99.9 MiB so far)




24/01/03 14:42:37 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 99.9 MiB so far)




24/01/03 14:42:48 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 99.9 MiB so far)


[Stage 34:>                                                         (0 + 2) / 8]

24/01/03 14:42:59 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 66.6 MiB so far)
24/01/03 14:42:59 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 66.6 MiB so far)




24/01/03 14:43:12 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 99.9 MiB so far)




24/01/03 14:43:13 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 29.6 MiB so far)




24/01/03 14:43:27 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 99.9 MiB so far)




24/01/03 14:43:28 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 29.6 MiB so far)




24/01/03 14:43:43 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 99.9 MiB so far)


[Stage 36:>                                                         (0 + 2) / 8]

24/01/03 14:43:56 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 66.6 MiB so far)
24/01/03 14:43:56 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 66.6 MiB so far)




24/01/03 14:44:12 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 99.9 MiB so far)
24/01/03 14:44:12 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 29.6 MiB so far)




24/01/03 14:44:27 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 99.9 MiB so far)




24/01/03 14:44:29 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 29.6 MiB so far)




24/01/03 14:44:49 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 99.9 MiB so far)


[Stage 38:>                                                         (0 + 2) / 8]

24/01/03 14:45:04 WARN MemoryStore: Not enough space to cache rdd_110_0 in memory! (computed 66.6 MiB so far)
24/01/03 14:45:04 WARN MemoryStore: Not enough space to cache rdd_110_1 in memory! (computed 66.6 MiB so far)




24/01/03 14:45:22 WARN MemoryStore: Not enough space to cache rdd_110_3 in memory! (computed 29.6 MiB so far)
24/01/03 14:45:22 WARN MemoryStore: Not enough space to cache rdd_110_2 in memory! (computed 99.9 MiB so far)




24/01/03 14:45:38 WARN MemoryStore: Not enough space to cache rdd_110_4 in memory! (computed 99.9 MiB so far)




24/01/03 14:45:39 WARN MemoryStore: Not enough space to cache rdd_110_5 in memory! (computed 29.6 MiB so far)




24/01/03 14:45:55 WARN MemoryStore: Not enough space to cache rdd_110_6 in memory! (computed 99.9 MiB so far)


                                                                                

In [54]:
mlp = mlp.fit(train_data)

[Stage 43:>                                                         (0 + 2) / 8]

24/01/03 14:58:37 WARN BlockManager: Putting block rdd_171_1 failed due to exception org.apache.spark.SparkException: Failed to execute user defined function (OneHotEncoderModel$$Lambda$4273/0x0000000841868840: (double, int) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
24/01/03 14:58:37 WARN BlockManager: Block rdd_171_1 could not be removed as it was not found on disk or in memory
24/01/03 14:58:37 ERROR Executor: Exception in task 1.0 in stage 43.0 (TID 203)
org.apache.spark.SparkException: Failed to execute user defined function (OneHotEncoderModel$$Lambda$4273/0x0000000841868840: (double, int) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.Ge

[Stage 43:>                                                         (0 + 1) / 8]

Py4JJavaError: An error occurred while calling o490.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 43.0 failed 1 times, most recent failure: Lost task 1.0 in stage 43.0 (TID 203) (10.0.2.15 executor driver): org.apache.spark.SparkException: Failed to execute user defined function (OneHotEncoderModel$$Lambda$4273/0x0000000841868840: (double, int) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1519)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1446)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1510)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1337)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Unseen value: 4.0. To handle unseen values, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.OneHotEncoderModel.$anonfun$encoder$1(OneHotEncoder.scala:275)
	at org.apache.spark.ml.feature.OneHotEncoderModel.$anonfun$encoder$1$adapted(OneHotEncoder.scala:260)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
	at org.apache.spark.mllib.optimization.LBFGS$.runLBFGS(LBFGS.scala:195)
	at org.apache.spark.mllib.optimization.LBFGS.optimizeWithLossReturned(LBFGS.scala:154)
	at org.apache.spark.ml.ann.FeedForwardTrainer.train(Layer.scala:855)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.$anonfun$train$1(MultilayerPerceptronClassifier.scala:228)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:184)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:93)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (OneHotEncoderModel$$Lambda$4273/0x0000000841868840: (double, int) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1519)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1446)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1510)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1337)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen value: 4.0. To handle unseen values, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.OneHotEncoderModel.$anonfun$encoder$1(OneHotEncoder.scala:275)
	at org.apache.spark.ml.feature.OneHotEncoderModel.$anonfun$encoder$1$adapted(OneHotEncoder.scala:260)
	... 26 more


24/01/03 14:58:37 WARN BlockManager: Putting block rdd_171_0 failed due to exception org.apache.spark.TaskKilledException.
24/01/03 14:58:37 WARN BlockManager: Block rdd_171_0 could not be removed as it was not found on disk or in memory
24/01/03 14:58:37 WARN TaskSetManager: Lost task 0.0 in stage 43.0 (TID 202) (10.0.2.15 executor driver): TaskKilled (Stage cancelled)


In [49]:
predictions = rf.transform(test_data)

In [50]:
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")



Accuracy: 0.7646451503003953


                                                                                