# Classification
A classification strategy is used when the tags are not **continuous**, but **categorical**. 

### Examples for binary classification
- Spam detector (yes/no)
- Default loan (yes/no)
- Diagnosis of a disease (yes/no)

Support Vector             |  Logisitic Regression
:-------------------------:|:-------------------------:
![Support Vector](images/svm.png "Support Vector")  |  ![Logistic Regression](images/logistic.png "Logistic Regression")




## Classification Algorithms in Spark
- Logistic Regression
- Support Vector Classifier
- Naive Bayes Classifier

In [1]:
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('Classification').getOrCreate()

df = spark.read.csv("titanic.csv", header=True, inferSchema=True)

df.printSchema()
df.show()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   

In [2]:
df = df.na.drop()

## Transformation of categorical variables
String variables will need to be transformed in numerical tapes in order to be processed by Spark's ML library. 
We can use a String Indexer, or a One-Hot Encoder. 

In [3]:
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol='Sex', outputCol='SexIndex')
si_fit = si.fit(df) # model pentru indexer

df_indexed = si_fit.transform(df)

### Caution!
One-Hot Encoding only works for numerical values. We will use the **'SexIndex'** column, which has already been transformed in numerical values

In [4]:
from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCol='SexIndex', outputCol='SexEncoded')
ohe_fit = ohe.fit(df_indexed)
df_encoded = ohe_fit.transform(df_indexed)

In [5]:
df_encoded.orderBy('Age').select(['Sex', 'SexIndex', 'SexEncoded']).show(5)

+------+--------+-------------+
|   Sex|SexIndex|   SexEncoded|
+------+--------+-------------+
|  male|     0.0|(1,[0],[1.0])|
|  male|     0.0|(1,[0],[1.0])|
|female|     1.0|    (1,[],[])|
|female|     1.0|    (1,[],[])|
|  male|     0.0|(1,[0],[1.0])|
+------+--------+-------------+
only showing top 5 rows



### Splitting the data before assembling
We will use compare the two encodings, therefore we will like to process the same data. 
The split data will be used to create two types of data: indexed and one-hot encoded. 

In [6]:
train_df, test_df = df_encoded.randomSplit([0.7, 0.3])

In [7]:
from pyspark.ml.feature import VectorAssembler
print(df_indexed.columns)
print(df_encoded.columns)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked', 'SexIndex']
['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked', 'SexIndex', 'SexEncoded']


In [8]:
assembler_index = VectorAssembler(inputCols=['SexIndex', 'Age', 'SibSp', 'Parch', 'Fare'], outputCol='features')
assembler_encode = VectorAssembler(inputCols=['SexEncoded', 'Age', 'SibSp', 'Parch', 'Fare'], outputCol='features')

In [9]:
i_df_train = assembler_index.transform(train_df)
e_df_train = assembler_encode.transform(train_df)
i_df_test = assembler_index.transform(test_df)
e_df_test = assembler_encode.transform(test_df)

In [10]:
i_df_train = i_df_train.select(['features', 'Survived'])
e_df_train = e_df_train.select(['features', 'Survived'])

i_df_test = i_df_test.select(['features', 'Survived'])
e_df_test = e_df_test.select(['features', 'Survived'])

In [11]:
i_df_train.orderBy('Age').show(5, truncate=False)
e_df_train.orderBy('Age').show(5, truncate=False)

+-------------------------+--------+
|features                 |Survived|
+-------------------------+--------+
|[0.0,0.92,1.0,2.0,151.55]|1       |
|[0.0,2.0,1.0,1.0,26.0]   |1       |
|[0.0,3.0,1.0,1.0,26.0]   |1       |
|[1.0,4.0,2.0,1.0,39.0]   |1       |
|[0.0,11.0,1.0,2.0,120.0] |1       |
+-------------------------+--------+
only showing top 5 rows

+-------------------------+--------+
|features                 |Survived|
+-------------------------+--------+
|[1.0,0.92,1.0,2.0,151.55]|1       |
|[1.0,2.0,1.0,1.0,26.0]   |1       |
|[1.0,3.0,1.0,1.0,26.0]   |1       |
|[0.0,4.0,2.0,1.0,39.0]   |1       |
|[1.0,11.0,1.0,2.0,120.0] |1       |
+-------------------------+--------+
only showing top 5 rows



## Logistic Regression 

In [12]:
from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression(labelCol='Survived')
i_log_reg_model = log_reg.fit(i_df_train)
e_log_reg_model = log_reg.fit(e_df_train)

In [13]:
i_log_reg_model.summary.predictions.show(5)
e_log_reg_model.summary.predictions.show(5)

