In [71]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [72]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('SMS_SpamDetection')
    .getOrCreate()
)

In [73]:
from google.colab import drive
# Mount Google Drive at /content/drive (Colab only) so files stored there can be read.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Data Import

In [74]:
# Location of the SMS spam CSV inside the mounted Google Drive.
path = (
    '/content/drive/MyDrive/Colab Notebooks/'
    '230109 - SMS_SpamDetection/spam.csv'
)

In [75]:
# Load the raw CSV: the first line supplies the column names and the column
# types are inferred from the data.
data_original = spark.read.csv(
    path,
    header=True,
    inferSchema=True,
)

In [76]:
# Print the column names and inferred types of the raw frame.
data_original.printSchema()

root
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [77]:
# Inspect one sample record (the 4th of the first five rows) field by field.
# The row is fetched once: calling head(5) separately for the keys and for the
# values would run the same Spark job twice.
sample_row = data_original.head(5)[3]
for key, item in sample_row.asDict().items():
  print(key+' : '+str(item))

v1 : ham
v2 : U dun say so early hor... U c already then say...
_c2 : None
_c3 : None
_c4 : None


In [78]:
from pyspark.sql.functions import col,isnan, when, count

# For every column, count the rows whose value is NaN or null.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data_original.columns
]
data_original.select(missing_counts).show()

+---+---+----+----+----+
| v1| v2| _c2| _c3| _c4|
+---+---+----+----+----+
|  0|  1|5525|5562|5568|
+---+---+----+----+----+



The columns _c2, _c3 and _c4 are almost entirely null, carry no useful information, and can be dropped.

For clarity, the columns will be renamed. In addition, a column with the length of the message will be created, rows with null values will be dropped, and a filter will guarantee that the class is either ham or spam.

In [79]:
from pyspark.sql.functions import length,mean

# Build the cleaned frame:
#   1. keep only the label and text columns and give them descriptive names,
#   2. add the message length as an extra column,
#   3. drop rows containing nulls,
#   4. keep only rows whose class is exactly 'ham' or 'spam'.
# Columns are addressed by their current names through `col(...)` rather than
# through `data_original['v1']` / `data_original['v2']`: after the renames those
# parent-frame columns are no longer part of the frame being transformed, and
# relying on Spark to resolve such stale references is fragile.
data = (
    data_original
    .select('v1', 'v2')
    .withColumnRenamed('v1', 'class')
    .withColumnRenamed('v2', 'message')
    .withColumn('length', length(col('message')))
    .na.drop()
    .filter((col('class') == 'ham') | (col('class') == 'spam'))
)

In [80]:
# Preview the first two rows of the cleaned frame.
data.show(2)

+-----+--------------------+------+
|class|             message|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
+-----+--------------------+------+
only showing top 2 rows



In [81]:
# Number of messages and mean message length for each class.
class_summary = data.groupBy('class').agg(count('class'), mean('length'))
class_summary.show()

+-----+------------+------------------+
|class|count(class)|       avg(length)|
+-----+------------+------------------+
|  ham|        4825| 71.07357512953368|
| spam|         747|138.45917001338688|
+-----+------------+------------------+



It looks like ham messages tend to be shorter, on average, than spam messages. Therefore, including the message length as a feature is justified: it can help the model differentiate between legitimate (ham) messages and spam.

# Spark Modeling

In [82]:
from pyspark.ml.feature import (RegexTokenizer,StopWordsRemover, 
                                CountVectorizer,IDF,
                                StringIndexer,VectorAssembler)

The classes must be numerical values. So, we will use StringIndexer.

In [83]:
# Encode the string class as a numeric 'label' column, ordering the classes by
# descending frequency.
indexer = StringIndexer(inputCol="class", outputCol="label",
                        stringOrderType="frequencyDesc")

indexer_fitted = indexer.fit(data)
index_data = indexer_fitted.transform(data)

# Same per-class summary as before, now grouped by the numeric label.
label_summary = index_data.groupBy('label').agg(count('label'), mean('length'))
label_summary.show()

+-----+------------+------------------+
|label|count(label)|       avg(length)|
+-----+------------+------------------+
|  0.0|        4825| 71.07357512953368|
|  1.0|         747|138.45917001338688|
+-----+------------+------------------+



We have to transform the message column into an array column containing its words. The text is split on non-word characters, so spaces and punctuation are removed.

In [84]:
# Split each message into word tokens, using every non-word character as a
# separator.
token = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W")
token_data = token.transform(index_data)

token_data.show(5)

