In [None]:
import numpy as np
import pyspark.sql.functions as F
import seaborn as sns
from pyspark.mllib.stat import Statistics
import matplotlib.pyplot as plt

In [None]:
 raw_df = spark.read.csv("/mnt/team14/data/BitcoinHeistData.csv",inferSchema=True,header=True)
raw_df.cache()

In [None]:
# (row count, column count) of the raw dataframe
n_rows, n_cols = raw_df.count(), len(raw_df.columns)
print(n_rows, n_cols)

In [None]:
# Preview the first five rows of the raw dataframe.
raw_df.show(5)

Because of the large number of distinct labels, we group the ransomware labels by location (the label prefix), turning the original 29-label problem into a 4-class problem.

In [None]:
# Collapse every label that starts with one of these prefixes into the prefix itself;
# labels matching none of the prefixes are left untouched.
distinct_labels = ['padua','montreal','princeton','white']
for family in distinct_labels:
  has_prefix = F.col("label").startswith(family)
  raw_df = raw_df.withColumn("label", F.when(has_prefix, family).otherwise(F.col("label")))

In [None]:
# Print the distinct labels that remain after the prefix-based relabelling.
raw_df.select("label").distinct().show()

In [None]:
# `df` is the working name for the relabelled frame used from here on.
# It is the same DataFrame object as `raw_df` (assignment does not copy), so an
# unpersist immediately followed by a cache on it nets out to a single cache call.
df = raw_df
df.cache()

# EDA

Checking the data imbalance for newly obtained labels

In [None]:
# Row count per label, to inspect class balance.
display(df.groupBy('label').count())

Checking whether we have any missing/na values in our dataset that we might have to deal with

In [None]:
# Count the null entries of every column in a single pass.
# NOTE: only nulls are counted; NaN values in floating-point columns are not.
null_counts = [
  F.count(F.when(F.col(name).isNull(), name)).alias(name)
  for name in df.columns
]
missing_value = df.select(null_counts)
missing_value.show()

No missing values hence we do not need to do data interpolation

Checking if we have any duplicated rows in our dataset

In [None]:
# Number of duplicated rows = total rows - distinct rows.
# Computed in Spark so the full dataset is not collected to the driver just to be counted.
df.count() - df.dropDuplicates().count()

As the returned count is zero, the dataset contains only distinct rows.

In [None]:
# Print the column names and their inferred types.
df.printSchema()

The schema shows that every feature except `address` is numeric. We exclude `address` from the model: it is not relevant to the task and cannot sensibly be label-encoded and fed to the model.
We also notice that the dataset is made up of continuous data.

In [None]:
# Summary statistics for each column (the min/max spread is discussed below).
df.summary().show()

The min/max values already show how differently the features are scaled, but a boxplot gives a better overview and is easier to read.

In [None]:
import matplotlib.pyplot as plt
# Only numeric columns can be box-plotted, so drop the string columns before
# collecting to pandas; this keeps the driver-side copy as small as possible.
df.drop('address','label').toPandas().boxplot(rot=90, figsize=(10,10))
plt.title('Boxplot for each Features')
plt.xlabel('features')
plt.show()

The boxplot shows that the dataset contains many outliers and that the feature ranges differ by orders of magnitude. Both can make it hard for an ML model to learn a good representation of the data, so we will *scale* the features and deal with the outliers.

In [None]:
# Correlate only the numeric columns: skip the leading address and the trailing
# string label, so the matrix matches the tick labels used below.
numeric_cols = df.columns[1:-1]
features = df.rdd.map(lambda row: row[1:-1])
corr_mat=Statistics.corr(features, method="pearson")
# Boolean mask hiding the upper triangle (diagonal included). Building it from ones
# rather than from the correlation values keeps exact-zero correlations masked too.
matrix = np.triu(np.ones_like(corr_mat, dtype=bool))

fig, ax = plt.subplots(figsize=(15,15))
sns.heatmap(corr_mat, annot=True, xticklabels=numeric_cols, yticklabels=numeric_cols, square=True, mask=matrix, ax=ax)
ax.set_title('Pearson correlation between numeric features');

We can see that `count` is fairly correlated with `length`, and `neighbours` with `weight`. Since no pair of features is highly correlated, we conclude that there are no redundant features that strictly follow the same trend or are nearly proportional to one another.

