# 1. Import packages and datasets, define functions

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Start (or reuse) the SparkSession that every later cell reads data through.
spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

# Bare expression: displays the session summary when this is the cell's last output.
spark

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
# Build a configuration that requests a local master and a different app name.
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
# NOTE(review): the SparkSession above has already been created at this point, so
# getOrCreate(conf) presumably returns that existing context and the master/app
# name set on `conf` are not applied -- confirm against the value of sc.master.
sc = SparkContext.getOrCreate(conf)
# Bare expression: shows which master the context is actually using.
sc.master

In [None]:
# Load the smoking dataset (parquet file) into a PySpark DataFrame and preview
# the first five rows.
parquet_reader = spark.read
df = parquet_reader.parquet("smoking_dataset.parquet")
df.show(n=5)


In [None]:
# libraries
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, MultilayerPerceptronClassifier
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

In [None]:
class RandomGridBuilder:
  r"""Grid builder for random search.

  Builds a list of parameter maps for Spark's CrossValidator where every value
  is sampled from a user-provided distribution. Distributions must be passed as
  zero-argument callables (e.g. lambdas) so that the numbers are generated when
  ``build()`` is called, not when the grid is declared.

  Parameters:
    num_models: Integer (Python) - number of models to generate hyperparameters for
    seed: Integer (Python) - seed (optional, default is None). When given, model
      ``n`` is sampled after seeding both ``numpy.random`` and ``random`` with
      ``seed + n``, so the grid is reproducible whichever module the
      distributions use. ``seed=0`` is a valid seed.

  Example usage:
    from pyspark.ml.classification import LogisticRegression
    lr = LogisticRegression()
    paramGrid = RandomGridBuilder(2)\
               .addDistr(lr.regParam, lambda: np.random.rand()) \
               .addDistr(lr.maxIter, lambda : np.random.randint(10))\
               .build()

    The result has the same shape as the output of Spark ML's ParamGridBuilder
    (a list of {Param: value} dicts) and can be used in its place. The example
    above yields random hyperparameters for 2 models.
  """

  def __init__(self, num_models, seed=None):
    # Maps each Param to the callable that samples a value for it.
    self._param_grid = {}
    self.num_models = num_models
    self.seed = seed

  def addDistr(self, param, distr_generator):
    """Register the sampling callable for one parameter.

    Parameters:
      param: the Spark ML Param to tune.
      distr_generator: zero-argument callable returning one sampled value.

    Returns:
      self, so that calls can be chained.

    Raises:
      TypeError: if ``param`` is not a pyspark.ml.param.Param.
    """
    # The check is done on the type's name so this class does not have to
    # import pyspark itself.
    if 'pyspark.ml.param.Param' not in str(type(param)):
      raise TypeError('param must be an instance of Param')

    self._param_grid[param] = distr_generator
    return self

  def build(self):
    """Sample ``num_models`` parameter maps.

    Returns:
      param_map: list of {Param: sampled value} dicts, one per model.

    Note: when a seed is set this reseeds the global ``numpy.random`` and
    ``random`` generators.
    """
    param_map = []
    for n in range(self.num_models):
      # `is not None` rather than truthiness: 0 is a legitimate seed.
      if self.seed is not None:
        # Seed both numpy and random in case either is used by a distribution.
        np.random.seed(self.seed + n)
        random.seed(self.seed + n)
      param_map.append({param: distr() for param, distr in self._param_grid.items()})

    return param_map

# 2. Pre-processing

## Deriving summary statistics of BLDS and visualising proportion of prediabetic/diabetic individuals

In [None]:
# Get summary statistics based on BLDS
df.describe(["BLDS"]).show()

# Count the rows once; both proportions below share this denominator.
total_count = df.count()

# Calculate the count of patients with BLDS > 126 (diabetic)
# NOTE(review): BLDS == 126 is counted as pre-diabetic by these bounds --
# confirm that this boundary is the intended one.
diabetic_count = df.filter(df["BLDS"] > 126).count()

