# RandomForest Classification

In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [2]:
from pathlib import Path
import os
import shutil
import glob
import pandas as pd
import fnmatch
import re
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
import logging
import site
import wget
from pyspark.ml import Pipeline


In [3]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [4]:
sc= SparkContext()
spark = SparkSession\
        .builder\
        .appName("Random Forest Classification")\
        .getOrCreate()

23/11/01 00:36:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
data_csv = os.environ.get('data_csv', 'data.csv')
data_parquet = os.environ.get('data_parquet', 'data.parquet')
master = os.environ.get('master', "local[*]")
data_dir = os.environ.get('data_dir', '../../data/')

In [6]:
data_parquet = 'data.parquet'
data_csv = 'rf.csv'

In [7]:
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True
if not skip:
    df = spark.read.parquet(data_dir + data_parquet)
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

In [8]:
pd_read = pd.read_csv('rf.csv')

sdf = spark.createDataFrame(pd_read)

In [9]:
sdf = sdf.withColumn("x", sdf.x.cast(DoubleType()))
sdf = sdf.withColumn("y", sdf.y.cast(DoubleType()))
sdf = sdf.withColumn("z", sdf.z.cast(DoubleType()))

In [10]:
splits = sdf.randomSplit([0.8, 0.2], seed=1)
df_train = splits[0]
df_test = splits[1]

In [11]:
indexer = StringIndexer(inputCol="class", outputCol="label")

input_columns = ['x', 'y', 'z']

vectorAssembler = VectorAssembler(inputCols=input_columns,
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [12]:
num_trees = [10, 20]
max_depth = [5, 7]

In [None]:
best_accuracy = 0
best_hyperparameters = {}

# Loop through hyperparameters
for trees in num_trees:
    for depth in max_depth:
        # Initialize Random Forest Classifier
        rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=trees, maxDepth=depth, seed=1)
        
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        # Train the models
        model = pipeline.fit(df_train)

        # Make predictions on the test data
        predictions = model.transform(df_test)

        # Define the evaluator
        evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

        # Evaluate the model
        accuracy = evaluator.evaluate(predictions)

        # Print the hyperparameters and accuracy
        print(f"numTrees = {num_trees}, maxDepth = {max_depth} => Accuracy = {accuracy}")

        # Track the best hyperparameters
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_hyperparameters['numTrees'] = trees
            best_hyperparameters['maxDepth'] = depth

# Print the best hyperparameters
print(f"\nBest Hyperparameters: {best_hyperparameters}")

23/11/01 00:37:12 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:25 WARN scheduler.TaskSetManager: Stage 2 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:35 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:36 WARN scheduler.TaskSetManager: Stage 5 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:41 WARN scheduler.TaskSetManager: Stage 6 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:47 WARN scheduler.TaskSetManager: Stage 8 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:37:54 WARN scheduler.TaskSetManager: Stage 10 contains a task of very large size (908 KB). The maximum rec

numTrees = [10, 20], maxDepth = [5, 7] => Accuracy = 0.44843979503739334


23/11/01 00:38:09 WARN scheduler.TaskSetManager: Stage 23 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:12 WARN scheduler.TaskSetManager: Stage 25 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:15 WARN scheduler.TaskSetManager: Stage 27 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:15 WARN scheduler.TaskSetManager: Stage 28 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:17 WARN scheduler.TaskSetManager: Stage 29 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:20 WARN scheduler.TaskSetManager: Stage 31 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:25 WARN scheduler.TaskSetManager: Stage 33 contains a task of very large size (908 KB). The maxim

numTrees = [10, 20], maxDepth = [5, 7] => Accuracy = 0.46441745994371375


23/11/01 00:38:39 WARN scheduler.TaskSetManager: Stage 50 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:42 WARN scheduler.TaskSetManager: Stage 52 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:45 WARN scheduler.TaskSetManager: Stage 54 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
23/11/01 00:38:45 WARN scheduler.TaskSetManager: Stage 55 contains a task of very large size (908 KB). The maximum recommended task size is 100 KB.
[Stage 55:>                                                         (0 + 8) / 8]