In [None]:
# Pairwise relationship plots over the columns of df.
# NOTE(review): toPandas() collects the entire DataFrame to the driver and every
# row is plotted; consider plotting a seeded sample if the dataset is large.
sns.pairplot(df.toPandas())

In [None]:
# One box plot per column, each with its own axes so differently scaled
# features stay readable. Despite its name, `data_mean` holds the full data
# (the iloc[:, :] slice selects every row and column), not means.
data_mean = df.toPandas().iloc[:, :]
data_mean.plot(kind='box', subplots=True, layout=(8,4), sharex=False, sharey=False, fontsize=12, figsize=(15,20));

In [None]:
# Preview the working dataframe.
df.show()

In [None]:
# Only the label column is needed for this plot, so select it in Spark
# instead of collecting every column to the driver.
df.select("label").toPandas()["label"].hist()
plt.title('histogram showing distribution of labels')
plt.xlabel('labels')
plt.show()

# Pre-Processing

## Removing Outliers

In [None]:
# Remove outliers with a z-score rule: keep a row only if every numeric feature
# lies within 3 standard deviations of that feature's mean.
numeric_cols = df.columns[1:-1]  # skip the leading address and the trailing label
means = df.select([F.mean(F.col(c)).alias(c) for c in numeric_cols]).collect()[0].asDict()
std_dev = df.select([F.stddev(F.col(c)).alias(c) for c in numeric_cols]).collect()[0].asDict()
# Start from the full frame and narrow it column by column. Filtering `df` afresh
# in each iteration would discard every filter except the one for the last column.
cleaned_df = df
for column in numeric_cols:
  if not std_dev[column]:
    # A constant (or empty) column has no spread; dividing by it would drop every row.
    continue
  cleaned_df = cleaned_df.filter(F.abs(F.col(column) - means[column]) / std_dev[column] <= 3)
cleaned_df.cache()
df.unpersist()

In [None]:
# Box plot of the features after outlier removal, for comparison with the
# earlier plot of the unfiltered data.
cleaned_df.toPandas().boxplot(rot=90)
plt.title('Boxplot for each Features')
# Clear any figure-level super-title.
plt.suptitle('')
plt.xlabel('features')
plt.show()

In [None]:
# Preview the first five rows of the cleaned dataframe.
cleaned_df.show(5)

In [None]:
# Row count per label after outlier removal.
display(cleaned_df.groupBy('label').count())

# Decision Tree without Oversample

In [None]:
from pyspark.ml.feature import StandardScaler,StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
# Model inputs: every column after the leading address, except the target.
feature_cols = [c for c in cleaned_df.columns[1:] if c != 'label']
# Seeded 70/30 split of the cleaned (not oversampled) data.
train,test = cleaned_df.randomSplit([0.7,0.3],seed=7)
# Stages reused by the pipelines below: assemble the raw vector, then scale it.
vectorAssembler = VectorAssembler(inputCols=feature_cols,outputCol="vectorised")
standardScaler = StandardScaler(inputCol="vectorised",outputCol="features")
# Fit the label indexer on the training split only and apply it to both splits.
label_stringIdx = StringIndexer(inputCol='label',outputCol='idxLabel')
labelTransformer = label_stringIdx.fit(train)
train = labelTransformer.transform(train).cache()
test = labelTransformer.transform(test).cache()
# Position in this list is the numeric index assigned to each label.
labelTransformer.labels

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Shared evaluator scoring predictions against the indexed label column;
# the metric defaults to F1 and is switched by later cells as needed.
evaluator = MulticlassClassificationEvaluator(labelCol='idxLabel',metricName='f1')


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import PipelineModel
# Decision tree on the non-oversampled split, tuned over maxDepth with 5-fold CV on F1.
dt_noOversample = DecisionTreeClassifier(featuresCol = 'features',labelCol = 'idxLabel')
evaluator.setMetricName('f1')
paramGrid_dt_noOversample = ParamGridBuilder().addGrid(dt_noOversample.maxDepth,[17,20,22])\
                                 .build()
