# RandomForest Classification

__build an end to end pipeline that reads in data in parquet format, converts it to CSV and loads it into a dataframe, trains a model and perform hyperparameter tuning. For this submission, you may use code and snippets from all the resources mentioned above including the component library. Create a notebook that does the following:__

- Read in the parquet file you created as part of Task 3.
- Convert the parquet file to CSV format.
- Load the CSV file into a dataframe
- Create a 80-20 training and test split with seed=1.
- Train a Random Forest model with different hyperparameters listed below and report the best performing hyperparameter combinations.

### Hyper parameters:
  - number of trees : {10, 20}
  - maximum depth : {5, 7} 
  - use random seed = 1 wherever needed

In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [2]:
# @param data_dir temporal data storage for local execution
# @param data_csv csv path and file name (default: data.csv)
# @param data_parquet path and parquet file name (default: data.parquet)
# @param master url of master (default: local mode)

In [3]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import os
import shutil
import glob
import fnmatch
from pathlib import Path
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
import random
import re
import sys
import logging

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import site
import wget

In [4]:
data_csv = os.environ.get('data_csv', 'data.csv')
data_parquet = os.environ.get('data_parquet', 'data.parquet')
master = os.environ.get('master', "local[*]")
data_dir = os.environ.get('data_dir', '../../data/')

In [5]:
data_parquet = 'data.parquet'
data_csv = 'data.csv'

In [6]:
sc = SparkContext.getOrCreate(SparkConf().setMaster(master))
spark = SparkSession.builder.getOrCreate()

22/10/04 10:44:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/04 10:45:00 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/04 10:45:00 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


### Step 1. Read in the parquet file you created as part of Task 3.

In [7]:
df = spark.read.parquet(data_dir + data_parquet)

                                                                                

In [8]:
if os.path.exists(data_dir + data_csv):
    shutil.rmtree(data_dir + data_csv)
df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
file = glob.glob(data_dir + data_csv + '/part-*')
shutil.move(file[0], data_dir + data_csv + '.tmp')
shutil.rmtree(data_dir + data_csv)
shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

                                                                                

'../../data/data.csv'

In [9]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

### Step 2. Convert the parquet file to CSV format.

In [10]:
df1 = spark.read.csv(data_dir + data_csv)

In [11]:
df1 = df1.repartition(1)
df1.show()



+---+---+---+--------------------+-----+
|_c0|_c1|_c2|                 _c3|  _c4|
+---+---+---+--------------------+-----+
|  x|  y|  z|              source|class|
| 13| 45| 36|Accelerometer-201...| Walk|
| 13| 45| 35|Accelerometer-201...| Walk|
| 10| 46| 36|Accelerometer-201...| Walk|
| 11| 44| 36|Accelerometer-201...| Walk|
|  7| 45| 34|Accelerometer-201...| Walk|
|  7| 46| 36|Accelerometer-201...| Walk|
| 11| 42| 34|Accelerometer-201...| Walk|
| 10| 41| 34|Accelerometer-201...| Walk|
| 10| 41| 35|Accelerometer-201...| Walk|
|  8| 39| 33|Accelerometer-201...| Walk|
|  9| 39| 33|Accelerometer-201...| Walk|
| 11| 38| 33|Accelerometer-201...| Walk|
| 11| 38| 33|Accelerometer-201...| Walk|
| 12| 39| 33|Accelerometer-201...| Walk|
| 12| 40| 31|Accelerometer-201...| Walk|
| 14| 40| 32|Accelerometer-201...| Walk|
| 16| 38| 32|Accelerometer-201...| Walk|
| 16| 40| 32|Accelerometer-201...| Walk|
| 17| 38| 32|Accelerometer-201...| Walk|
+---+---+---+--------------------+-----+
only showing top

                                                                                

In [12]:
df1 = spark.read.option('header', 'true').csv(data_dir + data_csv)
df1.show()

+---+---+---+--------------------+-----+
|  x|  y|  z|              source|class|
+---+---+---+--------------------+-----+
| 13| 45| 36|Accelerometer-201...| Walk|
| 13| 45| 35|Accelerometer-201...| Walk|
| 10| 46| 36|Accelerometer-201...| Walk|
| 11| 44| 36|Accelerometer-201...| Walk|
|  7| 45| 34|Accelerometer-201...| Walk|
|  7| 46| 36|Accelerometer-201...| Walk|
| 11| 42| 34|Accelerometer-201...| Walk|
| 10| 41| 34|Accelerometer-201...| Walk|
| 10| 41| 35|Accelerometer-201...| Walk|
|  8| 39| 33|Accelerometer-201...| Walk|
|  9| 39| 33|Accelerometer-201...| Walk|
| 11| 38| 33|Accelerometer-201...| Walk|
| 11| 38| 33|Accelerometer-201...| Walk|
| 12| 39| 33|Accelerometer-201...| Walk|
| 12| 40| 31|Accelerometer-201...| Walk|
| 14| 40| 32|Accelerometer-201...| Walk|
| 16| 38| 32|Accelerometer-201...| Walk|
| 16| 40| 32|Accelerometer-201...| Walk|
| 17| 38| 32|Accelerometer-201...| Walk|
| 16| 38| 34|Accelerometer-201...| Walk|
+---+---+---+--------------------+-----+
only showing top

### Step 3. Load the CSV file into a dataframe

In [13]:
# register a corresponding query table
df1.createOrReplaceTempView('df1')

In [14]:
from pyspark.sql.types import DoubleType
df1 = df1.withColumn("x", df1.x.cast(DoubleType()))
df1 = df1.withColumn("y", df1.y.cast(DoubleType()))
df1 = df1.withColumn("z", df1.z.cast(DoubleType()))

### Step 4. Create a 80-20 training and test split with seed=1.

In [15]:
# 80-20 training and test split
splits = df1.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [17]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to considerv

In [18]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

### Step 5. Train a Random Forest model with different hyperparameters listed below and report the best performing hyperparameter combinations.
### Step 6. Use the accuracy metric when evaluating the model with different hyperparameters

In [21]:
#  - number of trees : {10, 20}
#  - maximum depth : {5, 7} 
#  - use random seed = 1 wherever needed

from pyspark.ml.classification import RandomForestClassifier

for nT in [10, 20]:
    for maxD in [5,7]:
        # rf = RandomForestClassifier(numTrees = curNTrees, maxDepth = curMaxDepth, seed = 1)
        rf = RandomForestClassifier(numTrees=nT, maxDepth=maxD, seed=1)
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        model = pipeline.fit(df_train)
        prediction = model.transform(df_test)
        binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("label")
        this_pred = binEval.evaluate(prediction)
        print("numTrees =",nT,"maxDepth =",maxD,": Accuracy =", this_pred)

                                                                                

numTrees = 10 maxDepth = 5 : Accuracy = 0.44453395994181494


                                                                                

numTrees = 10 maxDepth = 7 : Accuracy = 0.4649658722166275


                                                                                

numTrees = 20 maxDepth = 5 : Accuracy = 0.44641378538659504




numTrees = 20 maxDepth = 7 : Accuracy = 0.46922904777889674


                                                                                