In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.getOrCreate()

val data = (spark.read.option("header", "false") // no header in the data
            .option("inferSchema", "True")
            .option("sep", "\t")
            .csv("SMSSpamCollection"))

// since the data does not have header, let's define column name
val new_data = data.withColumnRenamed("_c0", "class").withColumnRenamed("_c1", "text")
new_data.show()

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.6:4042
SparkContext available as 'sc' (version = 2.2.0, master = local[*], app id = local-1501628004848)
SparkSession available as 'spark'


+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if that?s th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3ccb909b
data: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]
new_data: org.apache.spark.sql.DataFrame = [class: string, text: string]


### Data Preprocessing

In [2]:
import org.apache.spark.sql.functions.length
var len_data = new_data.withColumn("length", length($"text"))
len_data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if that?s th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



import org.apache.spark.sql.functions.length
len_data: org.apache.spark.sql.DataFrame = [class: string, text: string ... 1 more field]


In [3]:
len_data.groupBy("class").mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



*** This shows that the length of the spam test is almost double the ham test. So the length could be a good feature to consider***

In [4]:
import org.apache.spark.sql.functions.{regexp_replace, lower}
val clean_data = len_data.select($"class", lower(regexp_replace(col("text"), "[^a-zA-Z]", " ")).as("Clean_text"), $"length")
clean_data.show()

+-----+--------------------+------+
|class|          Clean_text|length|
+-----+--------------------+------+
|  ham|go until jurong p...|   111|
|  ham|ok lar    joking ...|    29|
| spam|free entry in   a...|   155|
|  ham|u dun say so earl...|    49|
|  ham|nah i don t think...|    61|
| spam|freemsg hey there...|   147|
|  ham|even my brother i...|    77|
|  ham|as per your reque...|   160|
| spam|winner   as a val...|   157|
| spam|had your mobile  ...|   154|
|  ham|i m gonna be home...|   109|
| spam|six chances to wi...|   136|
| spam|urgent  you have ...|   155|
|  ham|i ve been searchi...|   196|
|  ham|i have a date on ...|    35|
| spam|xxxmobilemovieclu...|   149|
|  ham|oh k   i m watchi...|    26|
|  ham|eh u remember how...|    81|
|  ham|fine if that s th...|    56|
| spam|england v macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



import org.apache.spark.sql.functions.{regexp_replace, lower}
clean_data: org.apache.spark.sql.DataFrame = [class: string, Clean_text: string ... 1 more field]


### Feature Transformation

In [5]:
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer}
val tokenizer = new Tokenizer().setInputCol("Clean_text").setOutputCol("token_text")
val stop_remove = new StopWordsRemover().setInputCol("token_text").setOutputCol("stop_token")
val count_vec = new CountVectorizer().setInputCol("stop_token").setOutputCol ("c_vec")
val idf = new IDF().setInputCol("c_vec").setOutputCol("tf_idf")
val class_label = new StringIndexer().setInputCol("class").setOutputCol("label")

import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer}
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_017d08f93c25
stop_remove: org.apache.spark.ml.feature.StopWordsRemover = stopWords_74c79c0c369b
count_vec: org.apache.spark.ml.feature.CountVectorizer = cntVec_5b8e5281c542
idf: org.apache.spark.ml.feature.IDF = idf_30925198af5d
class_label: org.apache.spark.ml.feature.StringIndexer = strIdx_1925cf98ec24


In [6]:
import org.apache.spark.ml.feature.VectorAssembler

val assembler = (new VectorAssembler()
                .setInputCols(Array("tf_idf", "length"))
                .setOutputCol("features"))

import org.apache.spark.ml.feature.VectorAssembler
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_339528a5ac9e


In [7]:
// Let's create a pipeline with all the objects defined above
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(class_label, tokenizer, stop_remove, count_vec, idf, assembler))
val final_data = pipeline.fit(clean_data).transform(clean_data)
final_data.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(7662,[0,9,16,25,...|
|  0.0|(7662,[0,1,8,248,...|
|  1.0|(7662,[0,10,22,24...|
|  0.0|(7662,[0,1,52,80,...|
|  0.0|(7662,[0,51,136,3...|
|  1.0|(7662,[0,8,14,20,...|
|  0.0|(7662,[0,14,127,2...|
|  0.0|(7662,[0,145,158,...|
|  1.0|(7662,[0,2,64,81,...|
|  1.0|(7662,[0,1,2,10,2...|
|  0.0|(7662,[0,3,21,30,...|
|  1.0|(7662,[0,15,20,23...|
|  1.0|(7662,[0,10,24,52...|
|  0.0|(7662,[0,44,75,83...|
|  0.0|(7662,[477,657,76...|
|  1.0|(7662,[0,24,25,78...|
|  0.0|(7662,[0,3,35,61,...|
|  0.0|(7662,[0,1,70,72,...|
|  0.0|(7662,[0,1,66,71,...|
|  1.0|(7662,[0,5,24,41,...|
+-----+--------------------+
only showing top 20 rows



import org.apache.spark.ml.Pipeline
pipeline: org.apache.spark.ml.Pipeline = pipeline_d6cd1f5dc1dc
final_data: org.apache.spark.sql.DataFrame = [class: string, Clean_text: string ... 7 more fields]


### Training and Evaluation

In [8]:
// Naive bayes model is very common to use with NLP
import org.apache.spark.ml.classification.NaiveBayes

// Splitting the data into train set and test set
val Array(train_data, test_data) = final_data.select("label", "features").randomSplit(Array(0.8, 0.2))

// Fitting the classifier model
val nb = new NaiveBayes()
val nbModel = nb.fit(train_data)

// Making prediction using text data and trained model
val result = nbModel.transform(test_data)
result.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(7662,[0,1,2,8,32...|[-147.25744679125...|[0.99999999965130...|       0.0|
|  0.0|(7662,[0,1,2,8,74...|[-166.15149758450...|[0.14452257918736...|       1.0|
|  0.0|(7662,[0,1,2,10,2...|[-487.73019302991...|[1.0,2.3175437907...|       0.0|
|  0.0|(7662,[0,1,2,13,1...|[-588.88957009481...|[1.0,6.4204232555...|       0.0|
|  0.0|(7662,[0,1,2,24,3...|[-416.47617533368...|[0.99999999999980...|       0.0|
|  0.0|(7662,[0,1,3,4,5,...|[-2614.0422941739...|[1.0,1.4961496857...|       0.0|
|  0.0|(7662,[0,1,3,8,9,...|[-840.51123980113...|[1.0,3.7805434265...|       0.0|
|  0.0|(7662,[0,1,3,9,12...|[-1005.1979496043...|[1.0,3.5991093813...|       0.0|
|  0.0|(7662,[0,1,3,12,5...|[-577.74435276786...|[1.0,2.1593665931...|       0.0|
|  0.0|(7662,[0,

import org.apache.spark.ml.classification.NaiveBayes
train_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
test_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
nb: org.apache.spark.ml.classification.NaiveBayes = nb_e177e974ed95
nbModel: org.apache.spark.ml.classification.NaiveBayesModel = NaiveBayesModel (uid=nb_e177e974ed95) with 2 classes
result: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]


In [9]:
// let's evaluate othe performance of our model
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val eval = new MulticlassClassificationEvaluator()
val accuracy = eval.evaluate(result)
println($"Accuracy of model at predicting spam was: $accuracy")

Accuracy of model at predicting spam was: 0.9496410835752409


import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
eval: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_c5dba066543f
accuracy: Double = 0.9496410835752409