cv_dt_noOversample = CrossValidator(estimator=dt_noOversample,estimatorParamMaps=paramGrid_dt_noOversample,evaluator = evaluator,numFolds=5)
pipeline_dt_noOversample = Pipeline(stages=[vectorAssembler,standardScaler,cv_dt_noOversample])
# The fit is commented out; a previously saved model is loaded instead, so the
# estimator objects above only document how that model was configured.
# pipeline_dtModel_noOversample = pipeline_dt_noOversample.fit(train)
pipeline_dtModel_noOversample = PipelineModel.load("/mnt/team14/MulticlassModel-FinalDTwithoutOversample")

In [None]:
# Evaluate the non-oversampled decision tree on the held-out split.
prediction = pipeline_dtModel_noOversample.transform(test)
evaluator.setMetricName('f1')
f1_score = evaluator.evaluate(prediction)
evaluator.setMetricName('accuracy')
accuracy = evaluator.evaluate(prediction)
# Look each class index up in the fitted StringIndexer instead of hard-coding it:
# the index order depends on the label frequencies of the data it was fitted on.
label_index = {name: float(i) for i, name in enumerate(labelTransformer.labels)}
for metric, title in [("recallByLabel", "Recall"), ("precisionByLabel", "Precision"), ("fMeasureByLabel", "F Measure")]:
  for name in ['white', 'princeton', 'montreal', 'padua']:
    score = evaluator.evaluate(prediction, {evaluator.metricName: metric, evaluator.metricLabel: label_index[name]})
    print(f'{title} for {name.capitalize()}: {score}')
print(f'Accuracy: {accuracy*100}')
print(f'F1 Score: {f1_score}')


In [None]:
# Persist the model with its own writer: the savePipeline helper is only defined
# further down the notebook, so calling it here fails on a top-to-bottom run.
# overwrite() is needed because this is the path the model was loaded from.
pipeline_dtModel_noOversample.write().overwrite().save("/mnt/team14/MulticlassModel-FinalDTwithoutOversample")

## Over-sampling (SMOTE-ENN)
### Using the pandas and scikit-learn libraries

In [None]:
# https://towardsdatascience.com/building-an-ml-application-with-mllib-in-pyspark-part-1-ac13f01606e2
data_cols = cleaned_df.columns[:-1]
# Collect once and split in pandas: two independent toPandas() calls are two
# separate Spark jobs, which costs twice as much and gives no guarantee that
# the rows of X and Y come back in the same order.
cleaned_pdf = cleaned_df.toPandas()
X = cleaned_pdf.filter(items = data_cols)
Y = cleaned_pdf[['label']]
del cleaned_pdf
cleaned_df.unpersist()
X.shape, Y.shape

In [None]:
from sklearn.model_selection import train_test_split
from imblearn.combine import SMOTEENN
from imblearn.under_sampling import EditedNearestNeighbours
# SMOTE over-sampling followed by Edited-Nearest-Neighbours cleaning of all classes.
# The resampler is seeded so the synthetic samples are reproducible between runs.
overSampler = SMOTEENN(enn=EditedNearestNeighbours(sampling_strategy='all'),random_state=7)
# Stratified, seeded 70/30 split so both parts keep the class proportions of Y.
x_train,x_test,y_train,y_test = train_test_split(X,Y,test_size=0.3,random_state=7,stratify=Y)

In [None]:
import pandas as pd
# The resampler is given numeric features only, so the address column is dropped
# from the training features (x_test keeps it).
x_train = x_train.drop('address',axis=1)
# Resample the training split only; the test split is left untouched.
x_train_res, y_train_res = overSampler.fit_resample(x_train,y_train)

In [None]:
# Class counts after resampling.
y_train_res.value_counts()

In [None]:
# Rebuild frames with the original column layout. data_cols still lists
# 'address', which was dropped before resampling.
# NOTE(review): presumably pandas fills that missing column with NaN, which
# keeps the training layout aligned with the test frame - confirm.
dataframe1 = pd.DataFrame(x_train_res,columns=data_cols)
dataframe2 = pd.DataFrame(y_train_res,columns=['label'])

In [None]:
# Give all four frames a fresh positional index so features and labels can be
# concatenated side by side. reset_index() without drop=True also inserts the
# old index as an extra 'index' column, which the next cell removes again.
x_train = dataframe1.reset_index()
y_train = dataframe2.reset_index()
x_test = x_test.reset_index()
y_test = y_test.reset_index()