# Calculate the count of patients with 100 <= BLDS <= 126 (pre-diabetic).
# The lower bound is inclusive so that it agrees with the target column, which
# is defined as BLDS >= 100; with a strict bound, BLDS == 100 fell in no group.
pre_diabetic_count = df.filter((df["BLDS"] >= 100) & (df["BLDS"] <= 126)).count()

# Calculate the proportions
diabetic_proportion = diabetic_count / total_count
pre_diabetic_proportion = pre_diabetic_count / total_count

print(f"Diabetic Proportion: {diabetic_proportion:.2%}")
print(f"Pre-diabetic Proportion: {pre_diabetic_proportion:.2%}")

# Create visualisation
labels = ['Diabetic (BLDS > 126)', 'Pre-diabetic (BLDS >= 100 & <= 126)']
proportions = [diabetic_proportion, pre_diabetic_proportion]
fig, ax = plt.subplots()
bars = ax.bar(labels, proportions, color=['red', 'blue'])
# The bars are annotated in percent; the axis itself stays a 0-1 fraction.
ax.bar_label(bars, labels=[f"{v*100:.2f}%" for v in proportions], padding=8, fontsize=12)

# Set plot labels
ax.set_ylim(0, 1.0)
ax.set_ylabel('Proportion of Patients')
ax.set_title('Proportion of Patients by Blood Sugar Levels')
ax.grid(axis='y')

plt.show()

## Creating a new target variable, diabetic, based on a blood sugar threshold of 100

In [None]:
# Boolean target: True when blood sugar (BLDS) is at or above the 100 threshold
is_diabetic = df["BLDS"] >= 100
df = df.withColumn("diabetic", is_diabetic)
# Numeric (double) copy of the flag, in case a model needs a numeric label
df = df.withColumn("diabetic_label", df["diabetic"].cast("double"))
# The "diabetic" and "BLDS" columns are deliberately kept in the DataFrame
df.show(n=5)

## One hot encoding and pipeline setup for subsequent models

In [None]:
# The sex and DRK_YN columns need to be one-hot encoded
df.printSchema()

# Index stages: turn each of the two columns into a numeric index column
sex_indexer = StringIndexer(inputCol="sex", outputCol="Sex_Index")
DRK_YN_indexer = StringIndexer(inputCol="DRK_YN", outputCol="DRK_YN_Index")

# Encoding stages: one-hot encode the index columns
sex_encoder = OneHotEncoder(inputCol="Sex_Index", outputCol="Sex_Vec")
DRK_YN_encoder = OneHotEncoder(inputCol="DRK_YN_Index", outputCol="DRK_YN_Vec")

# Assemble the encoded vectors and the remaining columns into one "features" vector
assembler_encoded_inputs = ["Sex_Vec", "DRK_YN_Vec"]
assembler_numeric_inputs = [
    "age", "height", "weight", "waistline",
    "sight_left", "sight_right", "hear_left", "hear_right",
    "SBP", "DBP",
    "tot_chole", "HDL_chole", "LDL_chole", "triglyceride",
    "hemoglobin", "urine_protein", "serum_creatinine",
    "SGOT_AST", "SGOT_ALT", "gamma_GTP", "SMK_stat_type_cd",
]
vec_assembler = VectorAssembler(
    inputCols=assembler_encoded_inputs + assembler_numeric_inputs,
    outputCol="features",
)

# Scale the assembled vector into "scaled_features"
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Chain every stage into a single pipeline
preprocessing_stages = [
    sex_indexer,
    DRK_YN_indexer,
    sex_encoder,
    DRK_YN_encoder,
    vec_assembler,
    scaler,
]
df_pipe = Pipeline(stages=preprocessing_stages)

# Fit the pipeline on df, then apply it to the same data
fitted_df_pipe = df_pipe.fit(df)
piped_data = fitted_df_pipe.transform(df)


# 3. Exploratory data analysis (EDA)



## Correlation matrix plot

In [None]:
# Correlation matrix of the target and the numeric columns

