# Classfication in PySpark MLib- Experimentation

### Classification
Classfication is a supervised Machine learning task where we want to automatically categorize the data into some pre-defined categorization method.
Examples of classification might include sorting objects like flowers into various species or automatically labeling images into groups like cats, dogs fish, etc.
To be able to do this though, we need to have traning data and a pre-defined dependent variable which is the column in your dataset that defines the categoeries you want to predict. 

Algorithms Available

Pyspark offers the following algrothims for classification
1. Logistic regression
2. Naive Bayes
3. One vs Rest
4. Linear Support Vector Machine (SVC)
5. Random forest classifier
6. Gradient Boosted Tree (GBT) Classifier
7.  Decision Tree Classifier
8. Multilayer Perceptron Classifier (Neural Network)

In [1]:
# Create a PySpark instance
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Classification").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "cores")
spark

You are working with 1 cores


In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

### Data Set Name: Autistic Spectrum Disorder Screening Data for Adult
Autistic Spectrum Disorder (ASD) is a neurodevelopment condition associated with significant healthcare costs, and early diagnosis can significantly reduce these. Unfortunately, waiting times for an ASD diagnosis are lengthy and procedures are not cost effective. The economic impact of autism and the increase in the number of ASD cases across the world reveals an urgent need for the development of easily implemented and effective screening methods. Therefore, a time-efficient and accessible ASD screening is imminent to help health professionals and inform individuals whether they should pursue formal clinical diagnosis. The rapid growth in the number of ASD cases worldwide necessitates datasets related to behaviour traits. However, such datasets are rare making it difficult to perform thorough analyses to improve the efficiency, sensitivity, specificity and predictive accuracy of the ASD screening process. Presently, very limited autism datasets associated with clinical or screening are available and most of them are genetic in nature. Hence, we propose a new dataset related to autism screening of adults that contained 20 features to be utilised for further analysis especially in determining influential autistic traits and improving the classification of ASD cases. In this dataset, we record ten behavioural features (AQ-10-Adult) plus ten individuals characteristics that have proved to be effective in detecting the ASD cases from controls in behaviour science.

### Source: 
https://www.kaggle.com/faizunnabi/autism-screening

In [3]:
path = '/user/harishmohan/Datasets/'
As_df = spark.read.csv(path+'Autismscreening.csv',inferSchema=True,header=True)

In [4]:
As_df.limit(6).toPandas()

Unnamed: 0,A1_Score,A2_Score,A3_Score,A4_Score,A5_Score,A6_Score,A7_Score,A8_Score,A9_Score,A10_Score,...,gender,ethnicity,jundice,austim,contry_of_res,used_app_before,result,age_desc,relation,Class/ASD
0,1,1,1,1,0,0,1,1,0,0,...,f,White-European,no,no,'United States',no,6,'18 and more',Self,NO
1,1,1,0,1,0,0,0,1,0,1,...,m,Latino,no,yes,Brazil,no,5,'18 and more',Self,NO
2,1,1,0,1,1,0,1,1,1,1,...,m,Latino,yes,yes,Spain,no,8,'18 and more',Parent,YES
3,1,1,0,1,0,0,1,1,0,1,...,f,White-European,no,yes,'United States',no,6,'18 and more',Self,NO
4,1,0,0,0,0,0,0,1,0,0,...,f,?,no,no,Egypt,no,2,'18 and more',?,NO
5,1,1,1,1,1,0,1,1,1,1,...,m,Others,yes,no,'United States',no,9,'18 and more',Self,YES


In [5]:
print(As_df.printSchema())

root
 |-- A1_Score: integer (nullable = true)
 |-- A2_Score: integer (nullable = true)
 |-- A3_Score: integer (nullable = true)
 |-- A4_Score: integer (nullable = true)
 |-- A5_Score: integer (nullable = true)
 |-- A6_Score: integer (nullable = true)
 |-- A7_Score: integer (nullable = true)
 |-- A8_Score: integer (nullable = true)
 |-- A9_Score: integer (nullable = true)
 |-- A10_Score: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- jundice: string (nullable = true)
 |-- austim: string (nullable = true)
 |-- contry_of_res: string (nullable = true)
 |-- used_app_before: string (nullable = true)
 |-- result: integer (nullable = true)
 |-- age_desc: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- Class/ASD: string (nullable = true)

None


In [6]:
As_df.groupBy("Class/ASD").count().show()

+---------+-----+
|Class/ASD|count|
+---------+-----+
|      YES|  189|
|       NO|  515|
+---------+-----+



In [7]:
from pyspark.sql.functions import col, explode, array, lit
import math