In [None]:
# Join features and label column-wise into one frame per split.
train = pd.concat([x_train,y_train],axis=1)
test = pd.concat([x_test,y_test],axis=1)
# Drop by the *label* of the first column ('index', left over from reset_index).
# NOTE(review): both concatenated halves carry an 'index' column; dropping by
# label presumably removes both - confirm the resulting column list.
train = train.drop(train.columns[:1],axis=1)
test = test.drop(test.columns[:1],axis=1)

In [None]:
# Convert the pandas train/test frames back into Spark DataFrames.
train_df=spark.createDataFrame(train) 
test_df = spark.createDataFrame(test)

In [None]:
# train_df.write.format("csv").save("/mnt/team14/data/train_data")
# test_df.write.format("csv").save("/mnt/team14/data/test_data")

# Train Test Loading

In [None]:
# The resampled train split and the test split were saved earlier as header-less
# CSV, so they are reloaded here instead of being recomputed.
import pyspark.sql.functions as F
column_name = ['address','year','day','length','weight','count','looped','neighbour','income','label']
# The files carry no header, so Spark names the columns _c0.._c9; toDF assigns
# all ten names positionally in one call (and fails loudly if the column count
# does not match, instead of silently leaving a column unrenamed).
train_df = spark.read.csv("/mnt/team14/data/train_data",inferSchema=True).toDF(*column_name)
test_df = spark.read.csv("/mnt/team14/data/test_data",inferSchema=True).toDF(*column_name)

In [None]:
# Shuffle both splits. The random sort key is seeded so reruns are more
# repeatable and partition-order effects on training stay comparable.
train_df = train_df.orderBy(F.rand(seed=7))
test_df = test_df.orderBy(F.rand(seed=7))
train_df.cache()
test_df.cache()

# Vector Assembler & Normalisation

In [None]:
# Feature assembler, scaler and label indexer for the oversampled data
from pyspark.ml.feature import StandardScaler,StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
# Inputs are all columns after the leading address, minus the target.
feature_cols = [col_name for col_name in train_df.columns[1:] if col_name != 'label']
vectorAssembler = VectorAssembler(outputCol="vectorised",inputCols=feature_cols)
standardScaler = StandardScaler(outputCol="features",inputCol="vectorised")
# Index the string labels; the mapping is learned from the training split
# and then applied unchanged to the test split.
label_stringIdx = StringIndexer(outputCol='idxLabel',inputCol='label')
labelTransformer = label_stringIdx.fit(train_df)
train_df, test_df = labelTransformer.transform(train_df), labelTransformer.transform(test_df)
# Position in this list is the numeric index assigned to each label.
labelTransformer.labels

# Evaluator

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Shared evaluator for the models below; scores against the indexed label
# column and defaults to F1 (later cells switch the metric as needed).
evaluator = MulticlassClassificationEvaluator(labelCol='idxLabel',metricName='f1')

# Method to Save pipeline

In [None]:
def savePipeline(pipeline,name,overwrite=False):
  """Save a fitted pipeline/model under the team's storage root.

  Args:
    pipeline: object exposing ``save(path)`` and ``write()`` (for example a fitted PipelineModel).
    name: directory name to create under ``/mnt/team14/``.
    overwrite: when True, replace whatever is already stored at the target
      path by going through ``write().overwrite()``; when False (the default)
      the plain ``save`` call is used, exactly as before.
  """
  basePath = "/mnt/team14/"
  path = basePath + name
  if overwrite:
    pipeline.write().overwrite().save(path)
  else:
    pipeline.save(path)

# Bagging Method

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
def baggingMethod (train_features, weak_learner,bootstrap_size, maxIter=10, seed=None):
  """Fit ``maxIter`` copies of ``weak_learner`` on bootstrap samples of the training data.

  Model ``i`` writes its outputs to ``prediction_{i}``, ``prob_{i}`` and
  ``raw_pred_{i}``, so all fitted models can be applied to the same DataFrame
  without their output columns colliding.

  Args:
    train_features: DataFrame to draw the bootstrap samples from.
    weak_learner: estimator to fit; it is copied for every round, so the
      caller's instance keeps its own column settings.
    bootstrap_size: sampling fraction passed to ``DataFrame.sample`` (drawn with replacement).
    maxIter: number of models to fit.
    seed: optional base seed; round ``i`` samples with ``seed + i``. With the
      default ``None`` the samples are unseeded, as before.

  Returns:
    List of fitted models, in round order.
  """
  models = []
  for i in range(maxIter):
    # Only pass a seed when one was requested, so the default call is unchanged.
    sample_kwargs = {} if seed is None else {'seed': seed + i}
    bag = train_features.sample(withReplacement=True,fraction=bootstrap_size,**sample_kwargs)
    # Configure a private copy instead of mutating the estimator that was passed in.
    learner = weak_learner.copy()
    learner.setPredictionCol(f'prediction_{i}')
    learner.setProbabilityCol(f'prob_{i}')
    learner.setRawPredictionCol(f'raw_pred_{i}')
    print(f'Model {i}')
    models.append(learner.fit(bag))
  return models