numeric_columns = [
    'diabetic_label',
    "age", "height", "weight", "waistline",
    "sight_left", "sight_right", "hear_left", "hear_right",
    "SBP", "DBP",
    "tot_chole", "HDL_chole", "LDL_chole", "triglyceride",
    "hemoglobin", "urine_protein", "serum_creatinine",
    "SGOT_AST", "SGOT_ALT", "gamma_GTP", "SMK_stat_type_cd",
]

# Correlation.corr works on a single vector column, so assemble one first
cor_vec_assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
corr_data = cor_vec_assembler.transform(df)

# The matrix is the first field of the first collected row
corr_first_row = Correlation.corr(corr_data, 'features').collect()[0]
corr_matrix = corr_first_row[0].toArray().tolist()
corr_matrix_df = pd.DataFrame(data=corr_matrix, index=numeric_columns, columns=numeric_columns)

# Annotated heatmap labelled with the column names on both axes
plt.figure(figsize=(16, 5))
heatmap_tick_labels = corr_matrix_df.columns.values
sns.heatmap(
    corr_matrix_df,
    xticklabels=heatmap_tick_labels,
    yticklabels=heatmap_tick_labels,
    cmap="inferno",
    annot=True,
    fmt='.1g',
)
plt.title('Correlation Matrix')


## Exploring the optimal number of PCA components with a plot

In [None]:
#PCA

# Find the minimum number of PCA dimensions (out of 1-10) that captures 95% of
# the variance. PCA is fitted once with k = 10 and the per-component ratios are
# accumulated, instead of refitting for every k: the ratio reported for a
# leading component should not depend on how many further components are
# requested, so the running total gives the same curve.
# NOTE(review): PCA reads the unscaled "features" column, not the
# "scaled_features" column produced by the pipeline -- confirm this is intended.
max_components = 10
pca = PCA(k = max_components, inputCol = "features", outputCol= "pca_features")
pca_model = pca.fit(piped_data)
explained_variance_list = np.cumsum(np.array(pca_model.explainedVariance)).tolist()

#Plot to illustrate the number of dimensions we should select, to account for a threshold 95% of variance

sns.lineplot(x = np.arange(1, max_components + 1), y = explained_variance_list)
plt.axhline(y= 0.95, color = 'red')
plt.title("Plot of Total Explained Variance against k Principle Component Dimensions")
plt.xlabel("Principle Component Dimensions")
plt.ylabel("Total explained variance")
plt.show()

#k = 5 is selected
n_components = 5
pca = PCA(k = n_components, inputCol = "features", outputCol= "pca_features")
#Separate fit needs to be done to get explained variance, cannot extract from a pipeline which is why pca needs to be separate
pca_model = pca.fit(piped_data)
piped_data_pca = pca_model.transform(piped_data)

explained_variance = np.array(pca_model.explainedVariance)

# Create a bar plot of explained variance by principal component; the labels
# are derived from n_components so they always match the number of bars.
pc_labels = [f"PC{i}" for i in range(1, n_components + 1)]
sns.barplot(x=pc_labels, y=explained_variance)
plt.title('Explained Variance by Principal Component')
plt.xlabel('Principal Component')
plt.ylabel('Explained Variance Ratio')
plt.show()




In [None]:
# PCA exploration: project the data onto two principal components

# PCA stage with k = 2
pca_test = PCA(k=2, inputCol="features", outputCol="pca_features")

# Same preprocessing stages as before, followed by the 2-component PCA
exploration_stages = [
    sex_indexer,
    DRK_YN_indexer,
    sex_encoder,
    DRK_YN_encoder,
    vec_assembler,
    scaler,
    pca_test,
]
df_pipe_test = Pipeline(stages=exploration_stages)

# Fit on df and transform the same data
fitted_pipe_test = df_pipe_test.fit(df)
piped_data_test = fitted_pipe_test.transform(df)


