In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import os
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
!pip3 install pyspark2pmml 
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re
import glob



In [3]:
# Settings for the parquet -> CSV conversion step. Each one can be overridden by an
# environment variable of the same name; otherwise the fallback below is used.
data_csv, data_parquet, master, data_dir = (
    os.environ.get(env_name, fallback)
    for env_name, fallback in (
        ('data_csv', 'data.csv'),          # CSV file name written into data_dir
        ('data_parquet', 'data.parquet'),  # parquet file name read from data_dir
        ('master', "local[*]"),            # Spark master URL
        ('data_dir', '../../data/'),       # directory prefix for both files
    )
)

In [8]:
# Hardcoded overrides: whatever the environment supplied for these two names in the
# previous cell is discarded, so the conversion reads data.parquet and writes trends.csv.
data_parquet, data_csv = 'data.parquet', 'trends.csv'

In [9]:
# Skip the parquet -> CSV conversion cells when the CSV is already present on disk.
skip = os.path.exists(data_dir + data_csv)

In [10]:
if not skip:
    # A Spark context/session is only needed here for the parquet -> CSV conversion.
    sc = SparkContext.getOrCreate(conf=SparkConf().setMaster(master))
    spark = SparkSession.builder.getOrCreate()

In [11]:
if not skip:
    # Load the source data set that will be converted to CSV.
    df = spark.read.format('parquet').load(data_dir + data_parquet)

                                                                                

In [12]:
if not skip:
    csv_path = data_dir + data_csv
    tmp_path = csv_path + '.tmp'

    # Clear leftovers so the cell can be re-run. After a successful run the target
    # is a plain file (shutil.rmtree raises NotADirectoryError on a file); after an
    # interrupted run it may still be Spark's output directory.
    if os.path.isdir(csv_path):
        shutil.rmtree(csv_path)
    elif os.path.exists(csv_path):
        os.remove(csv_path)

    # Spark writes a directory of part files; coalesce(1) reduces it to one partition.
    df.coalesce(1).write.option("header", "true").csv(csv_path)

    file = sorted(glob.glob(csv_path + '/part-*'))
    if not file:
        raise FileNotFoundError(f"Spark wrote no part-* file into {csv_path}")

    # Replace the output directory with its single part file, under the same name.
    shutil.move(file[0], tmp_path)
    shutil.rmtree(csv_path)
    shutil.move(tmp_path, csv_path)

                                                                                

In [30]:
import json  # parses the input_columns JSON array below

data_csv = os.environ.get('trends_csv',
                          'trends.csv')  # input file name (CSV inside data_dir)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # directory the input CSV is read from
# The environment carries the column list as a JSON array string; parse it into a
# Python list so it can be passed directly as VectorAssembler(inputCols=...).
input_columns = json.loads(os.environ.get('input_columns',
                                          '["x", "y", "z"]'))  # input columns to consider

In [34]:
# Spark session for training and PMML export. pyspark2pmml needs the JPMML-SparkML
# jar on the JVM classpath.
pmml_jar = 'jpmml-sparkml-executable-1.5.12.jar'
conf = SparkConf().setMaster(master)
conf.set("spark.jars", pmml_jar)

# `spark.jars` is only honoured when a SparkContext is first created:
# SparkContext.getOrCreate(conf) silently returns an already-running context (e.g. the
# one started by the parquet -> CSV conversion cells) and ignores `conf`. Stop such a
# context so the jar is actually loaded. `_active_spark_context` is a private attribute,
# used here only to check for a running context without creating one.
existing_sc = SparkContext._active_spark_context
if existing_sc is not None and pmml_jar not in existing_sc.getConf().get("spark.jars", ""):
    # SparkSession.stop() stops the context and also clears the cached session state.
    SparkSession.builder.getOrCreate().stop()

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [32]:
# Read the training CSV (first row is the header, column types inferred) and
# register it as the SQL temporary view `df`.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_dir + data_csv)
)
df.createOrReplaceTempView('df')


                                                                                

In [35]:
from pyspark.sql.types import DoubleType

# Force the three feature columns to double; inferSchema may have chosen
# another type for them.
for _col_name in ("x", "y", "z"):
    df = df.withColumn(_col_name, getattr(df, _col_name).cast(DoubleType()))
del _col_name

In [50]:
# Set the seed value (used for the split and for every forest, so runs are reproducible)
seed_value = 1

# Create an 80-20 training and test split
df_train, df_test = df.randomSplit([0.8, 0.2], seed=seed_value)

# Hyperparameter grid, searched exhaustively below
numTrees_values = [10, 20]
maxDepth_values = [5, 7]

# Define the input columns for the VectorAssembler.
# NOTE(review): this overrides any `input_columns` value read from the environment
# in the configuration cell; the cast cell also hardcodes x/y/z.
input_columns = ["x", "y", "z"]

# Create the StringIndexer for the "class" column
indexer = StringIndexer(inputCol="class", outputCol="label")

# Create the VectorAssembler
vectorAssembler = VectorAssembler(inputCols=input_columns, outputCol="features")

# Create the MinMaxScaler for normalization
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

# The evaluator does not depend on the hyperparameters, so build it once
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label")

# Running best. These must be initialised here: previously they only existed if some
# other (deleted or out-of-order) cell had defined them, so a fresh kernel raised
# NameError at the first comparison.
best_accuracy = float("-inf")
best_hyperparameters = None
best_model = None

# Loop over the hyperparameter combinations
for numTrees in numTrees_values:
    for maxDepth in maxDepth_values:
        # Create the RandomForestClassifier with the current hyperparameters.
        # featuresCol defaults to "features" (the raw assembled vector); point it at the
        # scaler output so the normalizer stage actually feeds the classifier.
        rf = RandomForestClassifier(numTrees=numTrees, maxDepth=maxDepth, seed=seed_value,
                                    labelCol="label", featuresCol="features_norm")

        # Create the pipeline with the vector assembler, normalizer, and random forest classifier
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])

        # Fit the pipeline to the training data
        model = pipeline.fit(df_train)

        # Make predictions on the test data and evaluate their accuracy
        predictions = model.transform(df_test)
        accuracy = evaluator.evaluate(predictions)

        # Print the accuracy for the current hyperparameters
        print(f"Hyperparameters: numTrees={numTrees}, maxDepth={maxDepth}")
        print(f"Accuracy: {accuracy}")

        # Keep the hyperparameters (and the fitted model) with the highest accuracy so far
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_hyperparameters = (numTrees, maxDepth)
            best_model = model

# Print the combination of hyperparameters that yielded the highest accuracy
print("\nBest Hyperparameters:")
print(f"numTrees={best_hyperparameters[0]}, maxDepth={best_hyperparameters[1]}")
print(f"Highest Accuracy: {best_accuracy}")


                                                                                

Hyperparameters: numTrees=10, maxDepth=7
Accuracy: 0.464623512687628


                                                                                

Hyperparameters: numTrees=20, maxDepth=5
Accuracy: 0.4459978284959536




Hyperparameters: numTrees=20, maxDepth=7
Accuracy: 0.46748900256327025

Best Hyperparameters:
numTrees=20, maxDepth=7
Highest Accuracy: 0.46748900256327025


                                                                                