<a href="https://colab.research.google.com/github/Nisarg-1406/Bank_Marketing_Using_Pyspark/blob/main/Bank_Term_Deposit_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Three classifiers are tested: logistic regression, decision tree and random forest.
# Different metrics are computed after hyperparameter tuning with 5-fold cross-validation to evaluate the models built with these algorithms.
# Other algorithms such as gradient-boosted trees, naive Bayes and support vector machines take longer to run and were left out, since this dataset is not very complex.
# The concepts and syntax behind the code are explained in comments throughout this file.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

In [None]:
from pyspark.sql.functions import mean, col, split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, VectorIndexer
from pyspark.ml.feature import QuantileDiscretizer, OneHotEncoder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("Bank Term Deposit Project")
    .getOrCreate()
)

In [None]:
# Load the bank marketing CSV file into a DataFrame.
#
# header=True      - use the first row of the file as the column names. With the
#                    default (False) the columns are named _c0, _c1, _c2, ...
# inferSchema=True - let Spark scan the file and infer each column's type (String,
#                    Double, Long, ...). With the default (False) every column is a
#                    string, which rules out numeric operations such as adding two
#                    columns. Inference costs an extra pass over the file, so the
#                    read is slower, but the resulting schema is most likely correct.
#
# Alternative: pass an explicit schema to the reader. That avoids the inference pass
# and also supplies column names for files that have no header row.
bank_df = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("dbfs:/FileStore/shared_uploads/nisargmehta2000@gmail.com/bank_new.csv")
)

In [None]:
# Inspect the column types, then preview the first 10 rows.
bank_df.printSchema()
bank_df.show(n=10)

In [None]:
# Number of customers (rows) in the dataset.
clients_count = bank_df.count()

# str.format() substitutes its argument into the {} placeholder and returns the
# resulting string.
print("Number of customers is {}".format(clients_count))

In [None]:
# Number of customers who subscribed to a term deposit versus those who did not.
group_by_clients = bank_df.groupBy("deposit").count()  # one row per distinct `deposit` value, with its row count
group_by_clients.show()
display(group_by_clients)  # NOTE(review): display() is not defined in this file — presumably the Databricks notebook helper

deposit,count
no,5873
yes,5289


In [None]:
# Summary statistics: first for the integer-typed columns only, then for every column.
bank_df.describe([name for name, dtype in bank_df.dtypes if dtype == 'int']).show()
bank_df.describe().show()

In [None]:
display(bank_df.groupBy("job").count())  # number of customers per job category
display(bank_df.groupBy("housing", "deposit","age").count())  # number of customers per (housing, deposit, age) combination, displayed for plotting

job,count
management,2566
retired,778
unknown,70
self-employed,405
student,360
blue-collar,1944
entrepreneur,328
admin.,1334
technician,1823
services,923


housing,deposit,age,count
yes,yes,22,4
no,yes,26,94
no,yes,67,23
no,yes,80,12
yes,yes,55,20
yes,yes,30,90
yes,yes,76,1
no,no,51,51
yes,yes,58,21
no,yes,90,2


In [None]:
display(bank_df.groupBy("housing", "deposit").count())  # number of customers per (housing, deposit) combination

housing,deposit,count
no,no,2527
no,yes,3354
yes,yes,1935
yes,no,3346


In [None]:
# Data preprocessing
# Each categorical column is indexed with a StringIndexer and the index is then one-hot
# encoded. A further StringIndexer turns the label column into numeric label indices.
# Finally a VectorAssembler combines the one-hot vectors and the numeric columns into a
# single `features` vector column.
# All stages are chained in a Pipeline so the data goes through every transformation in
# a single fit/transform call.

