In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.4.5-bin-hadoop2.7')
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler, StringIndexer, Tokenizer, RegexTokenizer, StopWordsRemover, NGram
from pyspark.ml.feature import HashingTF, IDF, CountVectorizer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import length
from pyspark.sql.types import IntegerType
import os
os.chdir('/home/ubuntu/data')

In [2]:
spark = SparkSession.builder.appName('spam_detection').getOrCreate()

In [3]:
df = spark.read.csv('SMSSpamCollection', inferSchema=True, sep='\t') 
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [4]:
df.head(3)

[Row(_c0='ham', _c1='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'),
 Row(_c0='ham', _c1='Ok lar... Joking wif u oni...'),
 Row(_c0='spam', _c1="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

In [5]:
df = df.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1', 'text')
df.head(3)

[Row(class='ham', text='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'),
 Row(class='ham', text='Ok lar... Joking wif u oni...'),
 Row(class='spam', text="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

In [6]:
df.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [7]:
df = df.withColumn('length', length(df['text']))
df.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [8]:
df.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [9]:
tokken = Tokenizer(inputCol='text', outputCol='token_text')
remover = StopWordsRemover(inputCol="token_text", outputCol="stop_token")
cv = CountVectorizer(inputCol='stop_token', outputCol='c_vec', vocabSize=3, minDF=2.0)
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')

In [10]:
assembler = VectorAssembler(inputCols=['length', 'tf_idf'], outputCol='features')
nb =  NaiveBayes()

In [11]:
data_pipeline = Pipeline(stages=[ham_spam_to_num, tokken, remover, cv, idf, assembler])
clean_df = data_pipeline.fit(df).transform(df)
clean_df.show()

+-----+--------------------+------+-----+--------------------+--------------------+-------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|          stop_token|              c_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+-------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|          (3,[],[])|           (3,[],[])|     (4,[0],[111.0])|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|      (3,[0],[1.0])|(3,[0],[2.0166983...|[29.0,2.016698353...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|      (3,[2],[1.0])|(3,[2],[2.7044691...|[155.0,0.0,0.0,2....|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|      (3,[0],[2.0])|(3,[0],[4.0333967...|

In [12]:
clean_df.columns

['class',
 'text',
 'length',
 'label',
 'token_text',
 'stop_token',
 'c_vec',
 'tf_idf',
 'features']

In [13]:
clean_df = clean_df.select(['label', 'features'])
clean_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|     (4,[0],[111.0])|
|  0.0|[29.0,2.016698353...|
|  1.0|[155.0,0.0,0.0,2....|
|  0.0|[49.0,4.033396706...|
|  0.0|      (4,[0],[61.0])|
|  1.0|     (4,[0],[147.0])|
|  0.0|      (4,[0],[77.0])|
|  0.0|     (4,[0],[160.0])|
|  1.0|[157.0,0.0,2.3645...|
|  1.0|[154.0,2.01669835...|
|  0.0|     (4,[0],[109.0])|
|  1.0|     (4,[0],[136.0])|
|  1.0|     (4,[0],[155.0])|
|  0.0|     (4,[0],[196.0])|
|  0.0|      (4,[0],[35.0])|
|  1.0|     (4,[0],[149.0])|
|  0.0|      (4,[0],[26.0])|
|  0.0|[81.0,2.016698353...|
|  0.0|[56.0,2.016698353...|
|  1.0|     (4,[0],[155.0])|
+-----+--------------------+
only showing top 20 rows



In [14]:
train, test = clean_df.randomSplit([0.7, 0.3])

In [15]:
spam_detector = nb.fit(train)
test_results = spam_detector.transform(test)
test_results.show()

+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  0.0| (4,[0],[2.0])|[-0.1633426714819...|[0.86573581892816...|       0.0|
|  0.0| (4,[0],[2.0])|[-0.1633426714819...|[0.86573581892816...|       0.0|
|  0.0| (4,[0],[2.0])|[-0.1633426714819...|[0.86573581892816...|       0.0|
|  0.0| (4,[0],[2.0])|[-0.1633426714819...|[0.86573581892816...|       0.0|
|  0.0| (4,[0],[3.0])|[-0.1722403132750...|[0.86632596636606...|       0.0|
|  0.0| (4,[0],[5.0])|[-0.1900355968613...|[0.86749968033705...|       0.0|
|  0.0| (4,[0],[5.0])|[-0.1900355968613...|[0.86749968033705...|       0.0|
|  0.0| (4,[0],[5.0])|[-0.1900355968613...|[0.86749968033705...|       0.0|
|  0.0| (4,[0],[6.0])|[-0.1989332386544...|[0.86808325621570...|       0.0|
|  0.0| (4,[0],[7.0])|[-0.2078308804476...|[0.86866465095614...|       0.0|
|  0.0| (4,[

In [16]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print('Accuracy of NB model : ', acc)

Accuracy of NB model :  0.8752057071803889
