**Remarks**

In this notebook, we classify SMS messages as spam or ham using Spark (PySpark). The data is the SMS Spam Collection dataset from Kaggle: https://www.kaggle.com/uciml/sms-spam-collection-dataset. Let's get started!

In [2]:
# first, install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=c022d0cb35961307a736b31c5f1c098524e4ca877ff1396020c8ed8eb9ad0abf
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [29]:
# import libraries
# 1. pyspark environment
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, when, length
from pyspark.sql.types import *

# 2. NLP tools
import re
from nltk.stem.snowball import SnowballStemmer
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [4]:
# set up the Spark context and session
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('NLP-spam').getOrCreate()

In [12]:
# load the dataset (semicolon-separated, no header row)
data = spark.read.csv('../content/spam-kaggle.csv', inferSchema=True, sep=';')
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [14]:
data = data.withColumnRenamed('_c0', 'target').withColumnRenamed('_c1', 'text')
data.limit(5).show()

+------+--------------------+
|target|                text|
+------+--------------------+
|   ham|Go until jurong p...|
|   ham|Ok lar... Joking ...|
|  spam|Free entry in 2 a...|
|   ham|U dun say so earl...|
|   ham|Nah I don't think...|
+------+--------------------+



In [17]:
# add the character length of each message as an extra column
data = data.withColumn('length', length(data['text']))
data.limit(5).show()

+------+--------------------+------+
|target|                text|length|
+------+--------------------+------+
|   ham|Go until jurong p...|   111|
|   ham|Ok lar... Joking ...|    29|
|  spam|Free entry in 2 a...|   155|
|   ham|U dun say so earl...|    49|
|   ham|Nah I don't think...|    61|
+------+--------------------+------+



In [20]:
# average message length per class
data.groupBy('target').mean().show()

+------+-----------------+
|target|      avg(length)|
+------+-----------------+
|   ham| 66.9396140278066|
|  spam|138.3190348525469|
+------+-----------------+



In [22]:
# feature stages: tokenize, drop stop words, hash to term frequencies, apply IDF, index the labels
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
stop_remove = StopWordsRemover(inputCol='tokens', outputCol='stop_token')
tf_vec = HashingTF(inputCol='stop_token', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_numeric = StringIndexer(inputCol='target', outputCol='label')
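
To make the `HashingTF` and `IDF` stages above more concrete, here is a rough plain-Python sketch of the idea: each token is hashed to a bucket index, the counts per bucket give the term frequencies, and each bucket is then scaled by an inverse document frequency of the form log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents that hit the bucket. This is only an illustration under assumptions: it uses a tiny vector size and `zlib.crc32` as a stand-in hash (Spark uses its own hash function and 262144 buckets by default, as the output further down shows), so the indices will not match Spark's. The helper names and the toy documents are hypothetical.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # assumption: tiny size for readability (Spark's default is 262144)

def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to {bucket_index: count}; crc32 is a stand-in hash, not Spark's."""
    counts = Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)
    return dict(counts)

def fit_idf(tf_docs):
    """Compute one IDF weight per bucket as log((m + 1) / (df + 1))."""
    m = len(tf_docs)
    df = Counter(idx for doc in tf_docs for idx in doc)
    return {idx: math.log((m + 1) / (d + 1)) for idx, d in df.items()}

def tf_idf(tf_doc, idf):
    """Scale each term frequency by the IDF weight of its bucket."""
    return {idx: count * idf[idx] for idx, count in tf_doc.items()}

# hypothetical toy documents, already tokenized and stop-word filtered
docs = [
    ["free", "entry", "win", "prize"],
    ["ok", "see", "you", "later"],
    ["free", "prize", "call", "now"],
]
tf_docs = [hashing_tf(d) for d in docs]
idf = fit_idf(tf_docs)
vectors = [tf_idf(d, idf) for d in tf_docs]
print(vectors)
```

Because different tokens can land in the same bucket, a small `NUM_FEATURES` causes collisions; a large bucket count keeps them rare at the cost of a very wide sparse vector.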

In [24]:
# assemble TF-IDF and length into one feature vector, then define the classifier
clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')
model = NaiveBayes()
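
`NaiveBayes()` with default arguments uses the multinomial model with a smoothing value of 1.0. As a rough intuition for what that classifier does, the sketch below implements the textbook multinomial Naive Bayes on raw token counts in plain Python. It is not Spark's implementation: details such as how the class priors are smoothed may differ, and the pipeline in this notebook feeds TF-IDF weights plus the message length rather than raw counts. The function names and the toy messages are hypothetical.

```python
import math
from collections import Counter, defaultdict

def train_nb(samples, smoothing=1.0):
    """samples: list of (label, tokens). Returns log priors, log likelihoods, vocabulary."""
    label_counts = Counter(label for label, _ in samples)
    token_counts = defaultdict(Counter)
    for label, tokens in samples:
        token_counts[label].update(tokens)
    vocab = {tok for counts in token_counts.values() for tok in counts}
    total = len(samples)
    log_prior = {c: math.log(n / total) for c, n in label_counts.items()}
    log_like = {}
    for c in label_counts:
        # additive (Laplace) smoothing keeps unseen tokens from getting zero probability
        denom = sum(token_counts[c].values()) + smoothing * len(vocab)
        log_like[c] = {t: math.log((token_counts[c][t] + smoothing) / denom) for t in vocab}
    return log_prior, log_like, vocab

def predict_nb(model, tokens):
    """Pick the class with the highest log posterior; tokens outside the vocabulary are skipped."""
    log_prior, log_like, vocab = model
    scores = {
        c: log_prior[c] + sum(log_like[c][t] for t in tokens if t in vocab)
        for c in log_prior
    }
    return max(scores, key=scores.get)

# hypothetical toy training set
toy = [
    ("spam", ["free", "prize", "call", "now"]),
    ("spam", ["win", "free", "entry"]),
    ("ham", ["see", "you", "later"]),
    ("ham", ["ok", "call", "me", "later"]),
]
nb_model = train_nb(toy)
print(predict_nb(nb_model, ["free", "prize"]))
print(predict_nb(nb_model, ["see", "you"]))
```

The multinomial model expects non-negative feature values, which TF-IDF weights and message lengths both satisfy.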

In [25]:
# split into 70% train / 30% test (no seed is set, so the split differs between runs)
train, test = data.randomSplit([0.7,0.3])

In [27]:
# chain all stages into a single pipeline
pipeline = Pipeline(stages=[ham_spam_to_numeric, 
                            tokenizer, 
                            stop_remove,
                            tf_vec,
                            idf,
                            clean_up,
                            model
                            ])

In [28]:
# training model
clf = pipeline.fit(train)

In [30]:
# predict on the test set
predictions = clf.transform(test)

In [31]:
predictions.limit(5).show()

+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|target|                text|length|label|              tokens|          stop_token|               c_vec|              tf_idf|            features|       rawPrediction|         probability|prediction|
+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   ham|                 &lt|     4|  0.0|             [, &lt]|             [, &lt]|(262144,[248474,2...|(262144,[248474,2...|(262145,[248474,2...|[-49.182181451882...|[1.0,1.9775394158...|       0.0|
|   ham|                 &lt|     4|  0.0|             [, &lt]|             [, &lt]|(262144,[248474,2...|(262144,[248474,2...|(262145,[248474,2...|[-49.182181451882...|[1.0,1.9775394158...|       

In [32]:
# evaluating model
evaluatorMulti = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')

predictionAndTarget = predictions.select("label", "prediction")

# compute metrics (precision and recall here are class-weighted averages)
acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName:"accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName:"f1"})
precision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName:"weightedPrecision"})
recall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName:"weightedRecall"})

In [33]:
print('Accuracy:  {:2.2%} '.format(acc))
print('Precision: {:2.2%} '.format(precision))
print('Recall:    {:2.2%} '.format(recall))
print('F1 Score:  {:2.2%} '.format(f1))

Accuracy:  96.56% 
Precision: 96.59% 
Recall:    96.56% 
F1 Score:  96.37% 
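
The `weightedPrecision` and `weightedRecall` metrics average the per-class values, weighting each class by its share of the true labels. A side effect is that weighted recall always equals accuracy, which is consistent with both reading 96.56% above. The plain-Python sketch below works this out on a small made-up example; the function name and the label lists are hypothetical and are not taken from the notebook's predictions.

```python
from collections import Counter

def weighted_metrics(labels, predictions):
    """Return (accuracy, weighted precision, weighted recall)."""
    total = len(labels)
    support = Counter(labels)  # number of true examples per class
    pairs = list(zip(labels, predictions))
    accuracy = sum(l == p for l, p in pairs) / total
    w_precision = 0.0
    w_recall = 0.0
    for c in sorted(support):
        tp = sum(l == c and p == c for l, p in pairs)
        predicted_c = sum(p == c for p in predictions)
        precision_c = tp / predicted_c if predicted_c else 0.0
        recall_c = tp / support[c]
        weight = support[c] / total  # class share among the true labels
        w_precision += weight * precision_c
        w_recall += weight * recall_c
    return accuracy, w_precision, w_recall

# made-up example: 0 = ham, 1 = spam
labels      = [0, 0, 0, 0, 0, 0, 1, 1, 1, 1]
predictions = [0, 0, 0, 0, 0, 1, 1, 1, 0, 0]
acc, wp, wr = weighted_metrics(labels, predictions)
print('Accuracy:           {:.2%}'.format(acc))
print('Weighted precision: {:.2%}'.format(wp))
print('Weighted recall:    {:.2%}'.format(wr))
```

For an imbalanced dataset like this one, the weighted averages are dominated by the majority class (ham), so per-class precision and recall for spam can be noticeably lower than the weighted figures suggest.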