def get_dummy(df, categoricalCols, numCols, labelCol):
    """Turn a raw DataFrame into a model-ready one.

    Args:
        df: input Spark DataFrame.
        categoricalCols: names of the columns to string-index and one-hot encode.
        numCols: list of numeric column names appended to the feature vector as-is.
        labelCol: name of the target column.

    Returns:
        A tuple ``(data, label_model)``. ``data`` contains only the columns
        ``features``, ``indexedLabel`` and ``label`` (a copy of ``labelCol``).
        ``label_model`` is a StringIndexer fitted on the ``label`` column; its
        ``labels`` attribute maps label indices back to the original strings.
    """
    category_indexers = []
    category_encoders = []
    for name in categoricalCols:
        # The indexer assigns a number to every category of the column ...
        indexed_name = "{}_indexed".format(name)
        category_indexers.append(StringIndexer(inputCol=name, outputCol=indexed_name))
        # ... and the encoder turns that number into a binary indicator vector.
        category_encoders.append(
            OneHotEncoder(inputCol=indexed_name, outputCol="{}_encoded".format(indexed_name))
        )

    label_indexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')

    # Combine the encoded categorical columns and the numeric columns into one vector.
    encoded_names = [encoder.getOutputCol() for encoder in category_encoders]
    assembler = VectorAssembler(inputCols=encoded_names + numCols, outputCol="features")

    # Fit every stage on df, then apply the fitted pipeline to the same df.
    stages = category_indexers + category_encoders + [assembler, label_indexer]
    fitted = Pipeline(stages=stages).fit(df)

    # withColumn returns a new DataFrame; the caller's df is not modified.
    data = fitted.transform(df).withColumn('label', col(labelCol))

    # Fit an indexer on the string label so callers can read the index -> label mapping.
    label_model = StringIndexer(inputCol='label').fit(data)
    return data.select('features', 'indexedLabel', 'label'), label_model

In [None]:
# String-typed columns: indexed and one-hot encoded by get_dummy.
categoricalColumns = [
    'job',
    'marital',
    'education',
    'default',
    'housing',
    'loan',
    'contact',
    'poutcome',
]

# Integer-typed columns: used as features without further encoding.
numericCols = [
    'age',
    'balance',
    'duration',
    'campaign',
    'pdays',
    'previous',
]

In [None]:
# Preprocess the raw data. bank_df is rebound to the preprocessed frame (features,
# indexedLabel, label), so this cell cannot be re-run without reloading the CSV first.
bank_df, labelIndexer = get_dummy(
    bank_df,
    categoricalColumns,
    numericCols,
    'deposit',
)
bank_df.show(3)

In [None]:
# Fit a VectorIndexer on the assembled features: entries with at most 10 distinct
# values are treated as categorical and re-indexed into `indexedFeatures`.
# NOTE(review): the classifiers in this notebook are created with
# featuresCol="features", so the `indexedFeatures` column is not consumed by them.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=10,
).fit(bank_df)
featureIndexer.transform(bank_df).show(5)

In [None]:
# Show 5 rows without truncating the (long) feature vectors.
bank_df.show(5, truncate=False)

In [None]:
# Randomly split the data into training (80%) and test (20%) sets.
# The seed initialises the random number generator; fixing it makes the split reproducible across runs.
training_data, test_data = bank_df.randomSplit([0.8, 0.2], seed=10)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

In [None]:
# Preview both splits without truncating the feature vectors.
print("The first 5 samples of the Training Dataset:")
training_data.show(5, truncate=False)
print("The first 5 samples of the Test Dataset:")
test_data.show(5, truncate=False)

In [None]:
# Logistic regression - a popular method for predicting a categorical response. In
# spark.ml it predicts a binary outcome with binomial logistic regression, or a
# multiclass outcome with multinomial logistic regression.
# Create the (untrained) estimator; it is trained on the training data further below.
lr = LogisticRegression(featuresCol="features", labelCol="indexedLabel")

In [None]:
# Pipeline architecture:
#   1. featureIndexer  - the VectorIndexer fitted earlier
#   2. lr              - the logistic regression estimator
#   3. label_converter - converts the numeric prediction back to the original label string
label_converter = IndexToString(
    inputCol="prediction",            # numeric class index produced by the classifier
    outputCol="predictionLabel",      # the corresponding label string
    labels=labelIndexer.labels,
)

pipeline = Pipeline(stages=[featureIndexer, lr, label_converter])
lrModel = pipeline.fit(training_data)  # train the whole pipeline on the training data