In [None]:
# Create a scatter plot of variables mapped to 2 PC
sns.scatterplot(x= test_df1.PC1, y = test_df1.PC2, s = 1)
plt.title('Scatter plot of points in Principle component axis ')
plt.xlabel('Principal Component 1')
plt.ylabel('Principle Component 2')
plt.show()

In [None]:
test = piped_data_test.select("pca_features")
test_df = test.toPandas()
test_df_plot = pd.DataFrame(test_df["pca_features"].to_list(), columns=['PC1', 'PC2'])


In [1]:
Q1 = test_df_plot.quantile(0.1)
Q3 = test_df_plot.quantile(0.9)
IQR = Q3 - Q1

test_df1 = test_df_plot[~((test_df_plot < (Q1 - 1.5 * IQR)) |(test_df_plot > (Q3 + 1.5 * IQR))).any(axis=1)]

NameError: name 'test_df_plot' is not defined

##  Final PCA conversion 

In [None]:
#PCA with k = 5
pca = PCA(k = 5, inputCol = "features", outputCol= "pca_features")

# make pipeline
df_pipe = Pipeline(stages=[sex_indexer, DRK_YN_indexer, sex_encoder, DRK_YN_encoder, vec_assembler, scaler, pca])

# split the raw data into training and testing rows BEFORE fitting the
# pipeline, so that the indexers, scaler and PCA are estimated from training
# rows only. Fitting on the full data first would let the test rows influence
# the transformation that is later used to evaluate them.
train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=999) # set seed=999 for reproducibility

# fit on the training rows, then apply the same fitted pipeline to both splits
fitted_df_pipe = df_pipe.fit(train_raw)
training = fitted_df_pipe.transform(train_raw)
test = fitted_df_pipe.transform(test_raw)

# 4. Models

In [None]:
# Evaluator used by every cross-validation below: recall for label 1,
# computed from the "diabetic_label" and "prediction" columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol="diabetic_label",
    predictionCol="prediction",
    metricName="recallByLabel",
    metricLabel=1,
)

In [None]:
# create Logistic Regression model on the scaled features
lr = LogisticRegression(featuresCol="scaled_features", labelCol="diabetic_label")

# create and build parameter grid: 20 random (regParam, elasticNetParam) draws
grid_lr = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(lr.regParam, lambda: np.random.uniform(0.001,1))\
               .addDistr(lr.elasticNetParam, lambda : np.random.uniform(0,1))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid_lr, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_lr = models.bestModel

# predict on test data
test_results = best_lr.transform(test)

# Read the best hyperparameters and score the test predictions once; the
# values are reused for printing and for the CSV instead of running the
# evaluation a second time.
best_reg_param = best_lr.getRegParam()
best_elastic_net = best_lr.getElasticNetParam()
test_recall = evaluator.evaluate(test_results)

# get best model parameters
print(best_reg_param)
print(best_elastic_net)

# evaluate the best model
print(test_recall)

#Output results
lr_results = pd.DataFrame([best_reg_param, best_elastic_net, test_recall], index = ["alpha", "mixture", "Recall"])
lr_results.to_csv("Logistic Regression Results.csv")


In [None]:
#SVC classifier on the scaled features

svc = LinearSVC(labelCol = "diabetic_label", featuresCol = "scaled_features")

# 20 random regParam draws
grid_svc = RandomGridBuilder(20, seed=999)\
               .addDistr(svc.regParam, lambda: np.random.rand()) \
                .build()

#Cross validation
cv_svc = CrossValidator(estimator = svc, estimatorParamMaps = grid_svc, evaluator = evaluator, numFolds = 5, seed = 999)
models_svc = cv_svc.fit(training)
best_svc = models_svc.bestModel

test_results_svc = best_svc.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric
bestModelIndex = np.argmax(models_svc.avgMetrics)
best_reg_param = grid_svc[bestModelIndex][svc.regParam]
print(best_reg_param)

# Score the test predictions once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results_svc)
print(test_recall)

#Output results
svc_results = pd.DataFrame([best_reg_param, test_recall], index=["alpha", "recall"])
svc_results.to_csv("SVC Results.csv")