major_df = As_df.filter(col("Class/ASD") == 'NO')
minor_df = As_df.filter(col("Class/ASD") == 'YES')
ratio = math.ceil(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 3


***Oversampling***: The idea of oversampling, is to duplicate the samples from under represented class, to inflate the number till it reaches the same level as the dominant class.

In [8]:
a = range(ratio)

# duplicate the majority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

# Combine both oversampled minority rows and previous majority rows
combined_df = major_df.unionAll(oversampled_df)
print(combined_df.groupBy('Class/ASD').count().show())
print(combined_df.count())

+---------+-----+
|Class/ASD|count|
+---------+-----+
|      YES|  567|
|       NO|  515|
+---------+-----+

None
1082


In [9]:
combined_df.limit(4).toPandas()

Unnamed: 0,A1_Score,A2_Score,A3_Score,A4_Score,A5_Score,A6_Score,A7_Score,A8_Score,A9_Score,A10_Score,...,gender,ethnicity,jundice,austim,contry_of_res,used_app_before,result,age_desc,relation,Class/ASD
0,1,1,1,1,0,0,1,1,0,0,...,f,White-European,no,no,'United States',no,6,'18 and more',Self,NO
1,1,1,0,1,0,0,0,1,0,1,...,m,Latino,no,yes,Brazil,no,5,'18 and more',Self,NO
2,1,1,0,1,0,0,1,1,0,1,...,f,White-European,no,yes,'United States',no,6,'18 and more',Self,NO
3,1,0,0,0,0,0,0,1,0,0,...,f,?,no,no,Egypt,no,2,'18 and more',?,NO


In [10]:
# Format Data
input_columns = combined_df.columns
input_columns = input_columns[1:-1]

dependent_var = 'Class/ASD'

In [11]:
# change label (class variable) to string type to prep for reindexing
# Pyspark is expecting a zero indexed integer for the label column. 
# Just in case our data is not in that format... we will treat it by using the StringIndexer built in method
renamed = combined_df.withColumn("label_str", combined_df[dependent_var].cast(StringType())) #Rename and change to string type
indexer = StringIndexer(inputCol="label_str", outputCol="label") #Pyspark is expecting the this naming convention 
indexed = indexer.fit(renamed).transform(renamed)

In [12]:
indexed.limit(5).toPandas()

Unnamed: 0,A1_Score,A2_Score,A3_Score,A4_Score,A5_Score,A6_Score,A7_Score,A8_Score,A9_Score,A10_Score,...,jundice,austim,contry_of_res,used_app_before,result,age_desc,relation,Class/ASD,label_str,label
0,1,1,1,1,0,0,1,1,0,0,...,no,no,'United States',no,6,'18 and more',Self,NO,NO,1.0
1,1,1,0,1,0,0,0,1,0,1,...,no,yes,Brazil,no,5,'18 and more',Self,NO,NO,1.0
2,1,1,0,1,0,0,1,1,0,1,...,no,yes,'United States',no,6,'18 and more',Self,NO,NO,1.0
3,1,0,0,0,0,0,0,1,0,0,...,no,no,Egypt,no,2,'18 and more',?,NO,NO,1.0
4,0,1,0,0,0,0,0,1,0,0,...,no,no,'United States',no,2,'18 and more',Self,NO,NO,1.0


In [13]:
# Convert all string type data into the input column list to numberic
numeric_inputs = []
string_inputs = []
for column in input_columns:
    if str(indexed.schema[column].dataType) == 'StringType':
        indexer = StringIndexer(inputCol=column, outputCol = column+ '_num')
        indexed = indexer.fit(indexed).transform(indexed)
        new_col_name = column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)


In [14]:
indexed.printSchema()

root
 |-- A1_Score: integer (nullable = true)
 |-- A2_Score: integer (nullable = true)
 |-- A3_Score: integer (nullable = true)
 |-- A4_Score: integer (nullable = true)
 |-- A5_Score: integer (nullable = true)
 |-- A6_Score: integer (nullable = true)
 |-- A7_Score: integer (nullable = true)
 |-- A8_Score: integer (nullable = true)
 |-- A9_Score: integer (nullable = true)
 |-- A10_Score: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- jundice: string (nullable = true)
 |-- austim: string (nullable = true)
 |-- contry_of_res: string (nullable = true)
 |-- used_app_before: string (nullable = true)
 |-- result: integer (nullable = true)
 |-- age_desc: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- Class/ASD: string (nullable = true)
 |-- label_str: string (nullable = true)
 |-- label: double (nullable = false)
 |-- age_num: double (nullable = false)
 |-- gender_num: doubl

### Treating for Skewness and outliers
Skewness meausres how much a distribution of values deviates from symmetry aroud the mean. A value of zero mean the distribution is symmetric, while a postive skweness indicated a greater number of small values, and a negative value indicates a greater number of lager value. \
As a general rule of thumb: 
    - If skewness is less than -1 or greater than 1, the distribution is highly skewed
    - If skewness is between -1 and -0.5 or between 0.5 and 1, the distribution is moderately skewed
    - If skewness is between -0.5 and 0.5 the distribution is approximately symmetric
    
A common recommendation for treating skweness is either a log transformation for postive skewness data or an exponential transformation for negative skewed data
Outliers: One common way to correct outliers is by flooring and capping which means editing ant value that is above or below a certain threshold (99th percentile or 1st percentile) back to the highest/lowest value in that percentile. For example, if the 99th percentile is 96 and there is a value of 1,000, you would change that value to 96.

In [15]:
# Treat for Skewness
# Flooring and capping
# if right skew take the log+1
# if left skew do exp transformation

