# Overview
This notebook develops a spam message identification model using PySpark in an AWS SageMaker environment. The dataset for this model is the UCI SMS Spam Collection Data Set found [here](https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection). While this dataset is small, the problem this notebook addresses, spam messages, involves enough data at real-world scale to be well suited to the Spark platform.

# Import

## Packages / Libraries

In [None]:
import re
import os
import boto3
from functools import reduce
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import itertools

import sagemaker
import sagemaker_pyspark
from sagemaker import get_execution_role

from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,\
                                  BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, \
                               HashingTF, IDF
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType,\
                              IntegerType, StringType

from sagemaker_pyspark import RandomNamePolicyFactory, IAMRole,\
                              EndpointCreationPolicy, SageMakerModel,\
                              SageMakerResourceCleanup
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator,\
                                         PCASageMakerEstimator
from sagemaker_pyspark.transformation.serializers \
     import ProtobufRequestRowSerializer
from sagemaker_pyspark.transformation.serializers.serializers \
     import LibSVMRequestRowSerializer

from sklearn.metrics import confusion_matrix, roc_curve, auc

# Set Seed for Random Actions
seed = 5590

## Setup AWS and Spark
The following code blocks set up the global values and settings for AWS and Spark parameters.

### AWS

In [None]:
role = get_execution_role()
bucket = "dsba-6190-project3-spark"
file_name = "spam.csv"
session = sagemaker.Session()
region = boto3.Session().region_name

### Spark

In [None]:
# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(jars)

spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local")\
    .appName("Spam Filter")\
    .getOrCreate()
    
spark

## Load Data
When we try to load the data using default settings, the import adds headers and includes several empty columns. To avoid this, we define the schema of the data before we import.

In [None]:
# Define Known Schema
schema = StructType([
    StructField("class", StringType()),
    StructField("sms", StringType())
])

# Import CSV
df = spark.read\
          .schema(schema)\
          .option("header", "true")\
          .csv('s3a://{}/{}'.format(bucket, file_name))

df_num_col =  len(df.columns)
df_num_rows = df.count()

# Inspect Import
df.show(5)
print()
print("Schema")
df.printSchema()
print()
print("Shape - Rows x Columns")
print(df_num_rows,"x", df_num_col)

#### Check Null Values
We need to check and see if our data contains any null values. 

In [None]:
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() \
                                     for x in df.columns))).show()
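
The `reduce` call in this cell ORs together one `isNull()` condition per column, so a row is kept if any column is null. A plain-Python sketch of the same fold may make the pattern clearer; the sample rows here are hypothetical and `None` stands in for a Spark null.

```python
from functools import reduce
from operator import or_

# Hypothetical sample rows (not the notebook's data); None plays the role of null.
rows = [
    {"class": "ham", "sms": "see you at 5"},
    {"class": "spam", "sms": None},
    {"class": None, "sms": None},
]
columns = ["class", "sms"]

def has_null(row):
    # Fold the per-column "is null" flags with OR, like the Spark expression.
    return reduce(or_, (row[c] is None for c in columns))

null_rows = [row for row in rows if has_null(row)]
print(len(null_rows))
```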

It appears one of the rows contains null values, and the class label is corrupted as well. We can go ahead and drop this row.

In [None]:
df = df.dropna()

Let's inspect the DataFrame now that the null row has been dropped.

In [None]:
df_num_col =  len(df.columns)
df_num_rows = df.count()

# Inspect Import
df.show(5)
print()
print("Schema")
df.printSchema()
print()
print("Shape - Rows x Columns")
print(df_num_rows,"x", df_num_col)

Re-check null values.

In [None]:
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() \
                                     for x in df.columns))).show()

# EDA
We'll do some very basic EDA here. First we'll look at the breakdown of our target variable.

In [None]:
df.groupBy("class").count().show()

Clearly there is an error with the class label on one value. Let's look at the message associated with **ham"""**.

In [None]:
df.where(f.col("class") == 'ham"""').show(truncate = False)

This doesn't seem out of the ordinary, and appears to be a **ham** SMS message. I am going to change **ham"""** to **ham**.

In [None]:
df = df.withColumn("class", f.when(f.col("class") == 'ham"""' , 'ham').
                     otherwise(f.col("class")))

Let's verify the change occurred.

In [None]:
df.groupBy("class").count().show()

# Preprocess
Before we perform any analysis on the data we will need to perform three major steps:

1. Text Normalization
2. Tokenization 
3. TF-IDF Transformation

With text normalization, we process the raw text to provide a quality input for our model. Some of these steps follow the blog post [**Spam classification using Spark’s DataFrames, ML and Zeppelin (Part 1)**](https://blog.codecentric.de/en/2016/06/spam-classification-using-sparks-dataframes-ml-zeppelin-part-1/) by Daniel Pape (accessed 4/16/2020), which provides a good framework for handling the kinds of text found in an SMS message, such as emoticons.

Once the raw text is normalized, we can then tokenize and convert the text into a form that can be used by the analytical model.

## Text Normalization
To normalize the text, there are several steps we plan on taking:

1. Convert all text to lowercase
2. Convert all numbers to the text **_" normalized_number "_**
3. Convert all emoticons to the text **_" normalized_emoticon "_**
4. Convert all currency symbols to the text **_" normalized_currency_symbol "_**
5. Convert all links to the text **_" normalized_url "_**
6. Convert all email addresses to the text **_" normalized_email "_**
7. Convert all diamond/question mark symbols to the text **_" normalized_doamond_symbol "_**
8. Remove HTML characters
9. Remove punctuation

### Convert Text to Lower Case

In [None]:
df_norm = df.select("class","sms", f.lower(f.col("sms")).alias("sms_norm"))
df_norm.show(5)

### Normalize Symbols and Objects
To normalize the symbols and objects in the data, we will need to define user functions and employ replacement with regex tools.

To cycle through the DataFrame and make all the necessary replacements, I am going to define a dictionary, where each key is the expression used to find what needs to be replaced, and the value is the replacement string.

The regex for the emoticons came from [here](https://www.regextester.com/96995).

The remaining regex expressions came from [here](https://github.com/daniel-pape/spark-logistic-regression-spam-sms/blob/master/src/main/scala/preprocessing/LineCleaner.scala)

In [None]:
html_list = ["&lt;", "&gt;", "&amp;", "&cent;", "&pound;", "&yen;", "&euro;", "&copy;", "&reg;"]

# Email pattern gets its own name so the URL pattern below does not overwrite it
regex_email = "\\w+(\\.|-)*\\w+@.*\\.(com|de|uk)"
# Raw string avoids invalid-escape warnings. Note that upper-case alternatives
# such as :P cannot match here because sms_norm has already been lower-cased.
regex_emoticon = r":\)|:-\)|:\(|:-\(|;\)|;-\)|:-O|8-|:P|:D|:\||:S|:\$|:@|8o\||\+o\(|\(H\)|\(C\)|\(\?\)"
regex_number = "\\d+"
regex_punctuation ="[\\.\\,\\:\\-\\!\\?\\n\\t,\\%\\#\\*\\|\\=\\(\\)\\\"\\>\\<\\/]"
regex_currency = "[\\$\\€\\£]"
regex_url =  "(http://|https://)?www\\.\\w+?\\.(de|com|co.uk)"
regex_diamond_question = "�"
regex_html = "|".join(html_list)

# Replacements run in insertion order: email addresses and URLs must be
# handled before punctuation is stripped
dict_norm = {
    regex_emoticon : " normalized_emoticon ",
    regex_email : " normalized_emailaddress ",
    regex_url : " normalized_url ",
    regex_number : " normalized_number ",
    regex_punctuation : " ",
    regex_currency : " normalized_currency_symbol ",
    regex_diamond_question : " normalized_doamond_symbol ",
    regex_html : " "
}

for key, value in dict_norm.items():
    df_norm = df_norm.withColumn("sms_norm", f.regexp_replace(f.col("sms_norm"), key, value))

# Inspect the result once, after all replacements have been applied
df_norm.select('class','sms_norm').show(5, truncate = False)

df = df_norm.dropna()

print("Shape - Rows x Columns")
print(df.count(),"x", len(df.columns))

We're going to check again for null values, to ensure the conversion hasn't created any new null values.

In [None]:
df = df_norm
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() \
                                     for x in df.columns))).show()

## Convert Class to Binary
We need to convert our spam/ham class to a binary value. We also need to convert the column type to int.

In [None]:
df_norm = df_norm.withColumn("class", f.when(f.col("class") == "spam" , 1).
                             when(f.col("class") == "ham" , 0).
                             otherwise(f.col("class")))

df_norm = df_norm.withColumn("class", f.col('class').cast(IntegerType()))
df_norm.show(5)
df_norm.printSchema()

## Text Tokenization and Transformation
We will tokenize the text using a PySpark pipeline. First, we must initialize the pipeline components. For this pipeline, we will use the following stages:

1. Tokenizer
2. Stop Words Remover
3. Term Frequency Hashing

### Establish Pipeline

In [None]:
tokenizer = Tokenizer(inputCol="sms_norm", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_filtered")
# Hash the stop-word-filtered tokens so the StopWordsRemover stage takes effect
hashingTF = HashingTF(inputCol="tokens_filtered", outputCol="features_tf", 
                      numFeatures=1000)

pipeline_text = Pipeline(stages=[tokenizer, remover, hashingTF])

### Execute Pipeline on Complete Dataset

In [None]:
pipeline_text_fit = pipeline_text.fit(df_norm)
df_pipeline = pipeline_text_fit.transform(df_norm)
df_pipeline.show(5)
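
To give a feel for what the term frequency hashing stage produces, here is a minimal pure-Python sketch of the hashing trick: each token is hashed to one of `num_features` buckets and occurrences are counted per bucket. It uses `zlib.crc32` purely as a stand-in hash (Spark's `HashingTF` uses MurmurHash3), so the bucket indices will not match Spark's output, and the token list is made up.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=1000):
    # Map each token to a bucket index and count occurrences per bucket.
    # zlib.crc32 is a stand-in hash for illustration only.
    counts = Counter(
        zlib.crc32(t.encode("utf-8")) % num_features for t in tokens
    )
    # Return a sparse {bucket_index: count} mapping, sorted by index.
    return dict(sorted(counts.items()))

vec = hashing_tf(["free", "prize", "free", "call"])
print(vec)
```

With a small `num_features`, distinct tokens can collide in the same bucket, which is the trade-off for a fixed-length feature vector.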

## Train / Test Split
Now that we have performed all the possible actions that should be performed on the complete dataset, we split the data into train/test.

In [None]:
split_train = 0.8
train, test = df_pipeline.randomSplit([split_train, (1-split_train)], seed=seed)

### Verify Split

In [None]:
print("Shape - Train")
print((train.count(), len(train.columns)))
print()
print("Shape - Test")
print((test.count(), len(test.columns)))
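
The counts printed here will usually be close to, but not exactly, an 80/20 split, because `randomSplit` assigns rows by random sampling rather than by cutting at a fixed row index. The sketch below is a simplified plain-Python illustration of that idea, not Spark's actual implementation; the function name `random_split` and the sample data are hypothetical.

```python
import random

def random_split(rows, weights, seed):
    # Normalise weights into cumulative upper bounds, e.g. [0.8, 1.0].
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        # Send the row to the split whose range contains its random draw.
        draw = rng.random()
        for i, upper in enumerate(bounds):
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(1000)), [0.8, 0.2], seed=5590)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the assignment repeatable, which is why the notebook passes `seed=seed` to the split.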

## Inverse Document Frequency Calculation
To calculate the Term Frequency - Inverse Document Frequency values for the corpus, we need to train the IDF estimator on the **train** data. Then we apply the trained estimator to the train and test sets.

In [None]:
# Initialize IDF Estimator
idf = IDF(minDocFreq=2, inputCol="features_tf", outputCol="features_tfidf")

# Train IDF Estimator to Term Frequency Data
idfModel = idf.fit(train)

# Re-Scale Term Frequency Data 
train = idfModel.transform(train)
test = idfModel.transform(test)

# Inspect
train.show(5)
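
The fit-on-train, apply-to-both pattern can be sketched in plain Python. This sketch assumes the smoothed formula documented for Spark's `IDF`, log((m + 1) / (df + 1)) with m the number of training documents, and gives weight 0 to terms that appear in fewer than `min_doc_freq` documents. The tiny documents below are made-up `{term_index: term_frequency}` dicts.

```python
import math

def fit_idf(docs, min_doc_freq=2):
    # docs: list of {term_index: term_frequency} dicts from the training set.
    n_docs = len(docs)
    doc_freq = {}
    for doc in docs:
        for term in doc:
            doc_freq[term] = doc_freq.get(term, 0) + 1
    # Smoothed IDF; rare terms (below min_doc_freq) get weight 0.
    return {
        term: math.log((n_docs + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }

def transform(doc, idf):
    # Re-scale term frequencies; terms never seen during fitting get weight 0.
    return {term: tf * idf.get(term, 0.0) for term, tf in doc.items()}

train_docs = [{0: 1, 1: 2}, {0: 1, 1: 1}, {0: 3, 2: 1}]
idf_weights = fit_idf(train_docs)
print(transform({0: 2, 1: 1, 5: 1}, idf_weights))
```

Because the weights come only from the training documents, nothing about the test set leaks into the features.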

## Isolate Data
Moving forward we only need the TF-IDF features and the class label. We will rename them **features** and **label** to be consistent with the XGBoost estimator input labels.

In [None]:
df_train = train.select(f.col("class").alias("label"), f.col("features_tfidf").
                        alias("features"))

df_test = test.select(f.col("class").alias("label"), f.col("features_tfidf").
                      alias("features"))

# Train
We will be training our data on the SageMaker PySpark XGBoost algorithm. One tricky part about using this algorithm is that it only takes **LIBSVM** format data. Unfortunately, that is not the current format of our data. In order for the algorithm to accept our data as an input, we need to do three things:

1. Define the correct schema
2. Convert data to match the correct schema
3. Include **LibSVMRequestRowSerializer** as a parameter when initializing the XGBoost estimator.

## Define the Schema
To be accepted as **LIBSVM** data, our PySpark DataFrame must have a specific schema. The schema can be seen buried in the source code of the **Verify Schema** call in the **LibSVMRelation.scala** utility, see [here](https://github.com/apache/spark/blob/930b90a84871e2504b57ed50efa7b8bb52d3ba44/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala#L79) (accessed 4/17/2020). Based on this function, our data needs to be in two columns: one **DoubleType()** column (our label column) and one **VectorUDT()** column (our sparse vector features column).

With these requirements, we define a general schema.

In [None]:
schema = StructType([
    StructField("label", DoubleType()),
    StructField("features", VectorUDT())
])
print(schema)
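
For orientation, the LIBSVM text format stores one row per line as a label followed by `index:value` pairs for the non-zero features, with indices conventionally starting at 1. The sketch below shows how a (label, sparse vector) row maps onto such a line. The helper `to_libsvm_line` and the sample values are hypothetical, and this is not the SageMaker serializer's code, only an illustration of the target format.

```python
def to_libsvm_line(label, indices, values):
    # LIBSVM text format: "<label> <index>:<value> ...", with 1-based,
    # ascending feature indices and zero entries omitted.
    pairs = sorted(zip(indices, values))
    features = " ".join(f"{i + 1}:{v}" for i, v in pairs if v != 0)
    return f"{label} {features}".rstrip()

# Hypothetical sparse row: label 1.0 with two non-zero features (0-based indices).
print(to_libsvm_line(1.0, [3, 17], [0.2877, 1.5]))
```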

## Convert Data to Match Schema
We also convert the column types of our current train and test data sets to match this schema.

In [None]:
#Train
df_train = df_train.withColumn("label", f.col('label').cast(DoubleType()))
df_train = df_train.withColumn("features", f.col('features').cast(VectorUDT()))
print("Schema - Train")
df_train.printSchema()
print()

# Test
df_test = df_test.withColumn("label", f.col('label').cast(DoubleType()))
df_test = df_test.withColumn("features", f.col('features').cast(VectorUDT()))
print("Schema - Test")
df_test.printSchema()

## Initialize Model
We will now initialize the XGBoost Estimator. A few notes:

* In order for the estimator to accept our input data as **LIBSVM**, we need to use the parameter **requestRowSerializer**. We define this parameter as **LibSVMRequestRowSerializer**, identifying the feature column, label column, and schema. 
* This is personal preference, but I like adding a name to the model we're creating. It makes it easier to find when you're looking up past trained models. So we add a **namePolicyFactory** value. But be careful. If you want to deploy your model as an endpoint, the maximum number of characters the model name can have is 63. This means the prefix you add to the front of your model name can only be about 10 characters. SageMaker will tack on the rest of the model tag. If you exceed 63 characters, deploying your endpoint will fail.

In [None]:
xgboost_estimator = XGBoostSageMakerEstimator(
  sagemakerRole = IAMRole(role),
  requestRowSerializer=LibSVMRequestRowSerializer(schema=schema,
                                                 featuresColumnName="features",
                                                 labelColumnName="label"),
  trainingInstanceType = "ml.m4.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.m4.xlarge",
  endpointInitialInstanceCount = 1,
  namePolicyFactory=RandomNamePolicyFactory("spam-xgb-"),
  endpointCreationPolicy = EndpointCreationPolicy.CREATE_ON_TRANSFORM
)

## Set Hyperparameters
After initializing the model, we set the hyperparameters. This is a binary classification problem, so we'll set the objective to **binary:logistic** and evaluate based on the **AUC** score.

In [None]:
xgboost_estimator.setNumRound(15)
xgboost_estimator.setObjective("binary:logistic")
xgboost_estimator.setEvalMetric("auc")
xgboost_estimator.setSeed(seed)

## Train
With everything set, we can now train the model.

In [None]:
model = xgboost_estimator.fit(df_train)

# Evaluate
## Transform
First, we generate predictions on the test set.

In [None]:
predictions = model.transform(df_test)

Then, we will assign predicted labels, using 0.5 as a threshold.

We will also create re-labeled columns with spam and ham. This will be primarily for downstream visuals.

In [None]:
predictions = predictions.withColumn("prediction_binary", \
                                     f.when(f.col("prediction") > 0.5 , 1.0).
                                     otherwise(0.0))

predictions = predictions.withColumn("prediction_spam", \
                                     f.when(f.col("prediction_binary") == 1 ,\
                                            "spam").otherwise("ham"))

predictions = predictions.withColumn("label_spam",\
                                     f.when(f.col("label") == 1 , "spam").
                                     otherwise("ham"))
predictions.show(5)

Let's take a look at the predicted distribution.

In [None]:
predictions.groupBy("prediction_spam").count().show()

### Scores
Now we can look at some of the classification scores. Note that we are using both the **MulticlassClassificationEvaluator** and **BinaryClassificationEvaluator** objects to generate the metrics we want. 

In [None]:
def output_scores(predictions):
    digit_format  = ": {:.4f}"
    
    ### Multi-Class Evaluator
    dict_metric_multi = {"Accuracy" : "accuracy", 
                         "Precision - Weighted" : "weightedPrecision", 
                         "Recall - Weighted" : "weightedRecall",
                         "F1 Score": "f1"}

    for key, value in dict_metric_multi.items():
        evaluator =  MulticlassClassificationEvaluator(labelCol="label", 
                                                   predictionCol=\
                                                       "prediction_binary", 
                                                   metricName=value)

        metric = evaluator.evaluate(predictions)

        print(key + digit_format.format(metric))   
    
    # Binary Class Evaluator
    dict_metric_bin = {"AUC Score" : "areaUnderROC"}
    for key, value in dict_metric_bin.items():
    
        evaluator=BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                                  labelCol="label", 
                                                  metricName=value)
        
        metric = evaluator.evaluate(predictions)
        print(key + digit_format.format(metric))

output_scores(predictions)
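
To make the "weighted" metrics more concrete, here is a small plain-Python sketch that computes per-class precision and recall and averages them with weights equal to each class's share of the true labels. This is a sketch of the weighting idea under that assumption, using made-up labels, not the evaluator's own code.

```python
from collections import Counter

def weighted_precision_recall(y_true, y_pred):
    # Per-class precision and recall, averaged with weights equal to each
    # class's share of the true labels.
    support = Counter(y_true)
    n = len(y_true)
    w_precision = w_recall = 0.0
    for cls, count in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        predicted = sum(1 for p in y_pred if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        w_precision += precision * count / n
        w_recall += recall * count / n
    return w_precision, w_recall

# Hypothetical labels and thresholded predictions.
y_true = [0.0, 0.0, 0.0, 1.0, 1.0]
y_pred = [0.0, 0.0, 1.0, 1.0, 0.0]
wp, wr = weighted_precision_recall(y_true, y_pred)
accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
print(round(wp, 4), round(wr, 4), round(accuracy, 4))
```

One consequence of this weighting is that weighted recall always equals accuracy, so with an imbalanced ham/spam split the per-class view in the confusion matrix is the more informative one.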

## ROC Curve

In [None]:
test_label = predictions.select('label').toPandas()
test_pred = predictions.select('prediction').toPandas()

In [None]:
fpr, tpr, thresholds = roc_curve(test_label, test_pred)

In [None]:
# Compute the area under the ROC curve before displaying it
roc_auc = auc(fpr, tpr)
roc_auc

In [None]:
plt.rc('font', size=19.5) 
plt.figure(figsize=[7,7])
plt.plot(fpr, tpr, label='ROC curve (area = %0.3f)' % (roc_auc))
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
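
For readers who want to see what sits behind the curve and its area, the following is a minimal plain-Python sketch: it sweeps the decision threshold from high to low, records a (false positive rate, true positive rate) point at each distinct score, and integrates with the trapezoid rule. It is an illustration with made-up scores, not scikit-learn's implementation, and it assumes both classes are present.

```python
def roc_points(labels, scores):
    # Sweep the threshold from high to low, one step per distinct score,
    # recording (false positive rate, true positive rate) at each step.
    pos = sum(1 for y in labels if y == 1)
    neg = len(labels) - pos
    ranked = sorted(zip(scores, labels), reverse=True)
    points, tp, fp = [(0.0, 0.0)], 0, 0
    for i, (score, y) in enumerate(ranked):
        if y == 1:
            tp += 1
        else:
            fp += 1
        # Only emit a point once all rows tied at this score are counted.
        if i == len(ranked) - 1 or ranked[i + 1][0] != score:
            points.append((fp / neg, tp / pos))
    return points

def trapezoid_auc(points):
    # Area under the piecewise-linear curve through the ROC points.
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))

# Hypothetical labels and predicted probabilities.
labels = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.2]
print(trapezoid_auc(roc_points(labels, scores)))
```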

## Confusion Matrix
We are going to visualize the confusion matrix using the method outlined [here](https://runawayhorse001.github.io/LearningApacheSpark/classification.html#demo).

### Confusion Matrix Plotting Function

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        #print("Normalized confusion matrix")
    else:
        print()
        #print('Confusion matrix, without normalization')

    #print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.3f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

### Define Class Names

In [None]:
# Create List of Class Names
class_label = predictions.select("label_spam").groupBy("label_spam")\
    .count().sort('count', ascending=False).toPandas()
class_names = class_label["label_spam"].to_list()
class_names
#class_names = list(map(str, class_names))

### Generate Raw Confusion Matrix

In [None]:
# Convert Labels to Pandas Dataframe
y_true = predictions.select("label_spam")
y_true = y_true.toPandas()

# Convert Predictions to Pandas Dataframe
y_pred = predictions.select("prediction_spam")
y_pred = y_pred.toPandas()

cm = confusion_matrix(y_true, y_pred, labels=class_names)
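
The matrix built here has one row per true class and one column per predicted class, in the order given by `labels`, and the `normalize=True` option of the plotting function divides each row by its total. A small plain-Python sketch of both steps, using made-up labels and hypothetical helper names, may help when reading the plot.

```python
def confusion_counts(y_true, y_pred, labels):
    # Rows are true classes, columns are predicted classes, in `labels` order.
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return matrix

def normalize_rows(matrix):
    # Divide each row by its total so every non-empty row sums to 1.
    return [[v / sum(row) if sum(row) else 0.0 for v in row] for row in matrix]

# Hypothetical true and predicted labels.
y_true = ["ham", "ham", "ham", "spam", "spam"]
y_pred = ["ham", "ham", "spam", "spam", "ham"]
cm_counts = confusion_counts(y_true, y_pred, labels=["ham", "spam"])
print(cm_counts, normalize_rows(cm_counts))
```

After row normalization, the diagonal entries are the per-class recall values.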

### Plot

In [None]:
plt.figure(figsize=[7,7])
plot_confusion_matrix(cm, 
                      classes=class_names,
                      normalize=True,
                      title='Confusion Matrix, \nWith Normalization')
plt.show()

# Clean-Up
After everything is done, we do not want to leave resources needlessly running, costing us money. So, we shut everything down.

In [None]:
resource_cleanup = SageMakerResourceCleanup(model.sagemakerClient)
resource_cleanup.deleteResources(model.getCreatedResources())