# Spam detection filter using Spark

There are several methods of spam detection. The most common ones are:

1. Content based: This method is usually used to create automatic filtering rules and to classify emails using machine learning approaches such as Naïve Bayes classification, Support Vector Machines, K-Nearest Neighbors, and Neural Networks. It analyses the words, and the occurrence and distribution of words and phrases, in the content of emails, and then uses the generated rules to filter incoming spam.

2. Case based: This method is carried out in two steps. First, both spam and non-spam emails are extracted. Then several preprocessing steps, such as feature extraction and selection, grouping of the email data, and evaluation of the process, are executed to transform the emails using the client interface. At this stage the data is classified into two vectors, so that machine learning can be used to detect the spam.

3. Heuristic or Rule based: This method filters spam based on pre-created rules. It evaluates a large number of patterns, usually regular expressions, against a chosen message. Each matching pattern adds to the message's score, and any message whose score surpasses a specific threshold is filtered as spam.

4. Previous likeness based: This approach is based on memory of previous examples, for instance a training data set, and their resemblance to the current email. If the email resembles the spam in the training data, it is classified as such. This approach uses k-nearest neighbor (kNN) for filtering spam emails.

5. Adaptive spam filtering technique: This technique divides the incoming email corpus into groups based on the likelihood percentage of similarity.

For further reading, see: https://www.sciencedirect.com/science/article/pii/S2405844018353404
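
To make the heuristic (rule based) method from the list above more concrete, here is a minimal sketch in plain Python. The patterns, their scores, and the threshold are all made up for illustration and are not taken from any real filter or from the referenced article.

```python
import re

# Hypothetical rules as (regular expression, score) pairs; purely illustrative.
RULES = [
    (re.compile(r"\bfree\b", re.IGNORECASE), 2.0),
    (re.compile(r"\bwin(ner)?\b", re.IGNORECASE), 2.0),
    (re.compile(r"\burgent\b", re.IGNORECASE), 1.5),
    (re.compile(r"\b\d{5,}\b"), 1.0),  # long digit runs such as phone numbers
]
THRESHOLD = 3.0  # assumed cut-off, chosen arbitrarily for this sketch


def spam_score(message):
    """Sum the scores of all rules whose pattern occurs in the message."""
    return sum(score for pattern, score in RULES if pattern.search(message))


def is_spam(message, threshold=THRESHOLD):
    """Flag the message when its total score surpasses the threshold."""
    return spam_score(message) > threshold


print(is_spam("URGENT! You are a WINNER, claim your free prize"))
print(is_spam("Ok lar, see you at home"))
```

In practice such rules have to be maintained by hand, which is one reason the rest of this notebook uses a learned classifier instead.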

In this project, the Natural Language Processing (NLP) tools of PySpark are used to build a spam detection filter. The data used in this project were collected from volunteer customers in Singapore and from a UK spam reporting site. The classifier used for this purpose is Naive Bayes.
It is a simple classifier based on Bayes' theorem with the assumption of conditional independence between features given the value of the class variable. Even though Naive Bayes seems an overly simplified classifier, it works very well on real-world problems such as spam filtering.
For further reference, see: https://scikit-learn.org/stable/modules/naive_bayes.html
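
The conditional independence assumption can be illustrated with a small multinomial Naive Bayes written in plain Python. This is only a sketch on four made-up messages, not the Spark implementation; the function names `train_nb` and `predict_nb` and the toy data are hypothetical. The additive smoothing value `alpha=1.0` is chosen here to match the default `smoothing` of Spark's `NaiveBayes`.

```python
import math
from collections import Counter, defaultdict


def train_nb(docs, labels, alpha=1.0):
    """Estimate log priors and smoothed per-class word log likelihoods."""
    vocab = {w for doc in docs for w in doc}
    class_docs = Counter(labels)
    word_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        word_counts[label].update(doc)
    log_prior = {c: math.log(n / len(docs)) for c, n in class_docs.items()}
    log_like = {}
    for c in class_docs:
        # Additive smoothing keeps unseen words from getting zero probability.
        total = sum(word_counts[c].values()) + alpha * len(vocab)
        log_like[c] = {w: math.log((word_counts[c][w] + alpha) / total) for w in vocab}
    return log_prior, log_like


def predict_nb(model, doc):
    """Pick the class with the highest log posterior; words are treated as independent."""
    log_prior, log_like = model
    scores = {
        c: log_prior[c] + sum(log_like[c][w] for w in doc if w in log_like[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)


# Toy training data, invented for this example.
toy_docs = [
    ["free", "prize", "call", "now"],
    ["win", "free", "cash"],
    ["see", "you", "at", "home"],
    ["call", "me", "later"],
]
toy_labels = ["spam", "spam", "ham", "ham"]

model = train_nb(toy_docs, toy_labels)
print(predict_nb(model, ["free", "cash", "now"]))
print(predict_nb(model, ["see", "you", "later"]))
```

Because of the independence assumption, the score of a message is just the class prior plus a sum of per-word terms, which is what keeps training and prediction cheap.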

In [1]:
import findspark
findspark.init('/home/yohannes/spark-2.4.7-bin-hadoop2.7')

In [2]:
%config Completer.use_jedi=False 

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('spam_detector').getOrCreate()

In [5]:
data = spark.read.csv('/home/yohannes/One_Drive/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Natural_Language_Processing/smsspamcollection/SMSSpamCollection',inferSchema=True,sep='\t')

### Data inspection

In [6]:
data.describe().show()

+-------+----+--------------------+
|summary| _c0|                 _c1|
+-------+----+--------------------+
|  count|5574|                5574|
|   mean|null|               645.0|
| stddev|null|                 NaN|
|    min| ham| &lt;#&gt;  in mc...|
|    max|spam|… we r stayin her...|
+-------+----+--------------------+



In [7]:
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import length

In [9]:
data = data.withColumn('length',length(data['_c1']))

In [10]:
data.show()

+----+--------------------+------+
| _c0|                 _c1|length|
+----+--------------------+------+
| ham|Go until jurong p...|   111|
| ham|Ok lar... Joking ...|    29|
|spam|Free entry in 2 a...|   155|
| ham|U dun say so earl...|    49|
| ham|Nah I don't think...|    61|
|spam|FreeMsg Hey there...|   147|
| ham|Even my brother i...|    77|
| ham|As per your reque...|   160|
|spam|WINNER!! As a val...|   157|
|spam|Had your mobile 1...|   154|
| ham|I'm gonna be home...|   109|
|spam|SIX chances to wi...|   136|
|spam|URGENT! You have ...|   155|
| ham|I've been searchi...|   196|
| ham|I HAVE A DATE ON ...|    35|
|spam|XXXMobileMovieClu...|   149|
| ham|Oh k...i'm watchi...|    26|
| ham|Eh u remember how...|    81|
| ham|Fine if thats th...|    56|
|spam|England v Macedon...|   155|
+----+--------------------+------+
only showing top 20 rows



### Feature transformation

In [11]:
from pyspark.ml.feature import Tokenizer,CountVectorizer,IDF,StringIndexer,StopWordsRemover

In [12]:
tokenizer = Tokenizer(inputCol='_c1',outputCol='tokenized')

In [13]:
s_removed = StopWordsRemover(inputCol='tokenized',outputCol='sremoved')

In [14]:
cv = CountVectorizer(inputCol='sremoved',outputCol='vectorized')

In [15]:
tf_idf = IDF(inputCol='vectorized',outputCol='idf')

In [16]:
# The following converts the "spam/ham" column to a numeric "label" column
str_ind = StringIndexer(inputCol='_c0',outputCol='label')

In [17]:
# To assemble the features:
from pyspark.ml.feature import VectorAssembler

In [18]:
assembler = VectorAssembler(inputCols=['idf','length'],outputCol='features')
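
To show what the stages defined above compute, here is a rough plain-Python sketch of the same chain on three made-up messages: tokenize, remove stop words, count terms, weight by inverse document frequency, and append the message length. It is an approximation under stated assumptions: the stop-word list is a tiny hypothetical one, and the vocabulary is sorted alphabetically, whereas `CountVectorizer` orders terms by frequency. The IDF formula `log((m + 1) / (df + 1))` is the one given in the Spark MLlib documentation.

```python
import math
from collections import Counter

STOP_WORDS = {"a", "the", "to", "in", "is", "you"}  # tiny illustrative list, not Spark's


def tokenize(text):
    """Lower-case and split on whitespace, roughly what Tokenizer does."""
    return text.lower().split()


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def tf_idf(messages):
    """Return the vocabulary and one dense TF-IDF row per message."""
    docs = [remove_stop_words(tokenize(m)) for m in messages]
    vocab = sorted({t for doc in docs for t in doc})  # assumption: alphabetical order
    m = len(docs)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    idf = {t: math.log((m + 1) / (doc_freq[t] + 1)) for t in vocab}
    rows = []
    for doc in docs:
        counts = Counter(doc)
        rows.append([counts[t] * idf[t] for t in vocab])
    return vocab, rows


messages = ["Free entry to win", "See you in the office", "Free free prize"]
vocab, rows = tf_idf(messages)

# Append the raw message length, mirroring VectorAssembler(inputCols=['idf','length']).
features = [row + [len(msg)] for row, msg in zip(rows, messages)]

print(vocab)
print(len(rows[0]), len(features[0]))
```

The assembled vector is one element longer than the TF-IDF vector, which is the same effect visible later in the notebook output, where the `idf` column has size 13423 and `features` has size 13424.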

### Create a pipeline for the above processes

In [19]:
from pyspark.ml import Pipeline

In [20]:
pipeline = Pipeline(stages=[str_ind,tokenizer,s_removed,cv,tf_idf,assembler])

### Creating the model

In [21]:
final_data = pipeline.fit(data)

In [22]:
final_data = final_data.transform(data)

In [23]:
final_data.show()

+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
| _c0|                 _c1|length|label|           tokenized|            sremoved|          vectorized|                 idf|            features|
+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
| ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
| ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
|spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
| ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,80,1...|

In [24]:
final_data = final_data.select(['label','features'])

In [25]:
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [26]:
# Split the final data into train and test
train,test = final_data.randomSplit([0.7,0.3])
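
The split above is random, so the rows in `train` and `test` (and therefore the scores further down) will differ from run to run. `randomSplit` accepts an optional `seed` argument to make the split repeatable, for example `final_data.randomSplit([0.7,0.3], seed=42)`, where the value 42 is arbitrary. The plain-Python sketch below illustrates the general idea of a seeded, weight-based split in which each row is assigned independently, so the split sizes are only approximately 70/30. It is an illustration of the idea, not Spark's actual implementation, and the function name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of each split on the interval [0, 1).
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point leftover.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```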

In [41]:
from pyspark.ml.classification import NaiveBayes,DecisionTreeClassifier,LinearSVC

In [28]:
nb = NaiveBayes()

In [29]:
%timeit model = nb.fit(train)

3.22 s ± 506 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [31]:
model = nb.fit(train)
result = model.transform(test)

In [32]:
result.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-607.91984003746...|[1.0,1.7094832909...|       0.0|
|  0.0|(13424,[0,1,4,50,...|[-827.73417010843...|[1.0,6.4669560244...|       0.0|
|  0.0|(13424,[0,1,11,32...|[-886.28398662992...|[1.0,1.0309191579...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-216.41327117849...|[1.0,1.0678877119...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-772.03618477198...|[1.0,5.6469713950...|       0.0|
|  0.0|(13424,[0,1,23,63...|[-1301.5735004127...|[1.0,4.4166853327...|       0.0|
|  0.0|(13424,[0,1,30,12...|[-599.46157556861...|[1.0,1.6751203222...|       0.0|
|  0.0|(13424,[0,1,498,5...|[-317.87710040843...|[0.99999999999869...|       0.0|
|  0.0|(13424,[0,1,3657,...|[-128.15470243565...|[0.99997839283287...|       0.0|
|  0.0|(13424,[0

### Accuracy evaluation

In [33]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [34]:
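# With no arguments the evaluator defaults to metricName='f1', so the scores printed below are weighted F1 rather than plain accuracy
acc = MulticlassClassificationEvaluator()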

In [35]:
accuracy = acc.evaluate(result)
print(accuracy)

0.9164404364769805
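
A note on the number above: `MulticlassClassificationEvaluator` uses `metricName='f1'` by default, so this value is a weighted F1 score rather than plain accuracy. Passing `metricName='accuracy'` would report accuracy instead. The two can differ noticeably on imbalanced data such as spam versus ham. The plain-Python sketch below computes both on a made-up set of labels and predictions; the helper names and the toy data are hypothetical and only meant to show the difference.

```python
from collections import Counter


def accuracy(labels, preds):
    """Fraction of predictions that match the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)


def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of true labels."""
    support = Counter(labels)
    total = 0.0
    for c, n in support.items():
        tp = sum(1 for l, p in zip(labels, preds) if l == c and p == c)
        fp = sum(1 for l, p in zip(labels, preds) if l != c and p == c)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(labels)
    return total


# Toy case: 8 ham (0.0) and 2 spam (1.0), with a classifier that always predicts ham.
toy_labels = [0.0] * 8 + [1.0] * 2
toy_preds = [0.0] * 10

print(accuracy(toy_labels, toy_preds))
print(weighted_f1(toy_labels, toy_preds))
```

In this toy case accuracy looks acceptable even though no spam is caught, while the weighted F1 is lower because the spam class contributes an F1 of zero.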


In [36]:
dt = DecisionTreeClassifier()

In [37]:
%timeit model2 = dt.fit(train)

22.8 s ± 2.71 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [38]:
model2 = dt.fit(train)
result2 = model2.transform(test)

In [40]:
accuracy2 = acc.evaluate(result2)
print(accuracy2)

0.9403201104951989


In [42]:
svc = LinearSVC()

In [43]:
%timeit model3 = svc.fit(train)

4min 36s ± 2.54 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [44]:
model3 = svc.fit(train)
result3 = model3.transform(test)

In [45]:
accuracy3 = acc.evaluate(result3)
print(accuracy3)

0.9728116927261968