# Create empty dictionary d
d = {}
# Create a dictionary of quantiles from your numberic cols
# I'm doing the top and bottom 1% but you can adjust if needed

for col in numeric_inputs:
    d[col] = indexed.approxQuantile(col, [0.01, 0.99], 0.001)


In [16]:
for col in numeric_inputs: 
    skew = indexed.agg(skewness(indexed[col])).collect()
    skew = skew[0][0]
    # If skewness is found
    # This function will make the appropriate corrections
    
    if skew > 1: #If right shew, floor, cap and log(x+1)
        indexed = indexed.withColumn(col, \
        log(when(combined_df[col] < d[col][0], d[col][0]) \
        .when(indexed[col] > d[col][1], d[col][1]) \
        .otherwise(indexed[col])+ 1).alias(col))
        print(col+ " has been treated for postive (right) skewness. (skew=)", skew, ")")
        
    elif skew < -1: # If left skew floor, cap and exp(x)
        indexed = indexed.withColumn(col, \
        exp(when(combined_df[col] < d[col][0], d[col][0]) \
        .when(indexed[col] > d[col][1], d[col][1]) \
        .otherwise(indexed[col])).alias(col))
        print(col+ "has been treated for negative (left) skewness. (skew = ",skew,")")

In [17]:
print(indexed.count())
indexed.limit(5).toPandas()

1082


Unnamed: 0,A1_Score,A2_Score,A3_Score,A4_Score,A5_Score,A6_Score,A7_Score,A8_Score,A9_Score,A10_Score,...,label,age_num,gender_num,ethnicity_num,jundice_num,austim_num,contry_of_res_num,used_app_before_num,age_desc_num,relation_num
0,1,1,1,1,0,0,1,1,0,0,...,1.0,12.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,1,1,0,1,0,0,0,1,0,1,...,1.0,6.0,1.0,7.0,0.0,1.0,9.0,0.0,0.0,0.0
2,1,1,0,1,0,0,1,1,0,1,...,1.0,13.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
3,1,0,0,0,0,0,0,1,0,0,...,1.0,19.0,0.0,2.0,0.0,0.0,36.0,0.0,0.0,1.0
4,0,1,0,0,0,0,0,1,0,0,...,1.0,16.0,0.0,4.0,0.0,0.0,0.0,0.0,0.0,0.0


In [18]:
# Now check for negative values in the dataframe. 
# Produce a warning if there are negative values in the dataframe that Naive Bayes cannot be used. 
# Note: we only need to check the numeric input values since anything that is indexed won't have negative values

In [19]:
minimums = combined_df.select([min(c).alias(c) for c in combined_df.columns if c in numeric_inputs])
min_array = minimums.select(array(numeric_inputs).alias("mins"))
min_array.show(1, False)

+------------------------------+
|mins                          |
+------------------------------+
|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]|
+------------------------------+



In [20]:
maximum = combined_df.select([max(c).alias(c) for c in combined_df.columns if c in numeric_inputs])
max_array = maximum.select(array(numeric_inputs).alias("max"))
max_array.show(1, False)

+-------------------------------+
|max                            |
+-------------------------------+
|[1, 1, 1, 1, 1, 1, 1, 1, 1, 10]|
+-------------------------------+



In [21]:
# We need to Vectorize for df
# Becasue the function that we use to make that correction requires a vector
# create final feature list

features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')
output.show(5, False)

+-------------------------------------------------------------------------+-----+
|features                                                                 |label|
+-------------------------------------------------------------------------+-----+
|(19,[0,1,2,5,6,9,10],[1.0,1.0,1.0,1.0,1.0,6.0,12.0])                     |1.0  |
|(19,[0,2,6,8,9,10,11,12,14,15],[1.0,1.0,1.0,1.0,5.0,6.0,1.0,7.0,1.0,9.0])|1.0  |
|(19,[0,2,5,6,8,9,10,14],[1.0,1.0,1.0,1.0,1.0,6.0,13.0,1.0])              |1.0  |
|(19,[6,9,10,12,15,18],[1.0,2.0,19.0,2.0,36.0,1.0])                       |1.0  |
|(19,[0,6,9,10,12],[1.0,1.0,2.0,16.0,4.0])                                |1.0  |
+-------------------------------------------------------------------------+-----+
only showing top 5 rows



In [22]:
# Create the mix max scaler object 
# This is what will correct for negative values
# I like to use a high range like 1,000 
#     because I only see one decimal place in the final_data.show() call
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show(5, False)

Features scaled to range: [0.000000, 1000.000000]
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                       |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |[1000.0,1000.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,0.0,600.0,260.8695652173913,0.0,0.0,0.0,0.0,0.0,0.0,500.0,0.0]                                 |
|1.0  |[1000.0,0.0,1000.0,0.0,0.0,0.0,1000.0,0.0,1000.0,500.0,130.43478260869566,1000.0,636.3636363636364,0.0,1000.0,136.36363636363635,0.0,500.0,0.0]|
|1.0  |[1000.0,0.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,1000.0,600.0,282.6086956521739,0.0,0.0,0.0,1000.0,0.0,0.0,500.0,0.0]                              |
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,1000.0

