In [1]:
from pyspark.sql import SparkSession

In [2]:
sparkSession = SparkSession.builder.appName('SpamDetectionSession').getOrCreate()
sparkSession

In [3]:
df1 = sparkSession.read.csv('../resources/datasets/spam_dataset.csv', header=True)
df1.show()

+--------+--------------------+
|Category|             Message|
+--------+--------------------+
|     ham|Go until jurong p...|
|     ham|Ok lar... Joking ...|
|    spam|Free entry in 2 a...|
|     ham|U dun say so earl...|
|     ham|Nah I don't think...|
|    spam|FreeMsg Hey there...|
|     ham|Even my brother i...|
|     ham|As per your reque...|
|    spam|WINNER!! As a val...|
|    spam|Had your mobile 1...|
|     ham|I'm gonna be home...|
|    spam|SIX chances to wi...|
|    spam|URGENT! You have ...|
|     ham|I've been searchi...|
|     ham|I HAVE A DATE ON ...|
|    spam|XXXMobileMovieClu...|
|     ham|Oh k...i'm watchi...|
|     ham|Eh u remember how...|
|     ham|Fine if thats th...|
|    spam|England v Macedon...|
+--------+--------------------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import length

df1 = df1.withColumn('length', length(df1['Message']))
df1.show()

+--------+--------------------+------+
|Category|             Message|length|
+--------+--------------------+------+
|     ham|Go until jurong p...|   111|
|     ham|Ok lar... Joking ...|    29|
|    spam|Free entry in 2 a...|   155|
|     ham|U dun say so earl...|    49|
|     ham|Nah I don't think...|    61|
|    spam|FreeMsg Hey there...|   147|
|     ham|Even my brother i...|    77|
|     ham|As per your reque...|   160|
|    spam|WINNER!! As a val...|   157|
|    spam|Had your mobile 1...|   154|
|     ham|I'm gonna be home...|   109|
|    spam|SIX chances to wi...|   136|
|    spam|URGENT! You have ...|   155|
|     ham|I've been searchi...|   196|
|     ham|I HAVE A DATE ON ...|    35|
|    spam|XXXMobileMovieClu...|   149|
|     ham|Oh k...i'm watchi...|    26|
|     ham|Eh u remember how...|    81|
|     ham|Fine if thats th...|    56|
|    spam|England v Macedon...|   155|
+--------+--------------------+------+
only showing top 20 rows



# Pre-processing phase
- Involves cleaning the data
- Generating the columns required for training the Model

In [5]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

tokenizer = Tokenizer(inputCol='Message', outputCol='tokens')
removedStopWords = StopWordsRemover(inputCol='tokens', outputCol='removed_stop_words')
countVectorizer = CountVectorizer(inputCol='removed_stop_words', outputCol='vectorized')
idf = IDF(inputCol='vectorized', outputCol='idf')
labels = StringIndexer(inputCol='Category', outputCol='label')

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

vAssembler = VectorAssembler(inputCols=['idf', 'length'], outputCol='features')

In [7]:
from pyspark.ml import Pipeline

inputFeatures = [tokenizer, removedStopWords, countVectorizer, idf, vAssembler, labels]

pipeline = Pipeline(stages = inputFeatures)
cleaner = pipeline.fit(df1)

cleaned_df = cleaner.transform(df1)
cleaned_df.show()

+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|Category|             Message|length|              tokens|  removed_stop_words|          vectorized|                 idf|            features|label|
+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|     ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|(13421,[7,10,32,6...|(13421,[7,10,32,6...|(13422,[7,10,32,6...|  0.0|
|     ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|(13421,[0,24,296,...|(13421,[0,24,296,...|(13422,[0,24,296,...|  0.0|
|    spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|(13421,[2,13,19,2...|(13421,[2,13,19,2...|(13422,[2,13,19,2...|  1.0|
|     ham|U dun say so earl...|    49|[u, dun, say, so,...|[u, dun, say, ear...|(13421,[0,72,78,1...

# Training the model

In [8]:
from pyspark.ml.classification import NaiveBayes

df2 = cleaned_df.select(['label', 'features'])
(train, test) = df2.randomSplit([0.7, 0.3], seed=11)

nb = NaiveBayes()
model = nb.fit(train)

In [9]:
res = model.transform(test)
res.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13422,[0,1,5,15,...|[-1009.8904044217...|[1.0,6.5521301247...|       0.0|
|  0.0|(13422,[0,1,5,20,...|[-815.91608031702...|[1.0,3.4125044327...|       0.0|
|  0.0|(13422,[0,1,9,14,...|[-549.95711003174...|[1.0,1.0865324357...|       0.0|
|  0.0|(13422,[0,1,10,31...|[-875.28473229510...|[1.0,5.2375721078...|       0.0|
|  0.0|(13422,[0,1,12,34...|[-445.55485354728...|[1.0,2.0561759442...|       0.0|
|  0.0|(13422,[0,1,14,18...|[-1366.4093537066...|[1.0,8.4369525190...|       0.0|
|  0.0|(13422,[0,1,14,81...|[-690.13920610716...|[1.0,4.0505834473...|       0.0|
|  0.0|(13422,[0,1,15,20...|[-696.31193947636...|[1.0,3.7102911639...|       0.0|
|  0.0|(13422,[0,1,21,27...|[-769.42623970917...|[1.0,1.8812512980...|       0.0|
|  0.0|(13422,[0

# Calculating the accuracy

In [10]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(res)

print("Accuracy = ", accuracy * 100)

Accuracy =  91.69316582517087


In [11]:
# closing the spark session
sparkSession.stop()