# Spark RandomForest classification

In this task, we will build an end to end pipeline that reads in data in parquet format, converts it to CSV and loads it into a dataframe, trains a model and perform hyperparameter tuning. This notebook does the following:

*   Read in the `parquet` file.

*   Convert the `parquet` file to `CSV` format.

*   Load the CSV file into a dataframe

*   Create a 80-20 training and test split with `seed=1`.

*   Train a Random Forest model with different hyperparameters listed below and report the best performing hyperparameter combinations.

    Hyper parameters:

    ```
      - number of trees : {10, 20}
      - maximum depth : {5, 7} 
      - use random seed = 1 wherever needed


### Converts a parquet file to CSV file with header using ApacheSpark

In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [2]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import os
# from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re
import glob

In [3]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [4]:
data_csv = os.environ.get('data_csv', 'data.csv') #data_csv csv path and file name
data_parquet = os.environ.get('data_parquet', 'data.parquet') # input file name (parquet)
master = os.environ.get('master', "local[*]")  # URL to Spark master
data_dir = os.environ.get('data_dir', '../../data/') # temporary directory for data
model_target = os.environ.get('model_target', "model.xml")  # model output file name
input_columns = os.environ.get('input_columns', '["x", "y", "z"]')  # input columns to consider


In [5]:
conf = SparkConf().setMaster(master)
# if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

22/01/10 00:17:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
data_parquet = 'trends.parquet'
data_csv = 'trends.csv'

In [7]:
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True

In [8]:
if not skip:
    df = spark.read.parquet(data_dir + data_parquet)

In [9]:
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

In [10]:
# override parameters received from a potential call using %run magic
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [11]:
df1 = spark.read.csv(data_dir + data_csv)

                                                                                

In [12]:
# df1 = df1.repartition(1)

In [13]:
 df1.show()

+---+---+---+--------------------+--------+
|_c0|_c1|_c2|                 _c3|     _c4|
+---+---+---+--------------------+--------+
|  x|  y|  z|              source|   class|
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 35| 53|Accelerometer-201...|Eat_meat|
| 31| 37| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 53|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 35| 53|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 38| 53|Accelerometer-201...|Eat_meat|
| 32| 37| 52|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 53|Accelerometer-201...|Eat_meat|
| 32| 36| 53|Accelerometer-201...|Eat_meat|
| 32| 36| 52|Accelerometer-201...|Eat_meat|
| 34| 36| 52|Accelerometer-201..

In [14]:
df1 = spark.read.option('header', 'true').csv(data_dir + data_csv)

In [15]:
 df1.show()

+---+---+---+--------------------+--------+
|  x|  y|  z|              source|   class|
+---+---+---+--------------------+--------+
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 35| 53|Accelerometer-201...|Eat_meat|
| 31| 37| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 53|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 35| 53|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 38| 53|Accelerometer-201...|Eat_meat|
| 32| 37| 52|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 53|Accelerometer-201...|Eat_meat|
| 32| 36| 53|Accelerometer-201...|Eat_meat|
| 32| 36| 52|Accelerometer-201...|Eat_meat|
| 34| 36| 52|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201..

In [16]:
# register a corresponding query table
df1.createOrReplaceTempView('df1')

In [17]:
from pyspark.sql.types import DoubleType
df1 = df1.withColumn("x", df1.x.cast(DoubleType()))
df1 = df1.withColumn("y", df1.y.cast(DoubleType()))
df1 = df1.withColumn("z", df1.z.cast(DoubleType()))

In [18]:
splits = df1.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [19]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [21]:
for nT in [10, 20]:
    for maxD in [5,7]:
        rf = RandomForestClassifier(numTrees=nT, maxDepth=maxD, seed=1)
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        model = pipeline.fit(df_train)
        prediction = model.transform(df_test)
        binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("label")
        this_pred = binEval.evaluate(prediction)
        print("numTrees =",nT,"maxDepth =",maxD,": Accuracy =", this_pred)


                                                                                

numTrees = 10 maxDepth = 5 : Accuracy = 0.4402756422486841


                                                                                

numTrees = 10 maxDepth = 7 : Accuracy = 0.46222825781977755


                                                                                

numTrees = 20 maxDepth = 5 : Accuracy = 0.4426437413721507




numTrees = 20 maxDepth = 7 : Accuracy = 0.466414518355574


                                                                                

In [None]:
# pmmlBuilder = PMMLBuilder(sc, df_train, model)
# pmmlBuilder.buildFile(data_dir + model_target)