In [1]:
import findspark

In [2]:
import os

# Point findspark at the Spark installation. Prefer the SPARK_HOME environment
# variable so the notebook is portable across machines; fall back to the
# original author's install path when SPARK_HOME is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.0.1-bin-hadoop2.7'))

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Create the Spark session for this notebook (or reuse one already running).
spark = (
    SparkSession.builder
    .appName('spam')
    .getOrCreate()
)

In [6]:
# Load the raw SMS collection. The file is tab-separated with no header row,
# so Spark names the columns _c0 (the ham/spam label) and _c1 (the message text).
df = spark.read.csv(
    'SMSSpamCollection',
    sep='\t',
    inferSchema=True,
)

In [7]:
# Replace Spark's auto-generated column names with readable ones. Later cells
# refer to 'class' and 'text', so this step is required by the rest of the notebook.
for old_name, new_name in [('_c0', 'class'), ('_c1', 'text')]:
    df = df.withColumnRenamed(old_name, new_name)


In [8]:
# We suspect that spam and ham messages differ noticeably in length, so as a
# feature-engineering step we compute the character length of each message.

from pyspark.sql.functions import length, mean

In [9]:
# Add each message's character count as a new 'length' column.
df = df.withColumn('length', length('text'))

In [10]:
# Preview the first 20 rows (long text values are truncated).
df.show(n=20, truncate=True)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [11]:
# Compare the average message length per class. In the run recorded below,
# spam averages about 139 characters versus about 71 for ham, which suggests
# 'length' is a useful feature.
avg_length_by_class = df.groupBy('class').agg(mean(df['length']).alias('avg_length'))
avg_length_by_class.show()

+-----+-----------------+
|class|       avg_length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



We will build a pipeline to prepare our data for training the model. We start by importing Natural Language Processing (NLP) transformers. The Tokenizer is applied to the 'text' column and returns the list of words in each message. Next, the StopWordsRemover removes words that appear frequently but carry little meaning (e.g. "the", "and") from that list. The CountVectorizer converts each remaining list of words into a vector of token counts. The Inverse Document Frequency (IDF) stage then rescales those count vectors, giving less weight to terms that appear in many messages and more weight to rarer, more informative terms.

Finally, we use the StringIndexer to convert the categorical labels 'ham' and 'spam' into numerical labels. By default the most frequent label gets index 0.0, which here is 'ham'. The VectorAssembler then combines the TF-IDF vector and the 'length' column into a single 'features' vector.

In [12]:
from pyspark.ml.feature import (Tokenizer, StopWordsRemover, CountVectorizer,
                               IDF,StringIndexer,VectorAssembler)
from pyspark.ml.pipeline import Pipeline

In [13]:
tokenizer = Tokenizer(inputCol='text',outputCol='words')
stop_remove = StopWordsRemover(inputCol='words',outputCol='final_words')
count_vec = CountVectorizer(inputCol='final_words',outputCol='c_vec')
idf = IDF(inputCol='c_vec',outputCol='tf_idf')
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [14]:
# Chain all preprocessing stages; the Pipeline applies them in list order, so
# each stage's input column is produced by an earlier stage (or is in df).
prep_stages = [
    ham_spam_to_num,
    tokenizer,
    stop_remove,
    count_vec,
    idf,
    clean_up,
]
data_prep_pipe = Pipeline(stages=prep_stages)

Now it's time to do some machine learning! We pick the Naive Bayes classification model, but it is not the only reasonable choice. It's worth trying different models and comparing them on classification metrics such as accuracy; at the end of this notebook we do exactly that with logistic regression.

In [15]:
from pyspark.ml.classification import NaiveBayes

In [16]:
# Naive Bayes classifier that reads the assembled 'features' vector and the
# indexed 'label' column produced by the preprocessing pipeline.
nb = NaiveBayes(labelCol='label', featuresCol='features')

In [17]:
# Fit the preprocessing pipeline. StringIndexer, CountVectorizer and IDF learn
# from the data; the other stages are stateless transformers.
# NOTE(review): fitting on the full `df` lets the CountVectorizer vocabulary and
# the IDF weights see rows that later land in `test`. Fitting on a training
# split of `df` instead would avoid this leakage.
cleaner = data_prep_pipe.fit(dataset=df)

In [18]:
# Run every fitted preprocessing stage over all rows.
clean_data = cleaner.transform(dataset=df)

In [19]:
# Keep only the two columns the classifiers need.
final_data = clean_data.select(['label', 'features'])

In [20]:
# Preview the model-ready rows (sparse feature vectors are truncated).
final_data.show(n=20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [21]:
# Hold out ~30% of the rows for evaluation. A fixed seed makes the split, and
# therefore the reported accuracies, reproducible across re-runs on the same
# data and partitioning (without it every Restart & Run All gives different numbers).
train, test = final_data.randomSplit([0.7, 0.3], seed=42)

In [22]:
# Train the Naive Bayes model on the training split only.
spam_detector = nb.fit(dataset=train)

In [23]:
# Score the held-out rows; adds rawPrediction, probability and prediction columns.
test_results = spam_detector.transform(dataset=test)

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator

In [25]:
# Inspect a sample of the Naive Bayes predictions.
test_results.show(n=20, truncate=True)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-627.60992036585...|[0.99999999999552...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-572.27143914660...|[1.0,1.5826959828...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-803.84053843071...|[1.0,1.7834899535...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-542.29078511032...|[1.0,1.7207497290...|       0.0|
|  0.0|(13424,[0,1,18,20...|[-837.53409585006...|[1.0,4.5043123696...|       0.0|
|  0.0|(13424,[0,1,24,31...|[-338.72017694265...|[1.0,3.2648930659...|       0.0|
|  0.0|(13424,[0,1,27,88...|[-1535.5633153710...|[0.00147975093054...|       1.0|
|  0.0|(13424,[0,1,31,43...|[-339.07700057359...|[1.0,9.0850525172...|       0.0|
|  0.0|(13424,[0,1,43,69...|[-613.82115299856...|[0.99995872400822...|       0.0|
|  0.0|(13424,[0

In [26]:
# Evaluator that scores a predictions DataFrame by accuracy, i.e. the fraction
# of rows whose 'prediction' equals the true 'label'. Reused for both models.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

In [27]:
# Accuracy of Naive Bayes on the held-out split.
nb_acc = acc_eval.evaluate(dataset=test_results)

In [28]:
# Report the Naive Bayes test accuracy.
print('The accuracy of the Naive Bayes model is: {}'.format(nb_acc))

The accuracy of the Naive Bayes model is: 0.9177842565597668


In [29]:
from pyspark.ml.classification import LogisticRegression

In [30]:
# Second candidate model: logistic regression on the same features and labels,
# so its accuracy is directly comparable with Naive Bayes.
lg = LogisticRegression(labelCol='label', featuresCol='features')

In [31]:
# Train logistic regression on the same training split used for Naive Bayes.
lg_model = lg.fit(dataset=train)

In [32]:
# Score the held-out rows with the logistic regression model.
lg_test_results = lg_model.transform(dataset=test)

In [33]:
# Accuracy of logistic regression on the held-out split.
lg_acc = acc_eval.evaluate(dataset=lg_test_results)

In [34]:
# Report the logistic regression test accuracy.
print('The accuracy of the Logistic Regression model is: {}'.format(lg_acc))

The accuracy of the Logistic Regression model is: 0.970262390670554