+--------------------+--------+--------------------+--------------------+----------+
|            features|Survived|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[1.0,38.0,1.0,0.0...|     1.0|[-3.0613390853823...|[0.04473044952251...|       1.0|
|[1.0,58.0,0.0,0.0...|     1.0|[-1.9596293631014...|[0.12350716457662...|       1.0|
|(5,[1,4],[28.0,35...|     1.0|[-0.0529499238256...|[0.48676561099766...|       1.0|
|[1.0,49.0,1.0,0.0...|     1.0|[-2.5693546778128...|[0.07113693289700...|       1.0|
|[0.0,65.0,0.0,1.0...|     0.0|[2.02643540437449...|[0.88354480578035...|       0.0|
+--------------------+--------+--------------------+--------------------+----------+
only showing top 5 rows

+--------------------+--------+--------------------+--------------------+----------+
|            features|Survived|       rawPrediction|         probability|prediction|
+--------------------+--------+---------

## How to evaluate the model?
- using a *confusion matrix* https://en.wikipedia.org/wiki/Confusion_matrix

In [14]:
print(i_log_reg_model.summary.accuracy)
print(i_log_reg_model.summary.precisionByLabel)
print(i_log_reg_model.summary.recallByLabel)
print(i_log_reg_model.summary.fMeasureByLabel())


0.7923076923076923
[0.68, 0.8625]
[0.7555555555555555, 0.8117647058823529]
[0.7157894736842104, 0.8363636363636364]


In [15]:
results = i_log_reg_model.evaluate(i_df_test)

In [16]:
type(results)

pyspark.ml.classification.BinaryLogisticRegressionSummary

In [17]:
print(results.accuracy)
print(results.precisionByLabel)
print(results.recallByLabel)

0.7358490566037735
[0.5238095238095238, 0.875]
[0.7333333333333333, 0.7368421052631579]


In [18]:
predictions = i_log_reg_model.transform(i_df_test)

In [19]:
predictions.show(5)

+--------------------+--------+--------------------+--------------------+----------+
|            features|Survived|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[1.0,35.0,1.0,0.0...|       1|[-3.2050846336377...|[0.03897482675231...|       1.0|
|(5,[1,4],[54.0,51...|       0|[1.11192229525615...|[0.75248731297708...|       0.0|
|[1.0,4.0,1.0,1.0,...|       1|[-4.1837968702347...|[0.01501174429455...|       1.0|
|(5,[1,4],[34.0,13...|       1|[0.20081062040338...|[0.55003463118061...|       0.0|
|[0.0,19.0,3.0,2.0...|       0|[-0.1983626356232...|[0.45057131048745...|       1.0|
+--------------------+--------+--------------------+--------------------+----------+
only showing top 5 rows



## Evaluators

### Binary Classification Evaluator
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html

#### Metrics
- areaUnderROC (area under ROC Curve) -> AUC
- areaUnderPR (area under Precission/Recall) -> AUPR

AUC provides an aggregate measure of performance across all possible classification thresholds. One way of interpreting AUC is as the probability that the model ranks a random positive example more highly than a random negative example.

One axis of ROC and PR curves is the same, that is TPR: how many positive cases have been classified correctly out of all positive cases in the data.

The other axis is different. ROC uses FPR, which is how many mistakenly declared positives out of all negatives in the data. PR curve uses precision: how many true positives out of all that have been predicted as positives. So the base of the second axis is different. ROC uses what's in the data, PR uses what's in the prediction as a basis.


In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [21]:
bin_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [22]:
roc = bin_eval.evaluate(predictions)
print(roc)

0.7350877192982457


In [23]:
auPR = bin_eval.evaluate(predictions, {bin_eval.metricName: "areaUnderPR"})
print(auPR)

0.854208043694141


## String Indexer vs. OneHotEncoder
### Comparing the two techniques for categorical variables 

In [24]:
def experimentLogReg(train_df, test_df, labelCol='label'):
    log_reg = LogisticRegression(labelCol=labelCol)
    bin_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol=labelCol)
    model = log_reg.fit(train_df)

    results = model.evaluate(test_df)
    
    print("Accuracy: ", results.accuracy)
    print("Precission by Label: ", results.precisionByLabel)
    print("Recall by Label: ", results.recallByLabel)
    
    pred = model.transform(test_df)
    roc = bin_eval.evaluate(pred)
    print("Area under ROC curve", roc)
    auPR = bin_eval.evaluate(predictions, {bin_eval.metricName: "areaUnderPR"})
    print("Area under PR curve", auPR)

In [25]:
experimentLogReg(i_df_train, i_df_test, labelCol='Survived')
experimentLogReg(e_df_train, e_df_test, labelCol='Survived')

Accuracy:  0.7358490566037735
Precission by Label:  [0.5238095238095238, 0.875]
Recall by Label:  [0.7333333333333333, 0.7368421052631579]
Area under ROC curve 0.7350877192982457
Area under PR curve 0.854208043694141
Accuracy:  0.7358490566037735
Precission by Label:  [0.5238095238095238, 0.875]
Recall by Label:  [0.7333333333333333, 0.7368421052631579]
Area under ROC curve 0.7350877192982457
Area under PR curve 0.854208043694141


In [26]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes

def experiment_classification(train_df, test_df, labelCol='label'):
    log_reg = LogisticRegression(labelCol=labelCol)
    bayes = NaiveBayes(labelCol=labelCol)
    svm = LinearSVC(labelCol=labelCol)
    alg_names = ['Logistic Regression', 'Naive Bayes', 'SVM']
    algs = [log_reg, bayes, svm]

    bin_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol=labelCol)
    for i in range(len(algs)):
        alg = algs[i]
        print(alg_names[i], ":")
        
        model = alg.fit(train_df)
        
        pred = model.transform(test_df)
        roc = bin_eval.evaluate(pred)
        print("Area under ROC curve", roc)
        auPR = bin_eval.evaluate(predictions, {bin_eval.metricName: "areaUnderPR"})
        print("Area under PR curve", auPR)

In [27]:
experiment_classification(i_df_train, i_df_test, labelCol='Survived')

Logistic Regression :
Area under ROC curve 0.7350877192982457
Area under PR curve 0.854208043694141
Naive Bayes :
Area under ROC curve 0.662280701754386
Area under PR curve 0.854208043694141
SVM :
Area under ROC curve 0.7228070175438597
Area under PR curve 0.854208043694141


In [28]:
experiment_classification(e_df_train, e_df_test, labelCol='Survived')

Logistic Regression :
Area under ROC curve 0.7350877192982457
Area under PR curve 0.854208043694141
Naive Bayes :
Area under ROC curve 0.6228070175438597
Area under PR curve 0.854208043694141
SVM :
Area under ROC curve 0.7228070175438597
Area under PR curve 0.854208043694141
