In [64]:
import findspark

In [65]:
findspark.init()

In [66]:
from pyspark.sql import SparkSession

In [67]:
spark = SparkSession.builder.appName("SpamDetection").getOrCreate()

In [68]:
data = spark.read.csv("smsspamcollection/SMSSpamCollection",inferSchema=True,sep = "\t")

In [69]:
data = data.withColumnRenamed("_c0","class").withColumnRenamed("_c1","text")

In [70]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [71]:
from pyspark.sql.functions import length

In [72]:
data = data.withColumn("length",length(data["text"]))

In [73]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [74]:
data.groupBy("class").mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [98]:
from pyspark.ml.feature import Tokenizer,HashingTF,IDF,StopWordsRemover,CountVectorizer,StringIndexer

In [99]:
tokenizer = Tokenizer(inputCol = "text", outputCol= "words")

In [100]:
##words_data = tokenizer.transform(data)

In [101]:
##words_data.show()

+-----+--------------------+------+--------------------+
|class|                text|length|               words|
+-----+--------------------+------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|
|  ham|Nah I don't think...|    61|[nah, ı, don't, t...|
| spam|FreeMsg Hey there...|   147|[freemsg, hey, th...|
|  ham|Even my brother i...|    77|[even, my, brothe...|
|  ham|As per your reque...|   160|[as, per, your, r...|
| spam|WINNER!! As a val...|   157|[wınner!!, as, a,...|
| spam|Had your mobile 1...|   154|[had, your, mobil...|
|  ham|I'm gonna be home...|   109|[ı'm, gonna, be, ...|
| spam|SIX chances to wi...|   136|[sıx, chances, to...|
| spam|URGENT! You have ...|   155|[urgent!, you, ha...|
|  ham|I've been searchi...|   196|[ı've, been, sear...|
|  ham|I HAVE A DATE ON ...|   

In [102]:
remover = StopWordsRemover(inputCol="words", outputCol = "filtered" )

In [141]:
##remover_data = remover.transform(words_data)

In [142]:
##remover_data.show()

+-----+--------------------+------+--------------------+--------------------+
|class|                text|length|               words|            filtered|
+-----+--------------------+------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|[u, dun, say, ear...|
|  ham|Nah I don't think...|    61|[nah, ı, don't, t...|[nah, ı, think, g...|
| spam|FreeMsg Hey there...|   147|[freemsg, hey, th...|[freemsg, hey, da...|
|  ham|Even my brother i...|    77|[even, my, brothe...|[even, brother, l...|
|  ham|As per your reque...|   160|[as, per, your, r...|[per, request, 'm...|
| spam|WINNER!! As a val...|   157|[wınner!!, as, a,...|[wınner!!, valued...|
| spam|Had your mobile 1...|   154|[had, your, mobil...|[mobile,

In [103]:
count_vec = CountVectorizer(inputCol = "filtered", outputCol = "c_vec")

In [143]:
##count_vec_data = count_vec.fit(remover_data).transform(remover_data)

In [144]:
##count_vec_data.show()

+-----+--------------------+------+--------------------+--------------------+--------------------+
|class|                text|length|               words|            filtered|               c_vec|
+-----+--------------------+------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|(13666,[9,12,33,6...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|(13666,[1,25,308,...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|(13666,[3,15,21,3...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|[u, dun, say, ear...|(13666,[1,74,83,1...|
|  ham|Nah I don't think...|    61|[nah, ı, don't, t...|[nah, ı, think, g...|(13666,[0,41,139,...|
| spam|FreeMsg Hey there...|   147|[freemsg, hey, th...|[freemsg, hey, da...|(13666,[13,64,145...|
|  ham|Even my brother i...|    77|[even, my, brothe...|[even, brother, l...|(13666,[13,55,106...|
|  ham|As 

In [104]:
idf = IDF(inputCol = "c_vec", outputCol = "tf_idf")

In [145]:
##idf_data = idf.fit(count_vec_data).transform(count_vec_data)

In [149]:
##idf_data.select("c_vec").show(1,truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------+
|c_vec                                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|(13666,[9,12,33,65,75,344,635,711,1421,1706,4142,6875,7324,8596,11309,12788],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
+-----------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [150]:
##idf_data.select("tf_idf").show(1,truncate=False )

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tf_idf                                                                                                                                                                                                                                                                                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [105]:
ham_spam_to_numeric = StringIndexer(inputCol = "class", outputCol = "label")

In [106]:
from pyspark.ml.feature import VectorAssembler

In [107]:
assembler = VectorAssembler(inputCols =["tf_idf","length"],outputCol="features")

In [108]:
from pyspark.ml.classification import NaiveBayes

In [109]:
nb = NaiveBayes()# using classification alogrithms generally common in NLP

In [110]:
from pyspark.ml import Pipeline

In [111]:
data_prep_pipe = Pipeline(stages = [ham_spam_to_numeric,tokenizer,remover,count_vec,idf,assembler])

In [112]:
cleaner = data_prep_pipe.fit(data)

In [113]:
clean_data = cleaner.transform(data)

In [115]:
clean_data= clean_data.select("label","features")

In [116]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13667,[9,12,33,6...|
|  0.0|(13667,[1,25,308,...|
|  1.0|(13667,[3,15,21,3...|
|  0.0|(13667,[1,74,83,1...|
|  0.0|(13667,[0,41,139,...|
|  1.0|(13667,[13,64,145...|
|  0.0|(13667,[13,55,106...|
|  0.0|(13667,[132,199,4...|
|  1.0|(13667,[2,51,126,...|
|  1.0|(13667,[1,2,15,29...|
|  0.0|(13667,[7,20,47,1...|
|  1.0|(13667,[10,18,40,...|
|  1.0|(13667,[15,32,90,...|
|  0.0|(13667,[0,43,102,...|
|  0.0|(13667,[0,560,615...|
|  1.0|(13667,[32,113,11...|
|  0.0|(13667,[85,221,45...|
|  0.0|(13667,[1,3,52,14...|
|  0.0|(13667,[1,79,109,...|
|  1.0|(13667,[5,32,36,6...|
+-----+--------------------+
only showing top 20 rows



In [117]:
training,test = clean_data.randomSplit([0.7,0.3])

In [118]:
spam_detector = nb.fit(training)

In [119]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[class: string, text: string, length: int]>

In [120]:
test_results = spam_detector.transform(test)

In [121]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13667,[0,1,2,6,1...|[-1354.2666929240...|[1.0,6.3080421534...|       0.0|
|  0.0|(13667,[0,1,2,7,1...|[-720.31897381467...|[1.0,8.4303877230...|       0.0|
|  0.0|(13667,[0,1,2,9,1...|[-1203.5917480382...|[1.0,1.7536421866...|       0.0|
|  0.0|(13667,[0,1,3,4,8...|[-3492.9475783291...|[1.0,7.5404685464...|       0.0|
|  0.0|(13667,[0,1,3,4,8...|[-3492.9475783291...|[1.0,7.5404685464...|       0.0|
|  0.0|(13667,[0,1,3,5,1...|[-1141.6428592031...|[1.0,6.5981860538...|       0.0|
|  0.0|(13667,[0,1,3,9,1...|[-1372.0835578912...|[1.0,7.7415951642...|       0.0|
|  0.0|(13667,[0,1,3,12,...|[-1823.6264820124...|[0.99905734282346...|       0.0|
|  0.0|(13667,[0,1,3,22,...|[-402.08371876671...|[1.0,7.8582830810...|       0.0|
|  0.0|(13667,[0

In [122]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [135]:
naiveb_acc_eval = MulticlassClassificationEvaluator()

In [136]:
naiveb_acc_eval.evaluate(test_results)

0.9339149329378885

In [125]:
from pyspark.ml.classification import RandomForestClassifier

In [126]:
rfc = RandomForestClassifier(labelCol = "label", featuresCol = "features")

In [127]:
rfc_model = rfc.fit(training)

In [128]:
rfc_preds = rfc_model.transform(test)

In [130]:
rfc_preds.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13667,[0,1,2,6,1...|[16.9661051391126...|[0.84830525695563...|       0.0|
|  0.0|(13667,[0,1,2,7,1...|[17.8706062224444...|[0.89353031112222...|       0.0|
|  0.0|(13667,[0,1,2,9,1...|[16.7930367684457...|[0.83965183842228...|       0.0|
|  0.0|(13667,[0,1,3,4,8...|[17.2855348614912...|[0.86427674307456...|       0.0|
|  0.0|(13667,[0,1,3,4,8...|[17.2855348614912...|[0.86427674307456...|       0.0|
|  0.0|(13667,[0,1,3,5,1...|[17.4594352632629...|[0.87297176316314...|       0.0|
|  0.0|(13667,[0,1,3,9,1...|[17.2855348614912...|[0.86427674307456...|       0.0|
|  0.0|(13667,[0,1,3,12,...|[16.2078995190751...|[0.81039497595375...|       0.0|
|  0.0|(13667,[0,1,3,22,...|[17.6802857433726...|[0.88401428716863...|       0.0|
|  0.0|(13667,[0

In [131]:
rfc_acc = MulticlassClassificationEvaluator()

In [132]:
rfc_acc.evaluate(rfc_preds)

0.8284927416540367