In [23]:
train, test = final_data.randomSplit([0.65, 0.35])

In [24]:
train.show(5, False)
test.show(5, False)

+-----+-----------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                     |
+-----+-----------------------------------------------------------------------------------------------------------------------------+
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1000.0,272.7272727272727,0.0,0.0,60.60606060606061,0.0,500.0,0.0]               |
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,108.69565217391305,0.0,181.8181818181818,0.0,0.0,212.12121212121212,0.0,500.0,200.0]|
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,108.69565217391305,0.0,363.6363636363636,0.0,0.0,60.60606060606061,0.0,500.0,0.0]   |
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,173.91304347826087,1000.0,0.0,0.0,0.0,15.151515151515152,0.0,500.0,0.0]             |
|1.0  |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,217.3913043478

In [25]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import * 
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

### Simple Logistic Regression -without cross validation 

In [27]:
# This is the simplistic approach which does not use cross validation
classifier = LogisticRegression()
fitModel = classifier.fit(train)

# Evaluation method for binary classification problem
predictionAndLabels = fitModel.transform(test)
auc = Bin_evaluator.evaluate(predictionAndLabels)
print("AUC: ", auc)

# Evaluation for a multiclass classification problem
predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictionAndLabels)*100)
print("Accuracy: {0:.2f}".format(accuracy),"%") #     print("Test Error = %g " % (1.0 - accuracy))
print(" ")

AUC:  0.9973262032085561
Accuracy: 99.73 %
 


### Logistic Regression- with cross validation
Spark has a build in validation functions to conduct cross validation which begins by splitting the training dataset into a set of "flolds" which are used as separate traning and test datasets. For example, with k=5 folds, CrossValidation will generate 5 different (training and testing) datasets pairs, each of which uses 4/5 of the data for traning and 1/5 for testing. To evaluate a particular Paramete, CrossValidation, computes the average evaluation metric for the 5 models produced by fitting the estimator on the 5 different (traning, tesing) dataset pairs and tells you which model performed the best once it funished. 

After identifying the best ParamMap, CrossValidation finally re-fits the Estimator using the best ParamMap and the entire dataset. 

***MaxIter***: The maximum number of iterations to use. There is no clear formula for setting the optimum iteration number, but you can figure out this issue by an iterative process by initializing the iteration number by a small number like 100 and then increase its lineary. This process will be repeated until the MSE of the test does not decrease and even may increase.

In [28]:
classifier = LogisticRegression()

# Set up your parameter grid for the cross validator to conduct hyperparameter turning
paramGrid = (ParamGridBuilder().addGrid(classifier.maxIter, [10, 15, 20]).build())

# Set up the Cross Validation which requires all the following parameters: 
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MC_evaluator,
                          numFolds=3)
# Then fit the model
fitModel = crossval.fit(train)

# Collect the best model 
# print the coefficient matrix
# These values should be compared relative to eachother
# And intercepts can be prepared to other models
BestModel = fitModel.bestModel
print("Intercept: " + str(BestModel.interceptVector))
print("Coefficients: \n" + str(BestModel.coefficientMatrix))

# You can extract the best model from this run like this if you want
LR_BestModel = BestModel

# Next you need to generate predictions on the test dataset
# fitModel automatically uses the best model
# so we don't need to use BestModel here 
predictions = BestModel.transform(test)

# Now print the accuracy rate of the model or AUC for a binary Classifier
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

Intercept: [77.06701654156629]
Coefficients: 
DenseMatrix([[-9.63869022e-03, -7.84159335e-03, -9.66705685e-03,
              -1.15161002e-02, -8.02302876e-03, -1.08283439e-02,
              -9.78533030e-03, -1.18500117e-02, -7.35044193e-03,
              -3.76266431e-02,  3.42061962e-03,  3.88274800e-04,
               2.51851148e-03,  4.73925210e-05, -2.76285482e-04,
               5.75394673e-03, -1.50626912e-02,  0.00000000e+00,
               4.42420868e-03]])
99.73190348525469


In [29]:
# Create feature importance scores
coeff_array = BestModel.coefficientMatrix.toArray()
coeff_scores = []
for x in coeff_array[0]:
    coeff_scores.append(float(x))
    
result = spark.createDataFrame(zip(input_columns, coeff_scores), schema=['feature', 'coeff'])
result.show(100)

+---------------+--------------------+
|        feature|               coeff|
+---------------+--------------------+
|       A2_Score|-0.00963869021536...|
|       A3_Score|-0.00784159335336...|
|       A4_Score|-0.00966705684667...|
|       A5_Score|-0.01151610023448...|
|       A6_Score|-0.00802302875616...|
|       A7_Score|-0.01082834392296...|
|       A8_Score|-0.00978533029837...|
|       A9_Score|-0.01185001168206...|
|      A10_Score|-0.00735044193473033|
|            age|-0.03762664308840...|
|         gender|0.003420619615851887|
|      ethnicity|3.882747997514347...|
|        jundice|0.002518511480284284|
|         austim|4.739252099828245E-5|
|  contry_of_res|-2.76285482482491...|
|used_app_before|0.005753946727488514|
|         result|-0.01506269120060...|
|       age_desc|                 0.0|
|       relation|0.004424208676235...|
+---------------+--------------------+