In [None]:
# Apply the fitted pipeline to the test data to get a prediction for each feature vector.
# The logistic regression model only reads the column named by its featuresCol parameter when predicting.
predictions = lrModel.transform(test_data)
predictions.show(5)

In [None]:
# Show features, true label, class probabilities and predicted label side by side.
predictions.select("features", "label", "probability", "predictionLabel").show()

In [None]:
# Preparing to compute the model accuracy: keep only the true label and the predicted label,
# then check how many rows fall in each class for both columns.
cm = predictions.select("label","predictionLabel")

cm.groupby('label').agg({'label': 'count'}).show()  # number of rows per true label
cm.groupby('predictionLabel').agg({'predictionLabel': 'count'}).show()  # number of rows per predicted label

In [None]:
# Row counts for every (true label, predicted label) pair.
predictions.groupBy('label', 'predictionLabel').count().show()

In [None]:
# Accuracy = number of rows whose predicted label equals the true label, divided by
# the total number of rows.
# Example from one recorded run: the test set held 1021 customers who subscribed a
# deposit and 1197 who did not, while the classifier predicted 972 subscribers.
print("The Accuracy for test set is {}".format(
    cm.filter(cm.label == cm.predictionLabel).count() / cm.count()
))

In [None]:
# The same accuracy (and other metrics) can be computed with MulticlassClassificationEvaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# Use of RDD principles to compute some other metrics - RDD - Resilient Distributed Datasets
# We can also generate a Confusion Matrix to better see the results of the predictions. ConfusionMatrix() works only with RDDs, so we will have to convert our DataFrame of (prediction, label) into a RDD.
# confusionMatrix() returns a DenseMatrix with the columns representing the predicted class ordered by ascending class label, and each row represents the actual class ordered by ascending class label. The diagonal from top left to bottom right represents the observations that were predicted correctly.
# Detailed explanation - https://gobiviswa.medium.com/apache-spark-rdd-internals-7c6604b54a23

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# The mllib metrics classes work on RDDs, so convert the (prediction, indexedLabel) columns to an RDD.
predictionAndLabel = predictions.select("prediction","indexedLabel").rdd

In [None]:
# Instantiate the metrics objects.
# MulticlassMetrics compares the hard predictions with the labels.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# BinaryClassificationMetrics sweeps a threshold over a continuous score, so give it
# P(class 1) from the `probability` vector. The thresholded 0/1 `prediction` column
# would reduce the ROC and precision-recall curves to a single operating point.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

In [None]:
# Overall statistics
# A confusion matrix is a performance measurement for classification problems with two
# or more output classes.
confusionMatrix = metricsMulti.confusionMatrix()

# True Positive: you predicted positive and it is true.
# True Negative: you predicted negative and it is true.
# False Positive (Type 1 error): you predicted positive and it is false.
# False Negative (Type 2 error): you predicted negative and it is false.

# Recall = TP / (TP + FN) - of all actual positives, how many we predicted as positive. It should be as high as possible.
# Precision = TP / (TP + FP) - of everything we predicted as positive, how many are actually positive.
# Accuracy = (TP + TN) / Total - of all rows, how many we predicted correctly.
# F-measure = (2 * Recall * Precision) / (Recall + Precision) - it is difficult to compare two models when one has low precision and high recall and the other the opposite. The F-score makes them comparable by measuring recall and precision at the same time; it uses the harmonic mean instead of the arithmetic mean, which punishes extreme values more.

# `label` selects the class whose precision/recall is reported. Class index 1 is used
# here, as in every other metrics cell of this notebook. Index 1 is expected to be
# "yes": StringIndexer gives index 0 to the most frequent label, and "no" is the more
# frequent class in this dataset.
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)

# BinaryClassificationMetrics has no confusionMatrix, precision, recall or fMeasure, so the following is not available:
# confusionMatrix = metricsBinary.confusionMatrix()
# precision = metricsBinary.precision(label=1) 
# recall = metricsBinary.recall(label=1) 
# f1Score = metricsBinary.fMeasure()
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

# Recorded output from earlier runs, kept for reference.