+-----+--------------------+------+-----+--------------------+
|class|             message|length|label|               words|
+-----+--------------------+------+-----+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar, joking,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|
|  ham|Nah I don't think...|    61|  0.0|[nah, i, don, t, ...|
+-----+--------------------+------+-----+--------------------+
only showing top 5 rows



Some words do not convey information. They are called "stop words". Spark has a function to remove them.

In [85]:
# Remove stop words from the token lists, writing the result to 'n_words'.
stop = StopWordsRemover(
    inputCol='words',
    outputCol='n_words',
)
stop_data = stop.transform(token_data)

# Side-by-side view of the tokens before and after the removal.
stop_data.select('words', 'n_words').show(5)

+--------------------+--------------------+
|               words|             n_words|
+--------------------+--------------------+
|[go, until, juron...|[go, jurong, poin...|
|[ok, lar, joking,...|[ok, lar, joking,...|
|[free, entry, in,...|[free, entry, 2, ...|
|[u, dun, say, so,...|[u, dun, say, ear...|
|[nah, i, don, t, ...|[nah, think, goes...|
+--------------------+--------------------+
only showing top 5 rows



Machine Learning techniques usually only accept numeric inputs. CountVectorizer is a Spark feature transformer that builds a vocabulary from all the words contained in the messages and represents each message as a sparse vector holding the count of each vocabulary word in that message.

In [86]:
# Learn a vocabulary from the filtered tokens and encode each message as a
# sparse vector of term counts.
countvec = CountVectorizer(inputCol='n_words', outputCol='cvec_words')

countvec_fitted = countvec.fit(stop_data)
countvec_data = countvec_fitted.transform(stop_data)

# Show the complete (untruncated) count vector of the first message.
countvec_data.select('cvec_words').show(1, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------+
|cvec_words                                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------------------------+
|(8537,[11,16,37,62,69,79,249,544,632,770,1252,1295,1308,2873,4668,4907],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
+------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



TF - IDF (https://en.wikipedia.org/wiki/Tf%E2%80%93idf)

In [87]:
# Re-weight the term counts by inverse document frequency.
# Note: despite its name, `idf_model` is the unfitted IDF estimator; the fitted
# model is the object returned by fit().
idf_model = IDF(inputCol='cvec_words', outputCol='idf_words')

idf_fitted = idf_model.fit(countvec_data)
idf_data = idf_fitted.transform(countvec_data)

# Show the complete (untruncated) TF-IDF vector of the first message.
idf_data.select('idf_words').show(1, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|idf_words                                                                                                                                                                                                                                                                                                                                                                     |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Finally, the data has to be framed in a format acceptable for Spark. All the features (the independent variables) must be put in a single column: each row contains a vector that condenses all the features in a single element. Usually, this column is called 'features'. This is done easily with VectorAssembler.

Another column with the labels in numeric format is necessary. The custom is to call it 'label'.

In [88]:
# Concatenate the TF-IDF vector and the message length into one 'features'
# vector column.
feature_inputs = ['idf_words', 'length']
assembler = VectorAssembler(inputCols=feature_inputs, outputCol='features')

treated_data = assembler.transform(idf_data)

treated_data.select('features', 'label').show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(8538,[11,16,37,6...|  0.0|
|(8538,[0,9,244,36...|  0.0|
|(8538,[2,10,23,24...|  1.0|
|(8538,[0,57,85,86...|  0.0|
|(8538,[53,136,366...|  0.0|
|(8538,[9,15,21,26...|  1.0|
|(8538,[15,132,286...|  0.0|
|(8538,[149,157,31...|  0.0|
|(8538,[1,64,82,14...|  1.0|
|(8538,[0,1,10,31,...|  1.0|
|(8538,[3,22,29,33...|  0.0|
|(8538,[6,17,21,24...|  1.0|
|(8538,[10,24,26,5...|  1.0|
|(8538,[45,77,84,1...|  0.0|
|(8538,[479,677,85...|  0.0|
|(8538,[24,37,80,1...|  1.0|
|(8538,[3,41,63,27...|  0.0|
|(8538,[0,2,71,73,...|  0.0|
|(8538,[0,72,91,13...|  0.0|
|(8538,[5,24,26,42...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [89]:
# The bare expression only displays the DataFrame's repr (column name and type), not its rows.
treated_data.select('features')

DataFrame[features: vector]

## Pipeline

All the processes performed in the previous section are usually done in a condensed form with a pipeline. The main advantage is the reuse of all the steps in a single command. In this example, the model is left out of the pipeline, but it could be inserted here as well.

In [90]:
from pyspark.ml import Pipeline

In [91]:
# Chain the preprocessing stages: label indexing, tokenizing, stop-word removal,
# term counting, IDF weighting and feature assembly.
prep_stages = [indexer, token, stop, countvec, idf_model, assembler]
pipe_prep_data = Pipeline(stages=prep_stages)

In [92]:
# Fit every pipeline stage on the cleaned data, then apply the fitted pipeline
# to that same data.
pipe_prep_fitted = pipe_prep_data.fit(data)
prep_data = pipe_prep_fitted.transform(data)

In [93]:
# Display the assembled feature vectors next to their numeric labels.
prep_data.select('features','label').show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(8538,[11,16,37,6...|  0.0|
|(8538,[0,9,244,36...|  0.0|
|(8538,[2,10,23,24...|  1.0|
|(8538,[0,57,85,86...|  0.0|
|(8538,[53,136,366...|  0.0|
|(8538,[9,15,21,26...|  1.0|
|(8538,[15,132,286...|  0.0|
|(8538,[149,157,31...|  0.0|
|(8538,[1,64,82,14...|  1.0|
|(8538,[0,1,10,31,...|  1.0|
|(8538,[3,22,29,33...|  0.0|
|(8538,[6,17,21,24...|  1.0|
|(8538,[10,24,26,5...|  1.0|
|(8538,[45,77,84,1...|  0.0|
|(8538,[479,677,85...|  0.0|
|(8538,[24,37,80,1...|  1.0|
|(8538,[3,41,63,27...|  0.0|
|(8538,[0,2,71,73,...|  0.0|
|(8538,[0,72,91,13...|  0.0|
|(8538,[5,24,26,42...|  1.0|
+--------------------+-----+
only showing top 20 rows



One can compare treated_data and prep_data and conclude that they are exactly the same.

# Machine Learning Modelling

## Train and Test Split

The most common way to create and test a model is to split the data into train and test dataframes.

In [94]:
# 80/20 random split; the fixed seed keeps the partition repeatable.
train_data, test_data = prep_data.randomSplit(weights=[0.8, 0.2], seed=1)

In [112]:
# Class balance of the training split.
train_label_counts = train_data.groupBy('label').count()
train_label_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 3832|
|  1.0|  594|
+-----+-----+



In [96]:
# Class balance of the test split.
test_label_counts = test_data.groupBy('label').count()
test_label_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  993|
|  1.0|  153|
+-----+-----+



We can remark that both datasets maintained the same level of label imbalance.

## Naive Bayes

Naive Bayes is classically used for Natural Language Classification. It relies on the estimation of conditional probabilities. Intuitively, it checks how much the probability of a message belonging to one class or the other increases due to the presence of each word in the message. Unfortunately, these probabilities can only be estimated, not calculated exactly.

This approach fits well with the problem at hand. For example, suppose the word "order" or "buy" appears in a message. We cannot know for sure that it is SPAM just based on that, but we can say it becomes more probable. Combining this with other words and their conditional probabilities is what Naive Bayes attempts to do.

In [97]:
from pyspark.ml.classification import NaiveBayes

In [98]:
# Multinomial Naive Bayes estimator reading the 'features' and 'label' columns.
nB = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    modelType='multinomial',
)

In [99]:
# Fit the Naive Bayes estimator on the training split only.
nB_model = nB.fit(train_data)

In [100]:
# Score the held-out messages with the fitted Naive Bayes model.
test_results = nB_model.transform(test_data)
# List the columns of the scored frame (shown as the cell output).
test_results.columns

['class',
 'message',
 'length',
 'label',
 'words',
 'n_words',
 'cvec_words',
 'idf_words',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [101]:
# Compare the true label with the predicted label, raw scores and probabilities.
shown_cols = ['label', 'prediction', 'rawPrediction', 'probability']
test_results.select(*shown_cols).show()

+-----+----------+--------------------+--------------------+
|label|prediction|       rawPrediction|         probability|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[-859.65131284924...|[1.0,4.3097634138...|
|  0.0|       0.0|[-4650.7280415482...|[1.0,7.7828248558...|
|  0.0|       0.0|[-4650.7280415482...|[1.0,7.7828248558...|
|  0.0|       0.0|[-1203.6600955646...|[1.0,1.2878081462...|
|  0.0|       0.0|[-1411.3269760801...|[1.0,1.3678122809...|
|  0.0|       0.0|[-1220.8401401176...|[1.0,1.3227880678...|
|  0.0|       0.0|[-480.14388362531...|[1.0,6.1770067477...|
|  0.0|       0.0|[-292.17748613866...|[1.0,1.3484351044...|
|  0.0|       0.0|[-426.76108379898...|[1.0,3.6604115341...|
|  0.0|       0.0|[-1905.6818024263...|[1.0,3.6097431012...|
|  0.0|       0.0|[-1674.1179745284...|[0.95938665094168...|
|  0.0|       0.0|[-227.60994623319...|[0.99999978380961...|
|  0.0|       0.0|[-463.80407128968...|[1.0,1.8051991557...|
|  0.0|       0.0|[-376.

In [115]:
def ConfusionMatrixStats(test_results):
    """Print the confusion matrix and summary metrics of a binary classifier.

    A value of 1 in 'label' / 'prediction' is treated as the positive class and
    a value of 0 as the negative class.

    Args:
        test_results: DataFrame-like object exposing 'prediction' and 'label'
            columns through ``test_results[name]`` and supporting
            ``filter(condition).count()``.

    Returns:
        None. The four confusion-matrix counts, accuracy, recall, selectivity
        and precision are printed.

    A metric whose denominator is zero (for example precision when nothing was
    predicted positive, or any metric on an empty frame) is reported as ``nan``
    instead of raising ``ZeroDivisionError``.
    """

    def _count(prediction, label):
        # Number of rows with this (prediction, label) combination.
        # Each call runs filter(...).count() on the input frame.
        return test_results.filter(
            (test_results['prediction'] == prediction) & (test_results['label'] == label)
        ).count()

    def _ratio(numerator, denominator):
        # Guarded division: an undefined ratio becomes nan rather than crashing.
        return numerator / denominator if denominator else float('nan')

    #True Positives
    TP = _count(1, 1)
    #True Negatives
    TN = _count(0, 0)
    #False Positives
    FP = _count(1, 0)
    #False Negatives
    FN = _count(0, 1)

    print('-----------------------------------------------')
    print('TP  =   {}         FN  =   {}   '.format(TP,FN))
    print('FP  =   {}         TN  =   {}   '.format(FP,TN))

    accuracy = _ratio(TP + TN, TP + TN + FP + FN)
    print('The accuracy for the test dataset is {}.'.format(accuracy))
    #recall or true positive rate
    recall = _ratio(TP, TP + FN)
    print('The recall for the test dataset is {}.'.format(recall))
    #negative recall or true negative rate
    selec = _ratio(TN, FP + TN)
    print('The selectivity for the test dataset is {}.'.format(selec))
    #precision or positive predicted value
    prec = _ratio(TP, FP + TP)
    print('The precision for the test dataset is {}.'.format(prec))


In [116]:
# Report the confusion matrix and metrics of the Naive Bayes predictions on the test split.
ConfusionMatrixStats(test_results)

-----------------------------------------------
TP  =   151         FN  =   2   
FP  =   49         TN  =   944   
The accuracy for the test dataset is 0.9554973821989529.
The recall for the test dataset is 0.9869281045751634.
The selectivity for the test dataset is 0.9506545820745217.
The precision for the test dataset is 0.755.


## Random Forest

Let's compare the Naive Bayes model with a Random Forest.

In [104]:
from pyspark.ml.classification import RandomForestClassifier as RFC

In [117]:
# Random forest configured with 200 trees, maximum depth 10 and entropy impurity.
# `rfc_model` is the unfitted estimator. A fixed seed (the same value used for
# the train/test split) is set so the forest's random sampling, and therefore
# the reported metrics, are reproducible across runs.
rfc_model = RFC(
    featuresCol='features',
    labelCol='label',
    maxDepth=10,
    numTrees=200,
    impurity='entropy',
    seed=1,
)

# Fit on the training split, score the held-out split and report the metrics.
rfc_fitted = rfc_model.fit(train_data)
rfc_test_results = rfc_fitted.transform(test_data)
ConfusionMatrixStats(rfc_test_results)

-----------------------------------------------
TP  =   26         FN  =   127   
FP  =   0         TN  =   993   
The accuracy for the test dataset is 0.8891797556719022.
The recall for the test dataset is 0.16993464052287582.
The selectivity for the test dataset is 1.0.
The precision for the test dataset is 1.0.


The Random Forest model was much inferior to the Naive Bayes. It learned just the Ham class and was not able to learn the Spam class, since most of the positives were missed. It guessed almost every instance to be Ham. The threshold to determine an instance as SPAM became too high.