## Problem

Predict spam or ham.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 61 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=1a4a19b12b900c8710e0c4c2dabab8707e6a2b1b1b7ee15d751ce3609ccb447f
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
sc = spark.sparkContext
sc

In [7]:
df = spark.read.csv('SMSSpamCollection', inferSchema = True, sep='\t')
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [8]:
df.show(10)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
+----+--------------------+
only showing top 10 rows



In [9]:
df.describe().show()

+-------+----+--------------------+
|summary| _c0|                 _c1|
+-------+----+--------------------+
|  count|5574|                5574|
|   mean|null|               645.0|
| stddev|null|                null|
|    min| ham| &lt;#&gt;  in mc...|
|    max|spam|… we r stayin her...|
+-------+----+--------------------+



In [10]:
df = df.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [11]:
df.groupBy('class').count().show()

+-----+-----+
|class|count|
+-----+-----+
|  ham| 4827|
| spam|  747|
+-----+-----+



In [12]:
from pyspark.sql.functions import length

In [13]:
df = df.withColumn('length', length(df['text']))

In [14]:
df.show(5)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
+-----+--------------------+------+
only showing top 5 rows



In [15]:
df.groupby('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [16]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')

In [17]:
from pyspark.ml.feature import VectorAssembler

clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')

In [19]:
from pyspark.ml import Pipeline

data_prep_pipe = Pipeline(stages = [ham_spam_to_num, tokenizer, stopremove, count_vec, idf, clean_up])

In [20]:
cleaner = data_prep_pipe.fit(df)

In [22]:
clean_data = cleaner.transform(df)

In [23]:
clean_data.show(5)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,8

In [28]:
clean_data.head(3)

[Row(class='ham', text='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', length=111, label=0.0, token_text=['go', 'until', 'jurong', 'point,', 'crazy..', 'available', 'only', 'in', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'there', 'got', 'amore', 'wat...'], stop_tokens=['go', 'jurong', 'point,', 'crazy..', 'available', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'got', 'amore', 'wat...'], c_vec=SparseVector(13423, {7: 1.0, 11: 1.0, 31: 1.0, 61: 1.0, 72: 1.0, 344: 1.0, 625: 1.0, 731: 1.0, 1409: 1.0, 1598: 1.0, 4485: 1.0, 6440: 1.0, 8092: 1.0, 8838: 1.0, 11344: 1.0, 12979: 1.0}), tf_idf=SparseVector(13423, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 4.2072, 72: 4.322, 344: 5.4072, 625: 5.918, 731: 6.1411, 1409: 6.6801, 1598: 6.8343, 4485: 7.5274, 6440: 7.9329, 8092: 7.9329, 8838: 7.9329, 11344: 7.9329, 12979: 7.9329}), features=SparseVector(13424, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 

In [29]:
clean_data = clean_data.select(['label','features'])
clean_data.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
+-----+--------------------+
only showing top 5 rows



In [30]:
train_data, test_data  = clean_data.randomSplit([0.8, 0.2], 20)

print("Records for training: " + str(train_data.count()))
print("Records for evaluation: " + str(test_data.count()))

Records for training: 4492
Records for evaluation: 1082


In [31]:
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [32]:
clean_data.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



In [18]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()

In [33]:
model = nb.fit(train_data)

In [34]:
test_results = model.transform(test_data)

In [35]:
test_results.show(10)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|[-1072.0776881935...|[1.0,7.2108832156...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-543.83599050000...|[1.0,5.5318075823...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-689.63164356196...|[1.0,3.4213121092...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-669.18490790247...|[1.0,1.4000965626...|       0.0|
|  0.0|(13424,[0,1,27,88...|[-1539.4993336851...|[8.18107202084444...|       1.0|
|  0.0|(13424,[0,1,72,10...|[-676.98193296836...|[1.0,8.5532364756...|       0.0|
|  0.0|(13424,[0,2,3,6,9...|[-3434.3436727419...|[1.0,7.1159145010...|       0.0|
|  0.0|(13424,[0,2,3,6,9...|[-3434.3436727419...|[1.0,7.1159145010...|       0.0|
|  0.0|(13424,[0,2,4,5,7...|[-988.48665838165...|[1.0,2.9910950726...|       0.0|
|  0.0|(13424,[0

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

acc_eval = MulticlassClassificationEvaluator()
print('Accuracy:', acc_eval.evaluate(test_results))

lrmetrics = MulticlassMetrics(test_results['label','prediction'].rdd)
print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
print('F1 Score:', lrmetrics.fMeasure(1.0, 1.0))

Accuracy: 0.9183280766190828
Confusion Matrix:
 DenseMatrix([[832.,   2.],
             [ 95., 153.]])
F1 Score: 0.7593052109181141