# Summary Stats - When label = 1
# Confusion Matrix = 
#  DenseMatrix([[977., 217.],
#              [240., 817.]])
# Precision = 0.7901353965183753
# Recall = 0.7729422894985809

# Summary Stats - When label = 0 
# Confusion Matrix = 
#  DenseMatrix([[970., 224.],
#              [205., 852.]])
# Precision = 0.825531914893617
# Recall = 0.8060548722800378

In [None]:
# areaUnderPR and areaUnderROC are attributes of BinaryClassificationMetrics; MulticlassMetrics does not have them.
# Area under the precision-recall curve - precision-recall is a useful measure of prediction success when the classes are very imbalanced. In information retrieval, precision measures result relevancy, while recall measures how many of the truly relevant results are returned.
# The precision-recall curve shows the trade-off between precision and recall at different thresholds. A high area under the curve means both high recall and high precision, where high precision relates to a low false positive rate and high recall relates to a low false negative rate. High scores for both show that the classifier returns accurate results (high precision) and returns a majority of all positive results (high recall).
# A system with high recall but low precision returns many results, but most of its predicted labels are incorrect when compared to the true labels. A system with high precision but low recall is the opposite: it returns very few results, but most of its predicted labels are correct. An ideal system with high precision and high recall returns many results, all labelled correctly.
print("Area under PR = %s" % metricsBinary.areaUnderPR) 

# Area under the ROC curve
# To check or visualise the performance of a binary classifier we use the AUC (Area Under the Curve) of the ROC (Receiver Operating Characteristic) curve. ROC is a probability curve and AUC measures separability: it tells how well the model distinguishes between the classes. The higher the AUC, the better the model is at predicting 0s as 0s and 1s as 1s.
# For more details - https://towardsdatascience.com/understanding-auc-roc-curve-68b2303cc9c5
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [None]:
# Compute the area under the ROC curve on the test set.
# The evaluator has to rank rows by a continuous score, so it reads the model's
# `rawPrediction` column; the thresholded 0/1 `prediction` column would yield an AUC
# computed from a single ROC point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# Print the documentation of every LogisticRegression parameter together with its current value.
print(lr.explainParams())

In [None]:
# ParamGridBuilder constructs the grid of parameters to search over. Here the grid
# spans the regularisation parameter (regParam), elasticNetParam and maxIter:
# 3 x 3 x 3 = 27 parameter settings.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

# Model selection metric: area under ROC. It must be computed from the continuous
# `rawPrediction` scores; using the thresholded `prediction` column would rank the
# candidate models by a single ROC point each.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

In [None]:
# Background reading: https://towardsdatascience.com/cross-validation-and-hyperparameter-tuning-how-to-optimise-your-machine-learning-model-13f005af9d7d

pipeline = Pipeline(stages=[featureIndexer, lr, label_converter])

# A CrossValidator needs an Estimator (the pipeline), a set of Estimator ParamMaps (the
# grid), an Evaluator (binary classification here), the number of folds (5), the
# parallelism (10) and a seed.
#
# K-fold - a test/hold-out set is first separated for the final evaluation. The
# remaining data is split into K folds. At each iteration one fold is the validation
# set and all the others form the training set, until every fold has been used for
# validation once. The model is scored at every iteration and the scores are averaged,
# which represents performance on unseen data better than a single train/validation split.
#
# Parallelism - parameter settings can be evaluated in parallel with a value of 2 or
# more (1 is serial). Choose it to maximise parallelism without exceeding the cluster's
# resources; larger values do not always improve performance, and a value up to 10 is
# generally sufficient for most clusters.
#
# seed - initialises the random number generator so that repeated runs produce the same
# random numbers.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(training_data)  # fit on the training data

In [None]:
# Score the test data with the cross-validated model.
predictions = cvModel.transform(test_data)
predictions.select("features", "label", "probability", "predictionLabel").show(n=5)

In [None]:
# Accuracy of the cross-validated model: compare the indexedLabel and prediction columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# Overall statistics for the cross-validated model.
# The metrics object has to be rebuilt from the current `predictions`; reusing the
# earlier metricsMulti would report the statistics of the untuned model again.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd
metricsMulti = MulticlassMetrics(predictionAndLabel)

confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)

# BinaryClassificationMetrics has no confusionMatrix, precision, recall or fMeasure, so the following is not available:
# confusionMatrix = metricsBinary.confusionMatrix()
# precision = metricsBinary.precision(label=1) 
# recall = metricsBinary.recall(label=1) 
# f1Score = metricsBinary.fMeasure()
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

In [None]:
# Area under ROC for the cross-validated model, computed from the continuous
# `rawPrediction` scores (the 0/1 `prediction` column gives only a single ROC point).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# Decision tree - a supervised learning technique usable for both classification and
# regression, though mostly preferred for classification. A tree has two kinds of nodes:
# decision nodes make a decision and have multiple branches, while leaf nodes are the
# outputs of those decisions and have no further branches. The tree is a graphical
# representation of all possible solutions to a decision under given conditions. It is
# built with the CART (Classification and Regression Tree) algorithm: the tree asks a
# question and, based on the answer (yes/no), splits further into subtrees.
# Example: a candidate with a job offer has to decide whether to accept it. The tree
# starts at the root node (the salary attribute, chosen by the attribute selection
# measure). The root splits into the next decision node (distance from the office) and
# one leaf node. That decision node splits into another decision node (cab facility)
# and one leaf node. Finally, the last decision node splits into two leaf nodes
# (accepted offer and declined offer).
# For more details - https://www.javatpoint.com/machine-learning-decision-tree-classification-algorithm

# Create the initial decision tree estimator.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="indexedLabel")

# Train the model on the training data.
dtModel = dt.fit(training_data)

In [None]:
# Make predictions on the test data.
predictions = dtModel.transform(test_data)

# Evaluate the model: accuracy from comparing the indexedLabel and prediction columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# The mllib metrics classes work on RDDs (Resilient Distributed Datasets), so convert
# the (prediction, indexedLabel) columns to an RDD.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate the metrics objects.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# BinaryClassificationMetrics sweeps a threshold over a continuous score, so give it
# P(class 1) from the `probability` vector. The thresholded 0/1 `prediction` column
# would reduce the ROC and precision-recall curves to a single operating point.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for class index 1).
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

In [None]:
# Area under the precision-recall curve for the decision tree predictions.
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under the ROC curve for the decision tree predictions.
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [None]:
# Area under ROC for the decision tree, computed from the continuous `rawPrediction`
# scores (the 0/1 `prediction` column gives only a single ROC point).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# With 4 values for maxDepth and 3 values for maxBins, this grid gives CrossValidator
# 4 x 3 = 12 parameter settings to choose from.
# maxDepth - the maximum depth the tree is allowed to grow to. The deeper the tree, the more complex the model. Increasing maxDepth always lowers the training error (or at least does not raise it). If it is set too high, the tree may simply overfit the training data without capturing useful patterns, which increases the test error. If it is set too low, the tree has too little flexibility to capture the patterns and interactions in the training data, which also increases the test error.
# Discretisation - there are several approaches to transforming continuous variables into discrete ones. The process is also known as binning, each interval being a bin. Discretisation methods fall into two categories: supervised and unsupervised.
# maxBins - the number of bins used when discretising continuous features. Increasing maxBins lets the algorithm consider more split candidates and make finer-grained split decisions, but it also increases computation and communication. maxBins must be at least the maximum number of categories M of any categorical feature.

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

# Model selection metric: area under ROC. It must be computed from the continuous
# `rawPrediction` scores; using the thresholded `prediction` column would rank the
# candidate models by a single ROC point each.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