In [None]:
# Method to test each of the method
def test_individual_models(test_features,models):
  """Print the F1 and accuracy of each bagged model on its own.

  Model ``i`` is expected to write its predictions to ``prediction_{i}``;
  scores are computed against the ``idxLabel`` column of ``test_features``.
  """
  evaluator_test = MulticlassClassificationEvaluator(labelCol='idxLabel',metricName='f1')
  for idx, fitted in enumerate(models):
    scored = fitted.transform(test_features)
    evaluator_test.setPredictionCol(f'prediction_{idx}')
    # The metric is selected per call through a param override.
    f1 = evaluator_test.evaluate(scored, {evaluator_test.metricName: 'f1'})
    acc = evaluator_test.evaluate(scored, {evaluator_test.metricName: 'accuracy'})
    print(f"F1 Score Model {idx} = {f1}")
    print(f"Accuracy Score Model {idx} = {acc*100}")

In [None]:
# Evaluate Ensemble Method
from statistics import mode
from pyspark.sql.types import DoubleType

def evaluate_ensemble(model,test_features,label_index=None):
  """Score a bagged ensemble by majority vote over its members' predictions.

  ``model`` is applied to ``test_features``; every resulting column whose name
  starts with ``prediction_`` is treated as one member's vote. The per-row mode
  of the votes becomes the ensemble prediction, which is compared with
  ``idxLabel``. Recall, precision and F-measure per class, overall accuracy
  and F1 are printed.

  Args:
    model: fitted transformer adding the ``prediction_{i}`` columns.
    test_features: DataFrame with ``address`` and ``idxLabel`` plus whatever
      ``model`` needs as input.
    label_index: optional mapping of display name -> numeric label index. The
      default is the mapping this notebook used originally; pass a mapping
      built from the fitted StringIndexer's ``labels`` if the index order differs.

  Raises:
    ValueError: if ``model`` produces no ``prediction_*`` column.

  NOTE(review): ties are resolved by ``statistics.mode``, whose behaviour on
  ties depends on the Python version - confirm for the cluster's runtime.
  """
  if label_index is None:
    label_index = {'White': 3.0, 'Princeton': 0.0, 'Montreal': 2.0, 'Padua': 1.0}
  final_evaluator = MulticlassClassificationEvaluator(predictionCol='pred',labelCol='idxLabel',metricName='f1')
  mode_udf = F.udf(mode,DoubleType())
  prediction = model.transform(test_features)
  # Use however many members the ensemble actually has instead of assuming ten.
  pred_cols = [c for c in prediction.columns if c.startswith('prediction_')]
  if not pred_cols:
    raise ValueError("model produced no 'prediction_*' columns to vote over")
  ensemble = prediction.select('address',F.array(pred_cols).alias("prediction"),"idxLabel")
  # Cached because the frame is evaluated once per printed metric; without it
  # the Python UDF would be re-run over the whole test set every time.
  majority_pred = ensemble.withColumn("pred",mode_udf("prediction")).cache()
  try:
    f1score = final_evaluator.evaluate(majority_pred, {final_evaluator.metricName: "f1"})
    accuracy = final_evaluator.evaluate(majority_pred, {final_evaluator.metricName: "accuracy"})
    for metric, title in [("recallByLabel", "Recall"), ("precisionByLabel", "Precision"), ("fMeasureByLabel", "F Measure")]:
      for name, idx in label_index.items():
        score = final_evaluator.evaluate(majority_pred, {final_evaluator.metricName: metric, final_evaluator.metricLabel: idx})
        print(f'{title} for {name}: {score}')
    print(f'Accuracy: {accuracy * 100}')
    print(f'F1 score: {f1score}')
  finally:
    majority_pred.unpersist()