In [None]:
#Random forest classifier on the scaled features

rf = RandomForestClassifier(labelCol = "diabetic_label", featuresCol = "scaled_features", seed = 999)

# random parameter grid (20 draws)
# - maxDepth is drawn from 1-4: a depth of 0 would be a single leaf with no
#   split at all.
# - featureSubsetStrategy is given as an integer string and is drawn from
#   1-23: "0" is not an accepted value and would make the fit fail.
#   NOTE(review): 23 is assumed to be the size of the assembled feature vector
#   (the MLP cell uses an input layer of 23) -- confirm.
grid_rf = RandomGridBuilder(20, seed=999)\
                  .addDistr(rf.numTrees, lambda: np.random.randint(50, 500))\
                  .addDistr(rf.maxDepth, lambda: np.random.randint(1, 5))\
                  .addDistr(rf.featureSubsetStrategy, lambda: str(np.random.randint(1, 24)))\
                  .build()

#Cross validation
cv_rf = CrossValidator(estimator = rf, estimatorParamMaps = grid_rf, evaluator = evaluator, numFolds = 5, seed = 999)
models_rf = cv_rf.fit(training)
best_rf = models_rf.bestModel

test_results_rf = best_rf.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric
bestModelIndex = np.argmax(models_rf.avgMetrics)
best_params_rf = grid_rf[bestModelIndex]
best_values_rf = [best_params_rf[rf.numTrees], best_params_rf[rf.maxDepth], best_params_rf[rf.featureSubsetStrategy]]
for value in best_values_rf:
    print(value)

# Score the test predictions once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results_rf)
print(test_recall)

#Output results
rf_results = pd.DataFrame(best_values_rf + [test_recall], index=["ntree", "max_depth", "mtry" , "recall"])
rf_results.to_csv("RF Results.csv")

In [None]:
# XGBoost classifier on the scaled features
xgb = SparkXGBClassifier(
  features_col="scaled_features",
  label_col="diabetic_label",
  seed=999,
  n_estimators=100,
  num_workers=sc.defaultParallelism
)

# 20 random draws over learning rate, L1/L2 regularisation, gamma and depth
grid = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(xgb.learning_rate, lambda: np.random.uniform(0.001,1))\
               .addDistr(xgb.reg_alpha, lambda : np.random.uniform(0,5))\
               .addDistr(xgb.reg_lambda, lambda : np.random.uniform(0,5))\
               .addDistr(xgb.gamma, lambda: np.random.uniform(0,3))\
               .addDistr(xgb.max_depth, lambda : np.random.randint(4,11))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=xgb, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_model = models.bestModel

# predict on test data
test_results = best_model.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric, read in the same order as the CSV rows below
bestModelIndex = np.argmax(models.avgMetrics)
best_params = grid[bestModelIndex]
tuned_params = [xgb.learning_rate, xgb.reg_alpha, xgb.reg_lambda, xgb.gamma, xgb.max_depth]
best_values = [best_params[param] for param in tuned_params]
for value in best_values:
    print(value)

# evaluate the best model once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results)
print(test_recall)

#Output results
xgb_results = pd.DataFrame(best_values + [test_recall],
                           index=["learning rate", "reg_alpha", "reg_lambda" , "gamma", "max_depth", "recall"])
xgb_results.to_csv("xgb Results.csv")

In [None]:
# MultilayerPerceptronClassifier on the scaled features

# specify layers for the neural network:
# input layer of size 23 (features), two hidden layers of sizes 16 and 8,
# and an output layer of size 2 (classes)
layers = [23, 16, 8, 2]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(featuresCol="scaled_features", labelCol="diabetic_label", maxIter=100, layers=layers, seed=999)

# build grid: 20 random (stepSize, blockSize) draws
# NOTE(review): stepSize is presumably only used by the gradient-descent
# solver, and the solver is left at its default here -- confirm that this
# search dimension actually affects training.
grid = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(trainer.stepSize, lambda: np.random.uniform(0.001,0.1))\
               .addDistr(trainer.blockSize, lambda : np.random.randint(1,501))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=trainer, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_model = models.bestModel