In [30]:
combined_df.printSchema()

root
 |-- A1_Score: integer (nullable = true)
 |-- A2_Score: integer (nullable = true)
 |-- A3_Score: integer (nullable = true)
 |-- A4_Score: integer (nullable = true)
 |-- A5_Score: integer (nullable = true)
 |-- A6_Score: integer (nullable = true)
 |-- A7_Score: integer (nullable = true)
 |-- A8_Score: integer (nullable = true)
 |-- A9_Score: integer (nullable = true)
 |-- A10_Score: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- jundice: string (nullable = true)
 |-- austim: string (nullable = true)
 |-- contry_of_res: string (nullable = true)
 |-- used_app_before: string (nullable = true)
 |-- result: integer (nullable = true)
 |-- age_desc: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- Class/ASD: string (nullable = true)



### One vs. Rest
The One-vs-Rest classifier is a type of multiclass classifier that involves training a single classifier per class, with the samples of the class as postive samples as negatives. So each is viewed as it compares to rest of the classes as a whole, as opposed to each one individaully. 

***regPram***: The purpose of the regularizer is to encourage simple models and avoid overfitting

In [31]:
# Instantiate the base Classifier
lr = LogisticRegression()

# Instantiate the One vs Rest Classifier
classifier = OneVsRest(classifier=lr)

# Add parameters of your choice here: 
paramGrid = ParamGridBuilder() \
            .addGrid(lr.regParam, [0.1, 0.01]) \
            .build()

# Cross Validator requires the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2)

# Cross Valiator requires the following prameters:
fitModel = crossval.fit(train)

# Print the Coefficients
# First we need to extract the best model from fit model

# Get Best Model
BestModel = fitModel.bestModel

# Extract list of binary models
models = BestModel.models

for model in models:
    print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

# Now generate predictions on test dataset
predictions = BestModel.transform(test)

# And Calculate the accuracy score
accuracy = (MC_evaluator.evaluate(predictions))*100
accuracy