# Bagging with Logistic Regression

In [None]:
# Fit the scaler once on the assembled training features so the same fitted
# transformer can be applied to both the training and the test data.
scalerTransformer = StandardScaler(inputCol='vectorised',outputCol='features').fit(vectorAssembler.transform(train_df))

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
# Assemble and scale the training data, then bag multinomial logistic regressions
# on bootstrap samples of it (fraction 1.0, default number of rounds).
Xtrain_bag = scalerTransformer.transform(vectorAssembler.transform(train_df))
lr = LogisticRegression(featuresCol='features',labelCol='idxLabel',family='multinomial')
models = baggingMethod(Xtrain_bag,lr,1.0)

In [None]:
from pyspark.ml import PipelineModel
# Apply the same assembler and fitted scaler to the test data.
Xtest_bag = scalerTransformer.transform(vectorAssembler.transform(test_df))
# pipeline_lrModel = PipelineModel(stages=models)
# savePipeline(pipeline_lrModel,"MulticlassModel-FinalELR")
# The ensemble that gets evaluated is the one previously saved to storage,
# not the `models` list fitted in the cell above.
pipeline_lrModel = PipelineModel.load("/mnt/team14/MulticlassModel-FinalELR")
evaluate_ensemble(pipeline_lrModel,Xtest_bag)

In [None]:
# Score each freshly fitted bagging member on its own for comparison.
test_individual_models(Xtest_bag,models)

# Single Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import PipelineModel
# Single multinomial logistic regression, tuned over elastic-net mixing and
# regularisation strength with 5-fold CV on F1.
lrs = LogisticRegression(featuresCol='features',labelCol='idxLabel',family='multinomial')
evaluator.setMetricName('f1')
paramGrid_lrs = ParamGridBuilder().addGrid(lrs.elasticNetParam,[0,0.6,1.0])\
                                 .addGrid(lrs.regParam,[0.01,0.1,1.0])\
                                 .build()
cv_lrs = CrossValidator(estimator=lrs,estimatorParamMaps=paramGrid_lrs,evaluator = evaluator,numFolds=5)
# The pipeline build and fit are commented out; a previously saved model is
# loaded instead, so the objects above only document its configuration.
# pipeline_lrs = Pipeline(stages=[vectorAssembler,standardScaler,cv_lrs])
# pipeline_lrsModel = pipeline_lrs.fit(train_df)
pipeline_lrsModel = PipelineModel.load("/mnt/team14/MulticlassModel-FinalLRS")


In [None]:
# Evaluating Logistic Regression
prediction = pipeline_lrsModel.transform(test_df)
evaluator.setMetricName('f1')
f1_score = evaluator.evaluate(prediction)
evaluator.setMetricName('accuracy')
accuracy = evaluator.evaluate(prediction)
# Look each class index up in the fitted StringIndexer instead of hard-coding it:
# the index order depends on the label frequencies of the data it was fitted on.
label_index = {name: float(i) for i, name in enumerate(labelTransformer.labels)}
for metric, title in [("recallByLabel", "Recall"), ("precisionByLabel", "Precision"), ("fMeasureByLabel", "F Measure")]:
  for name in ['white', 'princeton', 'montreal', 'padua']:
    score = evaluator.evaluate(prediction, {evaluator.metricName: metric, evaluator.metricLabel: label_index[name]})
    print(f'{title} for {name.capitalize()}: {score}')
print(f'Accuracy: {accuracy*100}')
print(f'F1 Score: {f1_score}')

In [None]:
# Saving Single Logistic Regression
# NOTE(review): the target is the same path the model was loaded from above;
# presumably a plain save fails when the path already exists - confirm.
savePipeline(pipeline_lrsModel,"MulticlassModel-FinalLRS")

# Random Forest

In [None]:
# Implementing random forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import PipelineModel
# Random forest tuned over tree count and depth with 5-fold CV on F1.
rf = RandomForestClassifier(featuresCol = 'features',labelCol = 'idxLabel')
evaluator.setMetricName('f1')
paramGrid_rf = ParamGridBuilder().addGrid(rf.numTrees,[15,17,19])\
                                 .addGrid(rf.maxDepth,[12,15,17])\
                                 .build()
