In [0]:
#https://medium.com/data-science-school/practical-apache-spark-in-10-minutes-part-4-mllib-fca02fecf5b8
#https://towardsdatascience.com/uci-heart-disease-classification-with-pyspark-eadc8e99663f
#Import Data & Exploratory Data Analysis (EDA)
#Load the data
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
stars_df= sqlContext.read.load('/FileStore/tables/Stars.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
#stars_df.show(truncate=False)
stars_df.show(3)

print("There are", stars_df.count(), "rows" ,len(stars_df.columns),"columns" ,"in the data .")
#There are 240 rows 7 columns in the data 

+-----------+------+------+-----+-----+--------------+----+
|Temperature|     L|     R|  A_M|Color|Spectral_Class|Type|
+-----------+------+------+-----+-----+--------------+----+
|       3068|0.0024|  0.17|16.12|  Red|             M|   0|
|       3042|5.0E-4|0.1542| 16.6|  Red|             M|   0|
|       2600|3.0E-4| 0.102| 18.7|  Red|             M|   0|
+-----------+------+------+-----+-----+--------------+----+
only showing top 3 rows

There are 240 rows 7 columns in the data .


In [0]:
#Our classes are perfect balanced.
import pandas as pd
pd.DataFrame(stars_df.take(5), columns=stars_df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Temperature,3068,3042,2600,2800,1939
L,0.0024,0.0005,0.0003,0.0002,0.000138
R,0.17,0.1542,0.102,0.16,0.103
A_M,16.12,16.6,18.7,16.65,20.06
Color,Red,Red,Red,Red,Red
Spectral_Class,M,M,M,M,M
Type,0,0,0,0,0


In [0]:
#https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
df = stars_df.select('Temperature','L','R','A_M','Type','Color','Spectral_Class')
cols = df.columns
df.printSchema()

root
 |-- Temperature: integer (nullable = true)
 |-- L: double (nullable = true)
 |-- R: double (nullable = true)
 |-- A_M: double (nullable = true)
 |-- Type: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- Spectral_Class: string (nullable = true)



In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ['Color', 'Spectral_Class']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'Type', outputCol = 'target')
stages += [label_stringIdx]

numericCols = ['Temperature','L','R','A_M']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [0]:
#https://docs.databricks.com/notebooks/visualizations/index.html
# Run the stages as a Pipeline. This puts the data through all of the feature transformations in a single call.
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages(stages)
p_model = pipeline.fit(df)
final_df = p_model.transform(df)
final_df.printSchema()

#final_df.show(5)

root
 |-- Temperature: integer (nullable = true)
 |-- L: double (nullable = true)
 |-- R: double (nullable = true)
 |-- A_M: double (nullable = true)
 |-- Type: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- Spectral_Class: string (nullable = true)
 |-- ColorIndex: double (nullable = false)
 |-- ColorclassVec: vector (nullable = true)
 |-- Spectral_ClassIndex: double (nullable = false)
 |-- Spectral_ClassclassVec: vector (nullable = true)
 |-- target: double (nullable = false)
 |-- features: vector (nullable = true)



In [0]:
training_df=final_df.select('target','features')
training_df.show(5)
#split the Data Frame randomly into train and test sets.
#train, test = training_df.randomSplit([0.6, 0.4],seed=1234)
#train, test = training_df.randomSplit([0.7, 0.3],seed=1234)
#train, test = training_df.randomSplit([0.8, 0.2],seed=1234)
train, test = training_df.randomSplit([0.9, 0.1],seed=1234)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

+------+--------------------+
|target|            features|
+------+--------------------+
|   0.0|(26,[0,16,22,23,2...|
|   0.0|(26,[0,16,22,23,2...|
|   0.0|(26,[0,16,22,23,2...|
|   0.0|(26,[0,16,22,23,2...|
|   0.0|(26,[0,16,22,23,2...|
+------+--------------------+
only showing top 5 rows

Training Dataset Count: 214
Test Dataset Count: 26


In [0]:
#https://towardsdatascience.com/uci-heart-disease-classification-with-pyspark-eadc8e99663f
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
lr = LogisticRegression(labelCol ='target', featuresCol ='features',maxIter=10 ,threshold=0.5)
model=lr.fit(train)
predict_train = model.transform(train)
predict_test  = model.transform(test)

predict_test.show(5)
predict_test.printSchema()
print(model)

predict_test.select("target", "rawPrediction", "prediction", "probability").show(5,truncate=False)
evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction", metricName='areaUnderROC')
### Use ROC 
e_roc = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")
roc = e_roc.evaluate(predict_test)

#print("ROC of model at predicting label was: {:.4f}".format(roc))
print("Test Area Under ROC for train set: " + str(evaluator.evaluate(predict_train, {evaluator.metricName: "areaUnderROC"})))
print("Test Area Under ROC for test set: " + str(evaluator.evaluate(predict_test, {evaluator.metricName: "areaUnderROC"})))

#train, test[0.6, 0.4] ROC of model at predicting label was: 0.9767
#Test Area Under ROC for train set: 1.0
#Test Area Under ROC for test set: 0.9767441860465116

#train, test[0.7, 0.3] Test Area Under ROC: 1.0
#train, test[0.8, 0.2] Test Area Under ROC  : 1.0 
#train, test [0.9, 0.1] Test Area Under ROC: 1.0

+------+--------------------+--------------------+--------------------+----------+
|target|            features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|   0.0|(26,[0,16,22,23,2...|[13.2249930439033...|[0.82995555296062...|       0.0|
|   0.0|(26,[0,16,22,23,2...|[13.4870106209072...|[0.86904394100456...|       0.0|
|   1.0|(26,[0,16,22,23,2...|[11.5697128334377...|[0.42333407406151...|       1.0|
|   1.0|(26,[0,16,22,23,2...|[9.03337827368953...|[0.03811084430665...|       1.0|
|   1.0|(26,[0,16,22,23,2...|[11.0628054063353...|[0.29363613379428...|       1.0|
+------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

root
 |-- target: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

LogisticRegr

In [0]:
display(model, training_df)

fitted values,residuals
-1.0975366274292164,-0.2502017407140071
-1.585356301018644,-0.1700382362588015
-3.76813436193741,-0.0225737666081755
-1.6715536803426616,-0.1582171428398002
-2.4572855503111257,-0.0789074007789438
-2.5003296369255783,-0.0758350744783099
4.438281393236198,0.0116782358264079
3.273098764065551,0.0365056787224397
3.2105477445716795,0.0387707162053726
3.929222696662361,0.019279924555004


In [0]:
# LogisticRegression Evaluation
#https://stackoverflow.com/questions/64090386/get-all-evaluation-metrics-after-classification-in-pyspark
##https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/mllib/evaluation.py#L255
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
e_accuracy = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
e_precision = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="precisionByLabel")
e_recall = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="recallByLabel")
e_f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
e_falsePositiveRate = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="falsePositiveRateByLabel")

accuracy = e_accuracy.evaluate(predict_test)
precision = e_precision.evaluate(predict_test)
recall = e_recall.evaluate(predict_test)
f1score = e_f1.evaluate(predict_test)
falsePositiveRate = e_falsePositiveRate.evaluate(predict_test)

print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("TPRate(Recall) of model at predicting label was: {:.4f}".format(recall))#
print("F1 score of model at predicting label was: {:.4f}".format(f1score))
print("FPRate of model at predicting label was: {:.4f}".format(falsePositiveRate))
print("TNRate(Specificity) of model at predicting label was: {:.4f}".format(1-falsePositiveRate))#
##train, test[0.6, 0.4] 
#Accuracy of model at predicting label was: 0.9519
#Precision of model at predicting label was: 0.8182
#TPRate(Recall) of model at predicting label was: 1.0000
#F1 score of model at predicting label was: 0.9517
#FPRate of model at predicting label was: 0.0465
#TNRate(Specificity) of model at predicting label was: 0.9535

#train, test[0.7, 0.3]
#Accuracy of model at predicting label was: 0.9512
#Precision of model at predicting label was: 0.8462
#TPRate(Recall) of model at predicting label was: 0.9167
#F1 score of model at predicting label was: 0.9513
#FPRate of model at predicting label was: 0.0286
#TNRate(Specificity) of model at predicting label was: 0.9714

#train, test[0.8, 0.2]
#Accuracy of model at predicting label was: 0.9831
#Precision of model at predicting label was: 1.0000
#TPRate(Recall) of model at predicting label was: 1.0000
#F1 score of model at predicting label was: 0.9826
#FPRate of model at predicting label was: 0.0000
#TNRate(Specificity) of model at predicting label was: 1.0000

#train, test [0.9, 0.1]
#Accuracy of model at predicting label was: 1.0000
#Precision of model at predicting label was: 1.0000
#TPRate(Recall) of model at predicting label was: 1.0000
#F1 score of model at predicting label was: 1.0000
#FPRate of model at predicting label was: 0.0000
#TNRate(Specificity) of model at predicting label was: 1.0000


Accuracy of model at predicting label was: 1.0000
Precision of model at predicting label was: 1.0000
TPRate(Recall) of model at predicting label was: 1.0000
F1 score of model at predicting label was: 1.0000
FPRate of model at predicting label was: 0.0000
TNRate(Specificity) of model at predicting label was: 1.0000


In [0]:
# LogisticRegression Evaluation
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predict_test)
print("Accuracy of model at predicting label on testing data was: {:.4f}".format(accuracy))
print("Test Error of model at predicting label on testing data was: {:.4f}".format(1-accuracy))
predict = predict_test.select(['prediction','target'])
metrics = MulticlassMetrics(predict.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())
#print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
#According to the confusion matrix, for train, test = [0.6, 0.4]:  99 (18+17+15+16+18+15) items are correctly classified out of 104 test data. 5 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.9519/Test Error of model at predicting label on testing data was: 0.0481

#According to the confusion matrix, for train, test = [0.7, 0.3]:  82 (12+14+14+15+14+13) items are correctly classified out of 82 test data. 0 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 1.0000/Test Error of model at predicting label on testing data was: 0.0000

#According to the confusion matrix, for train, test = [0.8, 0.2]:  58 (8+11+12+10+12+5) items are correctly classified out of 59 test data. 1 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.9831/Test Error of model at predicting label on testing data was: 0.0169

#According to the confusion matrix, for train, test = [0.9, 0.1]:  26 (2+4+6+4+6+2) items are correctly classified out of 59 test data. 0 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 1.0000/Test Error of model at predicting label on testing data was: 0.0000


Accuracy of model at predicting label on testing data was: 1.0000
Test Error of model at predicting label on testing data was: 0.0000
[[2. 0. 0. 0. 0. 0.]
 [0. 4. 0. 0. 0. 0.]
 [0. 0. 6. 0. 0. 0.]
 [0. 0. 0. 4. 0. 0.]
 [0. 0. 0. 0. 8. 0.]
 [0. 0. 0. 0. 0. 2.]]
Test Dataset Count: 26


In [0]:
# LogisticRegression Evaluation
#https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-pyspark-2-4-5
# Create both evaluators
e_multi = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction")
e_bin = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction", metricName='areaUnderROC')

# Make predicitons
prediction = predict_test.select("target", "prediction")

# Get metrics
accuracy = e_multi.evaluate(prediction, {e_multi.metricName: "accuracy"})
f1 = e_multi.evaluate(prediction, {e_multi.metricName: "f1"})
weightedPrecision = e_multi.evaluate(prediction, {e_multi.metricName: "weightedPrecision"})
weightedRecall = e_multi.evaluate(prediction, {e_multi.metricName: "weightedRecall"})
falsePositiveRate = e_multi.evaluate(prediction, {e_multi.metricName: "weightedFalsePositiveRate"})
auc = e_bin.evaluate(prediction)

print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("F1 of model at predicting label was: {:.4f}".format(f1))
print("Precision_weighted of model at predicting label was: {:.4f}".format(weightedPrecision))
print("Recall_weighted of model at predicting label was: {:.4f}".format(weightedRecall))
print("FPRate of model at predicting label was: {:.4f}".format(falsePositiveRate))
print("TNRate(Specificity) of model at predicting label was: {:.4f}".format(1-falsePositiveRate))
print("AUC of model at predicting label was: {:.4f}".format(auc))
##train, test[0.6, 0.4]
#Accuracy of model at predicting label was: 0.9519
#F1 of model at predicting label was: 0.9517
#Precision_weighted of model at predicting label was: 0.9594
#Recall_weighted of model at predicting label was: 0.9519
#FPRate of model at predicting label was: 0.0101
#TNRate(Specificity) of model at predicting label was: 0.9899
#AUC of model at predicting label was: 0.9767

#train, test[0.7, 0.3]
#Accuracy of model at predicting label was: 0.9512
#F1 of model at predicting label was: 0.9513
#Precision_weighted of model at predicting label was: 0.9530
#Recall_weighted of model at predicting label was: 0.9512
#FPRate of model at predicting label was: 0.0092
#TNRate(Specificity) of model at predicting label was: 0.9908
#AUC of model at predicting label was: 0.9774

#train, test[0.8, 0.2]
#Accuracy of model at predicting label was: 0.9831
#F1 of model at predicting label was: 0.9826
#Precision_weighted of model at predicting label was: 0.9844
#Recall_weighted of model at predicting label was: 0.9831
#FPRate of model at predicting label was: 0.0043
#TNRate(Specificity) of model at predicting label was: 0.9957
#AUC of model at predicting label was: 1.0000

#train, test[0.9, 0.1]
#Accuracy of model at predicting label was: 1.0000
#F1 of model at predicting label was: 1.0000
#Precision_weighted of model at predicting label was: 1.0000
#Recall_weighted of model at predicting label was: 1.0000
#FPRate of model at predicting label was: 0.0000
#TNRate(Specificity) of model at predicting label was: 1.0000
#AUC of model at predicting label was: 1.0000

Accuracy of model at predicting label was: 1.0000
F1 of model at predicting label was: 1.0000
Precision_weighted of model at predicting label was: 1.0000
Recall_weighted of model at predicting label was: 1.0000
FPRate of model at predicting label was: 0.0000
TNRate(Specificity) of model at predicting label was: 1.0000
AUC of model at predicting label was: 1.0000


In [0]:
#https://www.guru99.com/pyspark-tutorial.html
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(model.coefficientMatrix ))
print("Intercept: " + str(model.interceptVector))

Coefficients: DenseMatrix([[ 1.10034008e-02, -1.73302204e+00, -1.06785971e+00,
              -2.22517748e+00, -2.58368526e+00, -3.35529687e+00,
              -3.09785032e+00, -2.93010252e+00, -2.91034724e+00,
               9.41333750e-01, -2.16403741e+00, -2.71569003e+00,
              -1.78301731e+00, -2.43488676e+00, -3.27035690e+00,
              -3.28361606e+00,  7.99195860e-02, -2.61254393e+00,
              -3.95802953e-01, -2.72499925e+00, -3.04758072e+00,
              -3.07511790e+00, -1.74442643e-04, -7.56308422e-06,
              -2.44006111e-03,  8.75033642e-01],
             [ 6.53434068e+00, -1.77613332e+00, -1.33465732e+00,
              -1.79606828e+00, -2.45131734e+00, -2.63639335e+00,
              -2.42551387e+00, -2.30379518e+00, -2.30124800e+00,
               7.58302889e-01, -2.19531458e+00, -2.42491074e+00,
              -1.85267476e+00, -2.25249371e+00, -2.53634026e+00,
              -2.54592692e+00,  6.62957891e+00, -2.26653801e+00,
              -8.63229607e-

In [0]:
#Decision Tree Classifier
from pyspark.ml.classification import DecisionTreeClassifier
#DecisionTreeClassifier model
dtc = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'target', maxDepth = 3).fit(train)
predict_test = dtc.transform(test)
#predict_test.select('target', 'rawPrediction', 'prediction', 'probability').show(5)
#print DecisionTreeClassifier model
print(dtc)

#Evaluate our Decision Tree model.
#https://www.datatechnotes.com/2021/06/pyspark-decision-tree-classification.html
# After training the model, we'll predict test data and check the accuracy metrics. Here, we can use MulticlassClassificationEvaluator to check the accuracy. Confusion matrix can be created by using confusion_matrix function of sklearn.metrics module.
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predict_test)
print("Accuracy of model at predicting label on testing data was: {}".format(accuracy))
print("Test Error of model at predicting label on testing data was: {}".format(1-accuracy))

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

predict = predict_test.select(['prediction','target'])
metrics = MulticlassMetrics(predict.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())
#print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
#According to the confusion matrix, for train, test = [0.6, 0.4]:  65 (1+15+15+18+16) items are correctly classified out of 104 test data. 39 item are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.625/Test Error of model at predicting label on testing data was: 0.375

#According to the confusion matrix, for train, test = [0.7, 0.3]:  81 (12+14+14+14+14+13) items are correctly classified out of 82 test data. 1 item are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.9878048780487805/Test Error of model at predicting label on testing data was: 0.012195121951219523

#According to the confusion matrix, for train, test = [0.8, 0.2]:  34 (8+10+10+6) items are correctly classified out of 59 test data. 25 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.576271186440678/Test Error of model at predicting label on testing data was: 0.423728813559322

#According to the confusion matrix, for train, test = [0.9, 0.1]:  14 (2+6+4+2) items are correctly classified out of 26 test data. 12 items are incorrectly classified/Accuracy of model at predicting label on testing data was:  0.5384615384615384/Test Error of model at predicting label on testing data was: 0.46153846153846156

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_60070a465288, depth=3, numNodes=11, numClasses=6, numFeatures=26
Accuracy of model at predicting label on testing data was: 0.5384615384615384
Test Error of model at predicting label on testing data was: 0.46153846153846156
[[2. 0. 0. 0. 0. 0.]
 [0. 0. 0. 4. 0. 0.]
 [0. 0. 6. 0. 0. 0.]
 [0. 0. 0. 4. 0. 0.]
 [0. 0. 0. 7. 0. 1.]
 [0. 0. 0. 0. 0. 2.]]
Test Dataset Count: 26


In [0]:
#Decision Tree Classifier Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
e_accuracy = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
e_precision = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="precisionByLabel")
e_recall = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="recallByLabel")
e_f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
e_falsePositiveRate = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="falsePositiveRateByLabel")
e_roc = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")


roc = e_roc.evaluate(predict_test)
accuracy = e_accuracy.evaluate(predict_test)
precision = e_precision.evaluate(predict_test)
recall = e_recall.evaluate(predict_test)
f1score = e_f1.evaluate(predict_test)
falsePositiveRate = e_falsePositiveRate.evaluate(predict_test)

print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("TPRate(Recall) of model at predicting label was: {:.4f}".format(recall))#
print("F1 score of model at predicting label was: {:.4f}".format(f1score))
print("FPRate of model at predicting label was: {:.4f}".format(falsePositiveRate))
print("TNRate(Specificity) of model at predicting label was: {:.4f}".format(1-falsePositiveRate))#
#print("ROC of model at predicting label was: {:.4f}".format(roc))
print("Test Area Under ROC for train set: " + str(e_roc.evaluate(predict_train, {e_roc.metricName: "areaUnderROC"})))
print("Test Area Under ROC for test set: " + str(e_roc.evaluate(predict_test, {e_roc.metricName: "areaUnderROC"})))

Accuracy of model at predicting label was: 0.5385
Precision of model at predicting label was: 1.0000
TPRate(Recall) of model at predicting label was: 1.0000
F1 score of model at predicting label was: 0.4340
FPRate of model at predicting label was: 0.0000
TNRate(Specificity) of model at predicting label was: 1.0000
Test Area Under ROC for train set: 1.0
Test Area Under ROC for test set: 1.0


In [0]:
#It comes under supervised learning and mainly used for classification but can be used for regression as well. Random forest classifier is useful because,No overfitting, High accuracy,Estimates missing data

from pyspark.ml.classification import RandomForestClassifier
#RandomForest model
rfc = RandomForestClassifier(labelCol="target",featuresCol="features", numTrees=10).fit(train)
predict_test = rfc.transform(test)
predict_train = rfc.transform(train)
#predict_test.select('target', 'rawPrediction', 'prediction', 'probability').show(5)
#print RandomForest model
print(rfc)
#https://towardsdatascience.com/a-guide-to-exploit-random-forest-classifier-in-pyspark-46d6999cb5db
#We can clearly compare the actual values and predicted values with the output below.
predict_test.select("target", "prediction").show(5)
evaluator =MulticlassClassificationEvaluator(labelCol="target",predictionCol="prediction", metricName="accuracy") 
accuracy = evaluator.evaluate(predict_test)
print("Accuracy of model at predicting label on testing data was: {}".format(accuracy))
print("Test Error of model at predicting label on testing data was: {}".format(1-accuracy))

#Now we can see that the accuracy of our model is high and the test error is very low. It means our classifier model is performing well.

#https://towardsdatascience.com/a-guide-to-exploit-random-forest-classifier-in-pyspark-46d6999cb5db
#We can use a confusion matrix to compare the predicted star types and the actual star types.
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

predict = predict_test.select(['prediction','target'])
metrics = MulticlassMetrics(predict.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())
#print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
#According to the confusion matrix, for train, test = [0.6, 0.4]:  104 (18+17+15+20+18+16) items are correctly classified out of 104 test data. 0 item are incorrectly classified/Accuracy of model at predicting label on testing data was: 1.0

#According to the confusion matrix, for train, test = [0.7, 0.3]:  78 (11+12+14+14+14+13) items are correctly classified out of 82 test data. 4 item are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.9512195121951219/Test Error of model at predicting label on testing data was: 0.04878048780487809

#According to the confusion matrix, for train, test = [0.8, 0.2] :56 (7+11+12+8+12+6) items are correctly classified out of 59 test data. 3 items are incorrectly classified/Accuracy of model at predicting label on testing data was: 0.9491525423728814/Test Error of model at predicting label on testing data was: 0.05084745762711862

#According to the confusion matrix, for train, test = [0.9, 0.1] :26 (2+4+6+4+8+2) items are correctly classified out of 26 test data. 0 item are incorrectly classified/Accuracy of model at predicting label on testing data was: 1.0000/Test Error of model at predicting label on testing data was: 0.0000

RandomForestClassificationModel: uid=RandomForestClassifier_a5fcf55d652b, numTrees=10, numClasses=6, numFeatures=26
+------+----------+
|target|prediction|
+------+----------+
|   0.0|       0.0|
|   0.0|       0.0|
|   1.0|       1.0|
|   1.0|       1.0|
|   1.0|       1.0|
+------+----------+
only showing top 5 rows

Accuracy of model at predicting label on testing data was: 1.0
Test Error of model at predicting label on testing data was: 0.0
[[2. 0. 0. 0. 0. 0.]
 [0. 4. 0. 0. 0. 0.]
 [0. 0. 6. 0. 0. 0.]
 [0. 0. 0. 4. 0. 0.]
 [0. 0. 0. 0. 8. 0.]
 [0. 0. 0. 0. 0. 2.]]
Test Dataset Count: 26


In [0]:
#RandomForestClassifier Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
e_accuracy = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
e_precision = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="precisionByLabel")
e_recall = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="recallByLabel")
e_f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
e_falsePositiveRate = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="falsePositiveRateByLabel")
e_roc = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")

roc = e_roc.evaluate(predict_test)
accuracy = e_accuracy.evaluate(predict_test)
precision = e_precision.evaluate(predict_test)
recall = e_recall.evaluate(predict_test)
f1score = e_f1.evaluate(predict_test)
falsePositiveRate = e_falsePositiveRate.evaluate(predict_test)

print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("TPRate(Recall) of model at predicting label was: {:.4f}".format(recall))#
print("F1 score of model at predicting label was: {:.4f}".format(f1score))
print("FPRate of model at predicting label was: {:.4f}".format(falsePositiveRate))
print("TNRate(Specificity) of model at predicting label was: {:.4f}".format(1-falsePositiveRate))#
#print("ROC of model at predicting label was: {:.4f}".format(roc))
print("Test Area Under ROC for train set: " + str(e_roc.evaluate(predict_train, {e_roc.metricName: "areaUnderROC"})))
print("Test Area Under ROC for test set: " + str(e_roc.evaluate(predict_test, {e_roc.metricName: "areaUnderROC"})))

Accuracy of model at predicting label was: 1.0000
Precision of model at predicting label was: 1.0000
TPRate(Recall) of model at predicting label was: 1.0000
F1 score of model at predicting label was: 1.0000
FPRate of model at predicting label was: 0.0000
TNRate(Specificity) of model at predicting label was: 1.0000
Test Area Under ROC for train set: 0.9971590909090908
Test Area Under ROC for test set: 1.0


In [0]:
#APPENDIX  LogisticRegression
#https://stackoverflow.com/questions/64090386/get-all-evaluation-metrics-after-classification-in-pyspark
##https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/mllib/evaluation.py#L255
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
e_accuracy = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
e_precision = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="precisionByLabel")
e_recall = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="recallByLabel")
e_f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
e_falsePositiveRate = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="falsePositiveRateByLabel")

accuracy = e_accuracy.evaluate(predict_test)
precision = e_precision.evaluate(predict_test)
recall = e_recall.evaluate(predict_test)
f1score = e_f1.evaluate(predict_test)
falsePositiveRate = e_falsePositiveRate.evaluate(predict_test)

print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("TPRate(Recall) of model at predicting label was: {:.4f}".format(recall))
print("F1 score of model at predicting label was: {:.4f}".format(f1score))
print("FPRate of model at predicting label was: {:.4f}".format(falsePositiveRate))
print("TNRate(Specificity) of model at predicting label was: {:.4f}".format(1-falsePositiveRate))

Accuracy of model at predicting label was: 1.0000
Precision of model at predicting label was: 1.0000
TPRate(Recall) of model at predicting label was: 1.0000
F1 score of model at predicting label was: 1.0000
FPRate of model at predicting label was: 0.0000
TNRate(Specificity) of model at predicting label was: 1.0000


In [0]:
#APPENDIX
#https://towardsdatascience.com/evaluating-machine-learning-classification-problems-in-python-5-1-metrics-that-matter-792c6faddf5
#https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
#https://towardsdatascience.com/binary-classifier-evaluation-made-easy-with-handyspark-3b1e69c12b4f
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
e_auc = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")
e_auroc=BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")
e_auprc=BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")

auc = e_auc.evaluate(predict_test)
auroc=e_auroc.evaluate(predict_test, {evaluator.metricName: "areaUnderROC"})
auprc=e_auprc.evaluate(predict_test, {evaluator.metricName: "areaUnderPR"})

print("AUC of model at predicting label was: {:.4f}".format(auc))   
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))
#train, test[0.6, 0.4]
#AUC of model at predicting label was: 0.9767
#Area under ROC Curve: 0.9767
#Area under PR Curve: 0.9767

#train, test[0.7, 0.3]
#AUC of model at predicting label was: 0.9774
#Area under ROC Curve: 0.9774
#Area under PR Curve: 0.9774
#train, test[0.8, 0.2]
#AUC of model at predicting label was: 1.0000
#Area under ROC Curve: 1.0000
#Area under PR Curve: 1.0000

#train, test[0.9, 0.1]
#AUC of model at predicting label was: 1.0000
#Area under ROC Curve: 1.0000
#Area under PR Curve: 1.0000

AUC of model at predicting label was: 1.0000
Area under ROC Curve: 1.0000
Area under PR Curve: 1.0000


In [0]:
#APPENDIX LogisticRegression
#Using RDD based API
#https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-pyspark-2-4-5
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
# Make prediction
prediction = predict_test.select("target", "prediction")

# Create both evaluators
m_bin = BinaryClassificationMetrics(prediction.rdd.map(tuple))
m_multi = MulticlassMetrics(prediction.rdd.map(tuple))

accuracy = m_multi.accuracy
f1 = m_multi.fMeasure(1.0)
precision = m_multi.precision(1.0)
recall = m_multi.recall(1.0)
auc = m_bin.areaUnderROC
tpr=m_multi.truePositiveRate(1.0)
fpr=m_multi.falsePositiveRate(1.0)
print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("F1 of model at predicting label was: {:.4f}".format(f1))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("Recall of model at predicting label was: {:.4f}".format(recall))
print("AUC of model at predicting label was: {:.4f}".format(auc))
print("TruePositiveRate of model at predicting label was: {:.4f}".format(tpr))
print("FalsePositiveRate of model at predicting label was: {:.4f}".format(fpr))

Accuracy of model at predicting label was: 1.0000
F1 of model at predicting label was: 1.0000
Precision of model at predicting label was: 1.0000
Recall of model at predicting label was: 1.0000
AUC of model at predicting label was: 1.0000
TruePositiveRate of model at predicting label was: 1.0000
FalsePositiveRate of model at predicting label was: 0.0000


In [0]:
# #APPENDIX LogisticRegression
# #Using RDD based API
# #https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-pyspark-2-4-5
# from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
# # Make prediction
# prediction = predict_test.select("target", "prediction")

# # Create both evaluators
# m_bin = BinaryClassificationMetrics(prediction.rdd.map(tuple))
# m_multi = MulticlassMetrics(prediction.rdd.map(tuple))

# accuracy = m_multi.accuracy
# f1 = m_multi.fMeasure(1.0)
# precision = m_multi.precision(1.0)
# recall = m_multi.recall(1.0)
# auc = m_bin.areaUnderROC
# tpr=m_multi.truePositiveRate(1.0)
# fpr=m_multi.falsePositiveRate(1.0)
# print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
# print("F1 of model at predicting label was: {:.4f}".format(f1))
# print("Precision of model at predicting label was: {:.4f}".format(precision))
# print("Recall of model at predicting label was: {:.4f}".format(recall))
# print("AUC of model at predicting label was: {:.4f}".format(auc))
# print("TruePositiveRate of model at predicting label was: {:.4f}".format(tpr))
# print("FalsePositiveRate of model at predicting label was: {:.4f}".format(fpr))
# df = sqlContext.createDataFrame(rdd, ["prediction", "target_index"])
# df.show()

# metricsp = MulticlassMetrics(df.rdd)
# metricsp.recall(1)

# tp = df[(df.target_index == 1) & (df.prediction == 1)].count()
# tn = df[(df.target_index == 0) & (df.prediction == 0)].count()
# fp = df[(df.target_index == 0) & (df.prediction == 1)].count()
# fn = df[(df.target_index == 1) & (df.prediction == 0)].count()
# print "True Positives:", tp
# print "True Negatives:", tn
# print "False Positives:", fp
# print "False Negatives:", fn
# print "Total", df.count()

# r = float(tp)/(tp + fn)
# print "recall", r

# p = float(tp) / (tp + fp)
# print "precision", p

In [0]:
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score

# Make predicitons
prediction = predict_test.select("target", "prediction")
prediction_np = np.array((prediction.collect()))

probability=predict_test.select("target", "probability")
probability_np = np.array((probability.collect()))
#probability_np=(probability_np,dtype=object)
type(prediction_np.dtype)
type(probability_np.dtype)

  probability_np = np.array((probability.collect()))
Out[127]: numpy.dtype

In [0]:
#APPENDIX LogisticRegression
#https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-pyspark-2-4-5
#https://stackoverflow.com/questions/52269187/facing-valueerror-target-is-multiclass-but-average-binary
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score

# Make predicitons
prediction = predict_test.select("target", "prediction")
prediction_np = np.array((prediction.collect()))

#probability=predict_test.select("target", "probability")
#probability_np = np.array((probability.collect()))


accuracy=accuracy_score(prediction_np[:,0], prediction_np[:,1])
f1=f1_score(prediction_np[:,0], prediction_np[:,1],average='weighted')
precision=precision_score(prediction_np[:,0], prediction_np[:,1],average='micro')
recall=recall_score(prediction_np[:,0], prediction_np[:,1],average='micro')
#areaUnderRoc=roc_auc_score(prediction_np[:,0], prediction_np[:,1], average = 'macro', multi_class="ovr")
#areaUnderRoc=roc_auc_score(prediction_np[:,0], prediction_np[:,1],multi_class="ovr",average='weighted')
#areaUnderRoc=roc_auc_score(prediction_np[:,0], prediction_np[:,1],multi_class="ovr",average=None)
#areaUnderRoc=roc_auc_score(prediction_np[:,0], prediction_np[:,1],multi_class="ovr",average=None)
#areaUnderRoc=roc_auc_score(prediction_np[:,0], prediction_np[:,1],multi_class="ovr")
#auc = roc_auc_score(probability_np[:,0], probability_np[:,1])
print("Accuracy of model at predicting label was: {:.4f}".format(accuracy))
print("F1 of model at predicting label was: {:.4f}".format(f1))
print("Precision of model at predicting label was: {:.4f}".format(precision))
print("Recall of model at predicting label was: {:.4f}".format(recall))
#print("AreaUnderRoc of model at predicting label was: {:.4f}".format(auc))

Accuracy of model at predicting label was: 1.0000
F1 of model at predicting label was: 1.0000
Precision of model at predicting label was: 1.0000
Recall of model at predicting label was: 1.0000


In [0]:
#APPENDIX LogisticRegression
#https://www.guru99.com/pyspark-tutorial.html
#model accuracy
accuracy = predict_test.select("target", "prediction")
accuracy.groupby('target').agg({'target': 'count'}).show(3)
accuracy.groupby('prediction').agg({'prediction': 'count'}).show(3)
accuracy.filter(accuracy.target == accuracy.prediction).count() / accuracy.count()

+------+-------------+
|target|count(target)|
+------+-------------+
|   0.0|            2|
|   1.0|            4|
|   4.0|            8|
+------+-------------+
only showing top 3 rows

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|                2|
|       1.0|                4|
|       4.0|                8|
+----------+-----------------+
only showing top 3 rows

Out[129]: 1.0

In [0]:
#APPENDIX LogisticRegression
#https://www.guru99.com/pyspark-tutorial.html
select_test = predict_test.select("target", "prediction", "probability")
select_test.show(3)

+------+----------+--------------------+
|target|prediction|         probability|
+------+----------+--------------------+
|   0.0|       0.0|[0.75701127819548...|
|   0.0|       0.0|[0.93121762740183...|
|   1.0|       1.0|[0.17756463343419...|
+------+----------+--------------------+
only showing top 3 rows



In [0]:
#APPENDIX
#https://discuss.itversity.com/t/error-regarding-attributeerror-nonetype-object-has-no-attribute-select/23123/2
#https://colab.research.google.com/github/goodboychan/goodboychan.github.io/blob/main/_notebooks/2020-08-10-03-Classification-in-PySpark.ipynb#scrollTo=SrkP5B-G5iU8
from pyspark.sql.functions import lit, col
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, FloatType
# Create a confusion matrix
lr_selected_df=predict_test.select("target","prediction", "probability").toDF("target","prediction", "probability")
lr_conf_matrix = lr_selected_df.\
withColumn('target', lr_selected_df.target.cast(FloatType())).\
withColumn('prediction', lr_selected_df.prediction.cast(FloatType()))
lr_conf_matrix.printSchema()

lr_conf_matrix.groupBy('target', 'prediction').count().show()
# Calculate the elements of the confusion matrix
# TN = lr_conf_matrix.filter('prediction = 0 AND target = prediction').count()
# TP = lr_conf_matrix.filter('prediction = 1 AND target = prediction').count()
# FN = lr_conf_matrix.filter('prediction = 0 AND target = 1').count()
# FP = lr_conf_matrix.filter('prediction = 1 AND target = 0').count()

# # Accuracy measures the proportion of correct predictions
# accuracy = (TN + TP) / (TN + TP + FN + FP)
# print(accuracy)

root
 |-- target: float (nullable = false)
 |-- prediction: float (nullable = false)
 |-- probability: vector (nullable = true)

+------+----------+-----+
|target|prediction|count|
+------+----------+-----+
|   5.0|       5.0|    2|
|   0.0|       0.0|    2|
|   2.0|       2.0|    6|
|   4.0|       4.0|    8|
|   3.0|       3.0|    4|
|   1.0|       1.0|    4|
+------+----------+-----+



In [0]:
# #APPENDIX
# ### Use ROC 
# from pyspark.ml.evaluation import BinaryClassificationEvaluator
#  predictions = model.transform(test_data)
# # Evaluate model
# evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# print(evaluator.evaluate(predict_test))
# print(evaluator.getMetricName())