In [None]:
# 5-fold cross-validation of the decision tree pipeline over the parameter grid.
pipeline = Pipeline(stages=[featureIndexer, dt, label_converter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(training_data)

In [None]:
# Score the test data with the cross-validated decision tree and report its accuracy.
predictions = cvModel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# The mllib metrics classes work on RDDs, so convert the (prediction, indexedLabel)
# columns to an RDD.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate the metrics objects.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# BinaryClassificationMetrics sweeps a threshold over a continuous score, so give it
# P(class 1) from the `probability` vector. The thresholded 0/1 `prediction` column
# would reduce the ROC and precision-recall curves to a single operating point.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for class index 1).
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

In [None]:
# Area under the precision-recall curve for the cross-validated decision tree.
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under the ROC curve for the cross-validated decision tree.
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [None]:
# Area under ROC for the cross-validated decision tree, computed from the continuous
# `rawPrediction` scores (the 0/1 `prediction` column gives only a single ROC point).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# Create the initial random forest classifier.
# Random forest is a popular supervised learning algorithm usable for both
# classification and regression. It contains a number of decision trees built on
# various subsets of the dataset. Instead of relying on a single tree, it takes the
# prediction of every tree and outputs the class with the majority of votes, which
# improves predictive accuracy.
# For more info - https://www.javatpoint.com/machine-learning-random-forest-algorithm
# Why random forest works:
#  1) Low correlation between the models is the key. Uncorrelated models can produce
#     ensemble predictions that are more accurate than any individual prediction,
#     because the trees protect each other from their individual errors: while some
#     trees may be wrong, many others will be right, so as a group the trees move in
#     the correct direction. For a very good explanation of low correlation see
#     [An Example of Why Uncorrelated Outcomes are So Great] -
#     https://towardsdatascience.com/understanding-random-forest-58381e0602d2
#  2) The features need to carry some actual signal, so that models built on them do
#     better than random guessing.

rf = RandomForestClassifier(featuresCol="features", labelCol="indexedLabel")

# Train the model on the training data.
rfModel = rf.fit(training_data)

In [None]:
# Make predictions on the test data.
predictions = rfModel.transform(test_data)

# Evaluate the model: accuracy from comparing the indexedLabel and prediction columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# The mllib metrics classes work on RDDs (Resilient Distributed Datasets), so convert
# the (prediction, indexedLabel) columns to an RDD.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate the metrics objects.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# BinaryClassificationMetrics sweeps a threshold over a continuous score, so give it
# P(class 1) from the `probability` vector. The thresholded 0/1 `prediction` column
# would reduce the ROC and precision-recall curves to a single operating point.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for class index 1).
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

In [None]:
# Area under the precision-recall curve for the random forest predictions.
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under the ROC curve for the random forest predictions.
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [None]:
# Area under ROC for the random forest, computed from the continuous `rawPrediction`
# scores (the 0/1 `prediction` column gives only a single ROC point).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# numTrees - the total number of decision trees. The usual way to choose it is k-fold
# cross-validation over different numbers of trees (and any other combination of the
# model's parameters), keeping the setting with the best performance.
# This grid has 3 x 2 x 2 = 12 parameter settings.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# Model selection metric: area under ROC. It must be computed from the continuous
# `rawPrediction` scores; using the thresholded `prediction` column would rank the
# candidate models by a single ROC point each.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

In [None]:
# 5-fold cross-validation of the random forest pipeline over the parameter grid.
pipeline = Pipeline(stages=[featureIndexer, rf, label_converter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(training_data)

In [None]:
# Score the test data with the cross-validated random forest.
predictions = cvModel.transform(test_data)

# Evaluate the best model.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# The mllib metrics classes work on RDDs, so convert the (prediction, indexedLabel)
# columns to an RDD.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate the metrics objects.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# BinaryClassificationMetrics sweeps a threshold over a continuous score, so give it
# P(class 1) from the `probability` vector. The thresholded 0/1 `prediction` column
# would reduce the ROC and precision-recall curves to a single operating point.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for class index 1).
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)

In [None]:
# Area under the precision-recall curve for the cross-validated random forest.
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under the ROC curve for the cross-validated random forest.
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [None]:
# Area under ROC for the cross-validated random forest, computed from the continuous
# `rawPrediction` scores (the 0/1 `prediction` column gives only a single ROC point).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

In [None]:
# The data used here relates to the direct marketing campaigns (phone calls) of a Portuguese banking institution.
# Three classifiers are tested: logistic regression, decision tree and random forest.
# Different metrics are computed after hyperparameter tuning with 5-fold cross-validation to evaluate the models built with these algorithms.