cv_rf = CrossValidator(estimator=rf,estimatorParamMaps=paramGrid_rf,evaluator = evaluator,numFolds=5)
# The pipeline build and fit are commented out; a previously saved model is
# loaded instead, so the objects above only document its configuration.
# pipeline_rf = Pipeline(stages=[vectorAssembler,standardScaler,cv_rf])
#pipeline_rfModel = pipeline_rf.fit(train_df)
pipeline_rfModel = PipelineModel.load("/mnt/team14/MulticlassModel-FinalRF")

In [None]:
# Evaluating Random Forest
prediction = pipeline_rfModel.transform(test_df)
evaluator.setMetricName('f1')
f1_score = evaluator.evaluate(prediction)
evaluator.setMetricName('accuracy')
accuracy = evaluator.evaluate(prediction)
# Look each class index up in the fitted StringIndexer instead of hard-coding it:
# the index order depends on the label frequencies of the data it was fitted on.
label_index = {name: float(i) for i, name in enumerate(labelTransformer.labels)}
for metric, title in [("recallByLabel", "Recall"), ("precisionByLabel", "Precision"), ("fMeasureByLabel", "F Measure")]:
  for name in ['white', 'princeton', 'montreal', 'padua']:
    score = evaluator.evaluate(prediction, {evaluator.metricName: metric, evaluator.metricLabel: label_index[name]})
    print(f'{title} for {name.capitalize()}: {score}')
print(f'Accuracy: {accuracy*100}')
print(f'F1 Score: {f1_score}')

In [None]:
# Persist the random forest pipeline.
# NOTE(review): the target is the same path the model was loaded from above;
# presumably a plain save fails when the path already exists - confirm.
savePipeline(pipeline_rfModel,"MulticlassModel-FinalRF")

# Decision Tree

In [None]:
# Implementing Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import PipelineModel
dt = DecisionTreeClassifier(featuresCol = 'features',labelCol = 'idxLabel')
evaluator.setMetricName('f1')
# NOTE(review): the grid holds a single (maxDepth, maxBins) combination, so the
# 5-fold cross-validation has nothing to choose between; presumably left over
# from a wider search - confirm whether CV is still wanted here.
paramGrid_dt = ParamGridBuilder().addGrid(dt.maxDepth,[30])\
                                 .addGrid(dt.maxBins,[128])\
                                 .build()
cv_dt = CrossValidator(estimator=dt,estimatorParamMaps=paramGrid_dt,evaluator = evaluator,numFolds=5)
# Unlike the other models, this pipeline is fitted here rather than loaded.
pipeline_dt = Pipeline(stages=[vectorAssembler,standardScaler,cv_dt])
pipeline_dtModel = pipeline_dt.fit(train_df)
#pipeline_dtModel = PipelineModel.load("/mnt/team14/MulticlassModel-FinalDT")

In [None]:
# Evaluating Decision Tree
prediction = pipeline_dtModel.transform(test_df)
evaluator.setMetricName('f1')
f1_score = evaluator.evaluate(prediction)
evaluator.setMetricName('accuracy')
accuracy = evaluator.evaluate(prediction)
# Look each class index up in the fitted StringIndexer instead of hard-coding it:
# the index order depends on the label frequencies of the data it was fitted on.
label_index = {name: float(i) for i, name in enumerate(labelTransformer.labels)}
for metric, title in [("recallByLabel", "Recall"), ("precisionByLabel", "Precision"), ("fMeasureByLabel", "F Measure")]:
  for name in ['white', 'princeton', 'montreal', 'padua']:
    score = evaluator.evaluate(prediction, {evaluator.metricName: metric, evaluator.metricLabel: label_index[name]})
    print(f'{title} for {name.capitalize()}: {score}')
print(f'Accuracy: {accuracy*100}')
print(f'F1 Score: {f1_score}')

In [None]:
# Saving Decision Tree
# NOTE(review): re-running this cell targets the same path again; presumably a
# plain save fails once the path exists - confirm.
savePipeline(pipeline_dtModel,"MulticlassModel-FinalDT1withBin2")

In [None]:
# The cross-validator is the last pipeline stage; print the depth and bin
# settings of the model it selected.
cvModel = pipeline_dtModel.stages[-1]
print(cvModel.bestModel.getMaxDepth())
print(cvModel.bestModel.getMaxBins())