In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
import logging
import shutil
import site
import sys
import wget
import re

In [2]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [3]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [4]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

22/10/04 03:09:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/04 03:09:23 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
df = spark.read.parquet(data_dir + data_parquet)

                                                                                

In [6]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [7]:
splits = df.randomSplit([0.8, 0.2], seed=1)
df_train = splits[0]
df_test = splits[1]

In [8]:
vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")
indexer = StringIndexer(inputCol="class", outputCol="label")

In [9]:
numTrees_list = [10, 20]
maxDepth_list = [5, 7]

for numTrees_value in numTrees_list:
    for maxDepth_value in maxDepth_list:
        search_rf = RandomForestClassifier(numTrees=numTrees_value, maxDepth=maxDepth_value)
        pipeline = Pipeline(stages=[indexer,vectorAssembler, search_rf])
        model = pipeline.fit(df_train)
        prediction = model.transform(df_train)
        binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("label")
        print(f'Model Parameters: {numTrees_value} {maxDepth_value} accuracy: {binEval.evaluate(prediction)}')
        print('---------------------')

                                                                                

Model Parameters: 10 5 accuracy: 0.4413014987566369
---------------------


                                                                                

Model Parameters: 10 7 accuracy: 0.45918464502542733
---------------------


                                                                                

Model Parameters: 20 5 accuracy: 0.4422872280843247
---------------------




Model Parameters: 20 7 accuracy: 0.4644745390595246
---------------------


                                                                                