# predict on test data
test_results = best_model.transform(test)

# Read the best hyperparameters and score the test predictions once; the
# values are reused for printing and for the CSV.
best_step_size = best_model.getStepSize()
best_block_size = best_model.getBlockSize()
test_recall = evaluator.evaluate(test_results)

# get best model parameters
print(best_step_size)
print(best_block_size)

# evaluate the best model
print(test_recall)

#Output results
mlp_results = pd.DataFrame([best_step_size, best_block_size, test_recall],
                           index=["step size", "block size","recall"])
mlp_results.to_csv("mlp Results.csv")

# 5. PCA approach


In [None]:
# create Logistic Regression model on the PCA features
lr = LogisticRegression(featuresCol="pca_features", labelCol="diabetic_label")

# create and build parameter grid: 20 random (regParam, elasticNetParam) draws
grid = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(lr.regParam, lambda: np.random.uniform(0.001,1))\
               .addDistr(lr.elasticNetParam, lambda : np.random.uniform(0,1))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_lr = models.bestModel

# predict on test data
test_results = best_lr.transform(test)

# Read the best hyperparameters and score the test predictions once; the
# values are reused for printing and for the CSV instead of running the
# evaluation a second time.
best_reg_param = best_lr.getRegParam()
best_elastic_net = best_lr.getElasticNetParam()
test_recall = evaluator.evaluate(test_results)

# get best model parameters
print(best_reg_param)
print(best_elastic_net)

# evaluate the best model
print(test_recall)

#Output results
lr_results = pd.DataFrame([best_reg_param, best_elastic_net, test_recall], index = ["alpha", "mixture", "Recall"])
lr_results.to_csv("PCA Logistic Regression Results.csv")

In [None]:
#SVC classifier on the PCA features

svc = LinearSVC(labelCol = "diabetic_label", featuresCol = "pca_features")

# 20 random regParam draws
grid_svc = RandomGridBuilder(20, seed = 999)\
               .addDistr(svc.regParam, lambda: np.random.rand()) \
                .build()

#Cross validation
cv_svc = CrossValidator(estimator = svc, estimatorParamMaps = grid_svc, evaluator = evaluator, numFolds = 5, seed = 999)
models_svc = cv_svc.fit(training)
best_svc = models_svc.bestModel

test_results_svc = best_svc.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric
bestModelIndex = np.argmax(models_svc.avgMetrics)
best_reg_param = grid_svc[bestModelIndex][svc.regParam]
print(best_reg_param)

# Score the test predictions once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results_svc)
print(test_recall)

#Output results
svc_results = pd.DataFrame([best_reg_param, test_recall], index=["alpha", "recall"])
svc_results.to_csv("PCA SVC Results.csv")

In [None]:
#Random forest classifier on the PCA features

rf = RandomForestClassifier(labelCol = "diabetic_label", featuresCol = "pca_features", seed = 999)

# random parameter grid (20 draws)
# - maxDepth is drawn from 1-4: a depth of 0 would be a single leaf with no
#   split at all.
# - featureSubsetStrategy is given as an integer string and is drawn from 1-5
#   (the PCA stage keeps k = 5 components): "0" is not an accepted value and
#   would make the fit fail.
grid_rf = RandomGridBuilder(20, seed = 999)\
                  .addDistr(rf.numTrees, lambda: np.random.randint(50, 500))\
                  .addDistr(rf.maxDepth, lambda: np.random.randint(1, 5))\
                  .addDistr(rf.featureSubsetStrategy, lambda: str(np.random.randint(1, 6)))\
                  .build()

#Cross validation
cv_rf = CrossValidator(estimator = rf, estimatorParamMaps = grid_rf, evaluator = evaluator, numFolds = 5, seed = 999)
models_rf = cv_rf.fit(training)
best_rf = models_rf.bestModel

