# Spam Detection

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spam").getOrCreate()

In [2]:
data = spark.read.csv("../../Data/smsspamcollection/SMSSpamCollection",inferSchema=True, sep = "\t")

In [4]:
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [7]:
# rename the feature
data = data.withColumnRenamed("_c0","class").withColumnRenamed("_c1","text")
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



# Processing Tools 

In [10]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import length
# col allow us to call a columns and then use the fonction using lamba expression
from pyspark.sql.types import IntegerType

In [12]:
data = data.withColumn("length",length(data["text"]))
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



# see the lentgh of the spam text and ham text to see if we can have the first intuition

In [14]:
data.groupBy("class").mean().show()
# we can say that the lenght one of the most feature to resolve this problem of spam 

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [34]:
from pyspark.ml.feature import (Tokenizer, StopWordsRemover,CountVectorizer,
            IDF, StringIndexer)

In [35]:
# transfrom the sentence into a list of word
tokenizer = Tokenizer(inputCol="text",outputCol="token_text")

# remove stop words  commun word
stop_remove = StopWordsRemover(inputCol="token_text",outputCol = "stop_token")

# count Vetorisation number  fequensy of the word 
count_vec = CountVectorizer(inputCol="stop_token",outputCol="c_vec")

In [36]:
idf = IDF(inputCol="c_vec",outputCol="tf_idf")

In [37]:
# convert class to numeric target .
ham_spam_to_numeric = StringIndexer(inputCol="class",outputCol="label")

In [38]:
# Re structure our dataset to fit the models that we are gonna use 
from pyspark.ml.feature import VectorAssembler

In [39]:
clean_up = VectorAssembler(inputCols=["tf_idf","length"],outputCol="features")

In [40]:
from pyspark.ml.classification import NaiveBayes

In [55]:
nb = NaiveBayes()

# as i have many steps lets use a pipeline 

In [42]:
from pyspark.ml import Pipeline

In [43]:
data_pipeline = Pipeline(stages = [ham_spam_to_numeric,tokenizer,stop_remove,count_vec,idf,clean_up])

In [45]:
cleaner = data_pipeline.fit(data)
cleaner_data = cleaner.transform(data)

In [46]:
cleaner_data.columns

['class',
 'text',
 'length',
 'label',
 'token_text',
 'stop_token',
 'c_vec',
 'tf_idf',
 'features']

In [49]:
cleaner_data=cleaner_data.select(["label","features"])

In [50]:
cleaner_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



# Spliting to train and test set 

In [53]:
train_data, test_data = cleaner_data.randomSplit([0.7,0.3])

In [54]:
train_data.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



In [59]:
# train the model
spam_detector = nb.fit(train_data)

# test the model
test_result = spam_detector.transform(test_data)

In [60]:
test_result.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,5,15,...|[-1000.9974085564...|[1.0,2.8520181930...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1360.4824335131...|[1.0,5.5420012608...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-1011.8381629408...|[1.0,2.7813647153...|       0.0|
|  0.0|(13424,[0,1,43,69...|[-616.39813593536...|[0.99907232361419...|       0.0|
|  0.0|(13424,[0,1,498,5...|[-321.15529320407...|[0.99999999998953...|       0.0|
|  0.0|(13424,[0,2,3,6,9...|[-3290.0237832798...|[1.0,3.2218078038...|       0.0|
|  0.0|(13424,[0,2,4,5,1...|[-2491.8978172265...|[1.0,2.0367718874...|       0.0|
|  0.0|(13424,[0,2,4,8,2...|[-1404.0277841778...|[1.0,8.2215786454...|       0.0|
|  0.0|(13424,[0,2,4,44,...|[-1902.9550183672...|[1.0,9.3262435539...|       0.0|
|  0.0|(13424,[0

# Evaluation

In [61]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [62]:
acc_eva = MulticlassClassificationEvaluator()

In [64]:
acc = acc_eva.evaluate(test_result)

In [67]:
print("ACC for naive Bayes ")
acc

ACC for naive Bayes 


0.9114057314743567