[1mIntercept: [0m -11.983327410988968 [1m
Coefficients:[0m [0.0014147237297437861,0.0011603159820726785,0.0014671375999953445,0.001993364161714782,0.0012841984039581094,0.0014588832285791563,0.0015841600567746382,0.0019684388933657935,0.001186584771856971,0.0062777716536192825,-0.00014706833064013616,-0.0001875245042105071,0.0003382119275422131,0.0004714857197136602,-1.6052576701380337e-05,-0.00042408314353121785,0.0016945819912884679,0.0,-0.0011546825535908238]
[1mIntercept: [0m 11.983327410988933 [1m
Coefficients:[0m [-0.0014147237297437861,-0.0011603159820726765,-0.0014671375999953408,-0.00199336416171478,-0.001284198403958107,-0.0014588832285791554,-0.0015841600567746395,-0.0019684388933657922,-0.0011865847718569669,-0.006277771653619247,0.00014706833064013865,0.00018752450421050847,-0.0003382119275422104,-0.0004714857197136609,1.6052576701379984e-05,0.00042408314353122094,-0.0016945819912884646,0.0,0.0011546825535908235]


99.19571045576407

### Multilayer Perceptron Classifier
#### Neural Network
A multilayer perceptron (MLP) is a class of feedforward artifical neural network. It consist of at least three layers of nodes; an input layer, a hidden layer and an output layer. Expect for the input nodes, each nodes is a neuron that uses a nonlinear activation function. MLP utilizies a supervised learning technique called backpropagation for traning. Its multiple layers and non-linear activation distinguish MLP from a linear perceptron. It can distinguish data that is not lineraly separable.

##### Common Hyper Parameters

***MaxIter***: The maximum number of iterations to use. There is no clear formula for setting the optimum iteration number, but you can figure out this issue by an iterative process by initalizing the iteration number by a small number like 100 and then increase it linearly. This process will be replaced unitl the MSE of the test does not decrease and even may increase.

***Layers***: Spark requires that the input layer equals the number of features in the dataset, the hidden layer might be one or two more that the (flexible), and the output layer has to be equal to the number of classes.

***Block Size***: Block size for stacking input data in matrices to speed up the compuatation. Data is stacked within partitions. If block size is more than remaining data in a partition then it is abjusted to the size of this data. Recommended size is between 10 and 1000. Default: 128

***Seed***: A random seed. Set the value if you need your results to be reproducible across repeated calls (highly recommended). 

***Weights***: Each hidden neuron added will increase the number of weights, thus it is recommended to use the least number of hidden neurons that accomplish the task. Using more hidden neurons that required will add more complexity.


In [32]:
# Count Features
features = final_data.select(['features']).collect()
features_count = len(features[0][0])

# Count Classes
class_count = final_data.select(countDistinct("label")).collect()
classes = class_count[0][0]

# Then use this number to specify the layers
# The first number in this list is the input layer which has to be equal to the number of features in your vector
# The second number is the first hidden layer
# The third number is the second hidden layer
# The fourth number is the output layer which has to be equal to your class size
layers = [features_count, features_count+1, features_count, classes]

# Instaniate the classifier
classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# Fit the model
fitModel = classifier.fit(train)

# Print the model weights
print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)

# Generate predictions on test dataframe
predictions = fitModel.transform(test)

# Calculate accuracy score
accuracy = (MC_evaluator.evaluate(predictions))*100

# Print accuracy score
print("Accuracy: ", accuracy)

[1mModel Weights: [0m 839
Accuracy:  88.73994638069705


### Navie Bayes
The Navie Bayes Classifier is a collection of classification algorithms based on Bayes Theorem. It is not a single algrothim but a family of algrothims that all share a common principle, that every feature being classified is independent of the value of any other features.

***Assumptions:***
- Independence between every pair of features
- Feature values are non-negative (which is why we checked earlier)

***Hyper Parameters***:

***Smoothing***: It is problematic when a frequency-based probaility is zero, because it will wipe out all the inforamtion in the other probailities, and we need to find a solution for this. A solution would be laplace smoothing, which is a techniques for smoothing categorical data. In PySpark, this number need to be >= 0, default is 1.0

***Thresholds***:
Thresholds is multi-class classification to adjust the probaility of predicting each class. Array must be length equal to the number of classes, with value >0, expecting that at most one value may be 0. The class with largest value p/t predicted, where p is the orignial probability of that class and t is the class's threshold. The default value os none.

***weightCol***: 
If you have a weight column you would enter the name of the column here. If this is not set or empty, we treat all instance weight as 1.0.


In [33]:
# Add parameters of your choice here: 
classifier = NaiveBayes()
paramGrid = (ParamGridBuilder() \
            .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
            .build())

# Cross Validation requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2)

fitModel = crossval.fit(train)

predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy: ", accuracy)

Accuracy:  85.25469168900804


### Linear Support Vector Machine
Linear SVMs are based on the idea of finding a hyperplane that best divides a dataset into two classes, which is why you can only use it for binary classification. Support vectors are the data points neareast to the hyperplane, the points of a data set that, if removed, would alter the postition of the dividing hyperplane. Because of this, they can be considered the critical elements of a data set. Intuitively, th e further for the hyperplane our data points lie, the more confident we are that they have been correctly classified. We therefore want our data points to be as far away from the hyperplane as possbile, while still being on the correct side of it. So when new testing data is added, whatever side of the hyperplane it lands will decide the class that we assign to it.

***Interpretting the Coefficients***: 
Each coefficients directions gives us the predicted class, so if you take the dot product of any point with the vector, you can tell on which side it is; if the dot product is postive, it belongs to the positive class, if it is negative it belongs to the neagative class. 

You can even learn something about the importance of each features. Let's say the svm would find only one feature useful for sparating data, then the hyperplane would be orthogonal to that axis. So, you could say that hte absolute size of the coefficient realtive to the other ones gives an indication of how important the feature was for the separation. 

***Hyper Parameters:***

***MaxIter***: The maximum number of iterations to use. There is no clear formula for setting the opitmum iteration number, but you can figure out this issue by an iterative process by intializing the iteration number by a small number like 100 and then increase it linearly. This process will be repeated until the MSE of the test does no decrease and even may increas

***regParam***: The purpose of the reqularizer is to encourage simple models and avoid overfitting. To learn more about this concept 



In [34]:
# Count how many classes you have and produce an error if it's more than 2
class_count = final_data.select(countDistinct("label")).collect()
classes = class_count[0][0]

if classes > 2:
    print("LinearSVC cannot be used because PySpark currently only accepts binary classification data for this algorithm")

In [35]:
# Add Parameters of your choice here:
classifer = LinearSVC()
paramGrid = (ParamGridBuilder() \
             .build())

# Cross Validation requires all of the following parameters: 
crossval = CrossValidator(estimator=classifer,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           numFolds=3)

fitModel = crossval.fit(train)
BestModel = fitModel.bestModel

print("Intercept: \n" + str(BestModel.intercept))
print('\033[1m' + " Coefficients"+ '\033[0m')
print("You should compares these relative to eachother")
print("Coefficients: \n" + str(BestModel.coefficients))

# Automatically gets the best model
prdictions = BestModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy:", accuracy)

Intercept: 
10.50772938195986
[1m Coefficients[0m
You should compares these relative to eachother
Coefficients: 
[-0.0014685941084461728,-0.0013007750487953693,-0.0015816498767680749,-0.0018233485768019743,-0.001322112807286529,-0.0013512235654348371,-0.0016221482480992463,-0.0018840637506359695,-0.0013300913893052723,-0.0033447293685842073,-0.00019721272412356674,0.00031061019558149594,-0.0005508875210642046,-0.000442275870867221,1.9417473241708744e-05,0.00010872281223349515,-0.00047003491317712195,0.0,0.0012189862022170446]
Accuracy: 85.25469168900804


### Decision Tree
Decision Trees classifier are a suprevised learning method is used to classify a variable by learning from historical data that the model uses to approximate a sine curve with a set of if-then-else decision rules. The deeper the tree, the more complex the decision rules and the filtter the model. 

Decision tree builds classification or regression models in the form of a tree structure. It breaks down a data set into smaller and smaller subset while at the same time an associated decision tree which corresponds to the best predictor called root node. Decision trees can handle both categotical and numberical data. 

#### Common Hyper Parameters
- ***maxBins*** = Max number of bins for discretizing continuous feature. Must be >=2 and <=  number of categories for any categorical feture.
    - ***Continuous features***: For small datasets in single-machine implementaions, the split candidates for each continuous feature are typically the uunique values for the feature. Some implementations sort the feature values and then use the ordered unique value as split candidates for faster tree calculations. Sorting feature values is expensive for large distributed datasets. This implementation computers an approximate set of split candidates by performance a quantilte calculation over a sampled fraction of the data. The ordered spits create "bins" and the maximum number of such bins can be specified using the MaxBins parameters. Note that the number of bins cannot be greater than the number of instances N (a rare scenario since the default maxBins value in 32). The tree algrothims automatically reduces the number of bins if the condition is not satisfied
    
    - ***Categorical features***: For a categorical feature with M possible values (categories), one could come up with 2 exp(M-1) split candidates. For binary(0/1) classification and regression, we can reduce the number of split candidates to M-1 by ordering the categorical feature values by the average label. For example, for a binary classification problem with one categorial features with three categories A,B and C whose corresponding proportions of label 1 are 0.2, 0.6, and 0.4, the categorical features are ordered as A, C, B. The two split candidates are A|C, B and A,C|B where denotes the split. In multiclass classification, all 2 exp(M-1)-1 possible splits are used whenever possbile. When 2 exp(M-1)-1 is geater than the maxBins parameters, we use a (heuristic) method similar to the method used for binary classifiaction and regression. The M categorical feature values are ordered by impurity, and the resulting M-1 split candidates are considered.
    
- ***MaxDepth*** = The max_depth parameter specifies the maximium depth of each tree. The deafult value of max_depth is None, which means that each tree will expand until evey leaf is pure. A pure leaf is one where all of the data on the leaf comes from the same class.

#### Feature Importance Scores
Scores add up to 1 across all variables so the lowest socre is the least importance varaible. 

In [36]:
# Add parameters of your choice here:
classifier = DecisionTreeClassifier()
paramGrid = (ParamGridBuilder() \
            .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
            .build())

# Cross Validator requires all of the folllowing parameters: 
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid, 
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds = 2)

# Fit model: Run cross-validation, and choose the best set of parameters
fitModel = crossval.fit(train)

# Collect and print feature importances

BestModel = fitModel.bestModel
featureImportances = BestModel.featureImportances.toArray()
print("Feature Importances: ", featureImportances)

predictios = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy; ", accuracy)


Feature Importances:  [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Accuracy;  85.25469168900804


In [37]:
print(BestModel.toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_487f876902476a872a89) of depth 1 with 3 nodes
  If (feature 9 <= 650.0)
   Predict: 1.0
  Else (feature 9 > 650.0)
   Predict: 0.0



In [38]:
# Zip input_columns qith feature importance scores and create df
# First convert feaureimportance scores from numpy array to list
imp_scores = []
for x in featureImportances:
    imp_scores.append(int(x))
    
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

+---------------+-----+
|feature        |score|
+---------------+-----+
|age            |1    |
|contry_of_res  |0    |
|used_app_before|0    |
|A9_Score       |0    |
|result         |0    |
|A10_Score      |0    |
|A7_Score       |0    |
|A5_Score       |0    |
|ethnicity      |0    |
|jundice        |0    |
|austim         |0    |
|gender         |0    |
|A8_Score       |0    |
|age_desc       |0    |
|relation       |0    |
|A6_Score       |0    |
|A4_Score       |0    |
|A2_Score       |0    |
|A3_Score       |0    |
+---------------+-----+

None


#### Random Forest
Suppose you have a training set with 6 classes, random forest may create three decision trees taking input of each subset. Finally, it predicts based on the majority of votes from each of the decision trees made. This works well becasue a single decision tree may be prone to noise, but aggregate of many decision trees reduce the effect of noise giving more accurate results. The subsets in different decision trees created may overlap. 

##### Common Hyper Parameter
- **maxBins** = Max number of bins for discretizing continuous features. Must be >= 2 and >= number of categories for any catgorical features. 
    - ***Continuous features***: For small datasets in single-machine implementations, the split candidates for each continuous features are typically the unique values for the feature. Some implementations sort the feature values and then use the ordered unique values as split candidates for faster tree calculations. Sorting features values is expenisive for large distribuited datasets. This implementation computes an approximate set of split candidates by peroformaing a quantile calculation over a sampled fraction of the data. The ordered splits create "bins" and the maxumum number of such bins can be specified using the maxBins parmeters. Note that the number of bins cannot be greater than the number of instance N. The tree algrothim automatically reduces the number of bins if the condition is not satisfied.
    - ***Categorical features***: For a categorical feature with M possible values (categories), one could come up with 2 exp(M-1)-1 split candidates. For binary (0/1) classification and regression, we can reduce the number of split candidates to M-1 by ordering the categroical feature values by the average label. For exampe, for a features are ordered as, A, C, B. The two split candidates are A| C, B and A, C | B where | denodes the split.
        - In multiclass classification, all 2 exp(M-1)-1 possible splits are used whenever possible. When 2 exp(M-1)-1 is greater than the maxBins parameter, we use a method similar to the method used for binary classification and regression. The M categorical feature values are ordered by impurity, and the resulting M-1 split candidates are considered. 
- **maxDepth** = The maxDepth parameter specifies the maximum depth of each tree. The default value for max_depth is None, which means that each tree will expand until every leaf is pure. A pure leaf is one where all of the data on the leaf comes from the same class. 

#### Feature Importance Scores
Scores add up to 1 accross all variables so the lowest score is the least importance variable. 

In [39]:
# Add parameters of your choice here"
classifier = RandomForestClassifier()
paramGrid = (ParamGridBuilder() \
             .addGrid(classifier.maxDepth, [2, 5, 10])
                                 .addGrid(classifier.maxBins, [5, 10, 20])
                                 .addGrid(classifier.numTrees, [5, 20, 50])
                     .build())

# Cross Validator requires all of the following parameters: 
crossval = CrossValidator(estimator = classifier, 
                          estimatorParamMaps = paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds = 2)

# Fit Model: Run Cross-Validation, and choose the best set of parameters. 
fitModel = crossval.fit(train)

# Retrieve best model from cross val
BestModel = fitModel.bestModel
featureImportance = BestModel.featureImportances.toArray()
print("Feature Importance: ", featureImportances)

predictions = (MC_evaluator.evaluate(predictions))*100
print(" ")
print("Accuracy: ", accuracy)

Feature Importance:  [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 
Accuracy:  85.25469168900804


### Gradient Boost Tree Classifer
It's more of a hierarchical approch. It combines the weak learners to strong prediction rules that allow a flexble partition of the feature space. The objective here, as is of my supervised learning algorthim, is to define a loss function and minimize it. 

### Common Hyper Parameters

 - **maxBins** = Max number of bins for discretizing continuous features. Must be >=2 and >= number of categories for any categorical feature.
     - **Continuous features:** For small datasets in single-machine implementations, the split candidates for each continuous feature are typically the unique values for the feature. Some implementations sort the feature values and then use the ordered unique values as split candidates for faster tree calculations.
         Sorting feature values is expensive for large distributed datasets. This implementation computes an approximate set of split candidates by performing a quantile calculation over a sampled fraction of the data. The ordered splits create “bins” and the maximum number of such bins can be specified using the maxBins parameter.
         Note that the number of bins cannot be greater than the number of instances N (a rare scenario since the default maxBins value is 32). The tree algorithm automatically reduces the number of bins if the condition is not satisfied.

     - **Categorical features:** For a categorical feature with M possible values (categories), one could come up with 2 exp(M−1) −1 split candidates. For binary (0/1) classification and regression, we can reduce the number of split candidates to M−1 by ordering the categorical feature values by the average label. For example, for a binary classification problem with one categorical feature with three categories A, B and C whose corresponding proportions of label 1 are 0.2, 0.6 and 0.4, the categorical features are ordered as A, C, B. The two split candidates are A | C, B and A , C | B where | denotes the split.
         In multiclass classification, all 2 exp(M−1) −1 possible splits are used whenever possible. When 2 exp(M−1) −1 is greater than the maxBins parameter, we use a (heuristic) method similar to the method used for binary classification and regression. The M categorical feature values are ordered by impurity, and the resulting M−1 split candidates are considered.
         
 - **maxDepth** = The maxDepth parameter specifies the maximum depth of each tree. The default value for max_depth is None, which means that each tree will expand until every leaf is pure. A pure leaf is one where all of the data on the leaf comes from the same class.

### Feature Importance Scores
Scores add up to 1 accross all varaibles so the lowest score is the least imporant variable. 

In [41]:
class_count = final_data.select(countDistinct("label")).collect()
classes = class_count[0][0]
if classes > 2:
    print("GBTClassifier cannot be used because PySpark currently only accepts binary classification data for this algorithm")

paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
#             .addGrid(classifier.maxIter, [10, 15,50,100])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

BestModel = fitModel.bestModel
featureImportances = BestModel.featureImportances.toArray()
print("Feature Importances: ",featureImportances)
    
predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print(" ")
print("Accuracy: ",accuracy)

Feature Importances:  [1.28640033e-02 1.90900505e-02 1.13649268e-02 1.45030832e-01
 8.93425545e-02 2.10258751e-03 1.34570001e-02 6.36346063e-02
 2.21144441e-02 6.04253296e-01 4.34643588e-03 9.66584345e-05
 6.49135824e-03 0.00000000e+00 2.41598675e-04 4.42450528e-03
 3.93715152e-04 0.00000000e+00 7.51427825e-04]
 
Accuracy:  100.0