test_results_rf = best_rf.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric
bestModelIndex = np.argmax(models_rf.avgMetrics)
best_params_rf = grid_rf[bestModelIndex]
best_values_rf = [best_params_rf[rf.numTrees], best_params_rf[rf.maxDepth], best_params_rf[rf.featureSubsetStrategy]]
for value in best_values_rf:
    print(value)

# Score the test predictions once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results_rf)
print(test_recall)

#Output results
rf_results = pd.DataFrame(best_values_rf + [test_recall], index=["ntree", "max_depth", "mtry" , "recall"])
rf_results.to_csv("PCA RF Results.csv")

In [None]:
# XGBoost classifier on the PCA features
xgb = SparkXGBClassifier(
  features_col="pca_features",
  label_col="diabetic_label",
  seed=999,
  n_estimators=100,
  num_workers=sc.defaultParallelism
)

# 20 random draws over learning rate, L1/L2 regularisation, gamma and depth
grid = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(xgb.learning_rate, lambda: np.random.uniform(0.001,1))\
               .addDistr(xgb.reg_alpha, lambda : np.random.uniform(0,5))\
               .addDistr(xgb.reg_lambda, lambda : np.random.uniform(0,5))\
               .addDistr(xgb.gamma, lambda: np.random.uniform(0,3))\
               .addDistr(xgb.max_depth, lambda : np.random.randint(4,11))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=xgb, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_model = models.bestModel

# predict on test data
test_results = best_model.transform(test)

# get the best model parameters: the grid entry with the highest average
# cross-validation metric, read in the same order as the CSV rows below
bestModelIndex = np.argmax(models.avgMetrics)
best_params = grid[bestModelIndex]
tuned_params = [xgb.learning_rate, xgb.reg_alpha, xgb.reg_lambda, xgb.gamma, xgb.max_depth]
best_values = [best_params[param] for param in tuned_params]
for value in best_values:
    print(value)

# evaluate the best model once and reuse the value for print and CSV
test_recall = evaluator.evaluate(test_results)
print(test_recall)

#Output results (every other model cell writes a results CSV; this one did
# not, so the PCA XGBoost run is saved too)
xgb_results = pd.DataFrame(best_values + [test_recall],
                           index=["learning rate", "reg_alpha", "reg_lambda" , "gamma", "max_depth", "recall"])
xgb_results.to_csv("PCA xgb Results.csv")

In [None]:
# MultilayerPerceptronClassifier on the PCA features

# specify layers for the neural network:
# input layer of size 5 (PCA components), two hidden layers of sizes 16 and 8,
# and an output layer of size 2 (classes)
layers = [5, 16, 8, 2]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(featuresCol="pca_features", labelCol="diabetic_label", maxIter=100, layers=layers, seed=999)

# build grid: 20 random (stepSize, blockSize) draws
# NOTE(review): stepSize is presumably only used by the gradient-descent
# solver, and the solver is left at its default here -- confirm that this
# search dimension actually affects training.
grid = RandomGridBuilder(num_models=20, seed=999)\
               .addDistr(trainer.stepSize, lambda: np.random.uniform(0.001,0.1))\
               .addDistr(trainer.blockSize, lambda : np.random.randint(1,501))\
               .build()

# create CrossValidator
cv = CrossValidator(estimator=trainer, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=999)

# fit cross validation models
models = cv.fit(training)

# extract best model
best_model = models.bestModel

# predict on test data
test_results = best_model.transform(test)

# Read the best hyperparameters and score the test predictions once; the
# values are reused for printing and for the CSV.
best_step_size = best_model.getStepSize()
best_block_size = best_model.getBlockSize()
test_recall = evaluator.evaluate(test_results)

# get best model parameters
print(best_step_size)
print(best_block_size)

# evaluate the best model
print(test_recall)


#Output results
mlp_results = pd.DataFrame([best_step_size, best_block_size, test_recall],
                           index=["step size", "block size","recall"])
mlp_results.to_csv("PCA mlp Results.csv")