
# Spam detection with NLP in PySpark
Let's use what we learned in the previous step and apply NLP techniques to build a model that predicts whether a message is spam or not.

## Install PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=7d3950fe6c34e973efbc8b338af87f0be380b617d2803e815a88c71da90a8528
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Download the dataset
!wget https://raw.githubusercontent.com/Ricardo-Jaramillo/PySpark/main/datasets/NLP/SMSSpamCollection

--2023-10-04 23:55:05--  https://raw.githubusercontent.com/Ricardo-Jaramillo/PySpark/main/datasets/NLP/SMSSpamCollection
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 477907 (467K) [text/plain]
Saving to: ‘SMSSpamCollection’


2023-10-04 23:55:05 (2.92 MB/s) - ‘SMSSpamCollection’ saved [477907/477907]



In [63]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

## Create a Spark Session and read the data

In [4]:
# Create a session
spark = SparkSession.builder.appName('nlp_example').getOrCreate()

In [14]:
# Read in the data file
raw_data = spark.read.csv('SMSSpamCollection', inferSchema=True, sep='\t')

In [19]:
# Rename the default columns (_c0, _c1) to 'class' and 'text'
data = raw_data.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1', 'text')

In [20]:
# Show data
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



### Apply Feature Engineering
Let's check whether adding the length of each message as a feature improves the model's performance.

In [21]:
# Add a column with the length of every message
data = data.withColumn('length', length(data['text']))

In [22]:
# Show the new dataFrame
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [23]:
data.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



> Spam messages are, on average, almost twice as long as ham messages (about 139 vs. 71 characters), so message length looks like a strong signal for spam. We'll add it as a feature.
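
For reference, `groupBy('class').mean()` averages every numeric column within each class; `length` is the only numeric column here, hence the single `avg(length)` column. A plain-Python equivalent on a few hypothetical toy rows (not taken from the dataset):

```python
from collections import defaultdict

# Hypothetical toy rows of (class, text); not from SMSSpamCollection
rows = [
    ("ham", "Ok lar..."),
    ("ham", "See you at 5"),
    ("spam", "WINNER!! Claim your free prize now"),
]

def mean_length_by_class(rows):
    """Average character length per class, like groupBy('class').mean() on a length column."""
    totals = defaultdict(int)
    counts = defaultdict(int)
    for label, text in rows:
        totals[label] += len(text)  # number of characters in the message
        counts[label] += 1
    return {label: totals[label] / counts[label] for label in totals}

print(mean_length_by_class(rows))  # {'ham': 10.5, 'spam': 34.0}
```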

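## Preprocess the data

We'll create the following stages:
* StringIndexer: converts the `class` strings (`ham`/`spam`) into a numeric `label`
* Tokenizer: splits each message into lowercase words
* StopWordsRemover: removes common words such as "the" and "and"
* CountVectorizer: builds a vocabulary and counts how often each term appears in a message
* IDF (Inverse Document Frequency): down-weights terms that appear in many messages
* VectorAssembler: combines the TF-IDF vector and the `length` column into a single `features` vector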

### Object creation

In [30]:
# StringIndexer
ham_spam_to_numeric = StringIndexer(inputCol='class', outputCol='label')
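
By default (`stringOrderType='frequencyDesc'`), `StringIndexer` assigns index 0.0 to the most frequent string, with ties sorted alphabetically. Ham messages outnumber spam, so ham becomes 0.0 and spam 1.0, matching the `label` column in the outputs. A plain-Python sketch of that ordering; the function name and toy labels are hypothetical:

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}

# Hypothetical toy labels: ham is more frequent, so it gets 0.0
mapping = fit_string_indexer(["ham", "spam", "ham", "ham", "spam"])
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```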

In [31]:
# Create tokenizer object
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
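
`Tokenizer` lowercases the text and splits it on single whitespace characters, so leading or repeated spaces produce empty tokens (visible as `''` entries in the `token_text` column of the test results). A rough plain-Python imitation; dropping trailing empty strings mirrors Java's `String.split` and is an assumption about the implementation detail:

```python
import re

def tokenize(text):
    """Lowercase and split on each whitespace character, roughly like pyspark.ml.feature.Tokenizer."""
    tokens = re.split(r"\s", text.lower())
    # Java's String.split drops trailing empty strings; mimic that here
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens

print(tokenize("Go until jurong point"))  # ['go', 'until', 'jurong', 'point']
print(tokenize(" Hi  there "))            # ['', 'hi', '', 'there']
```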

In [32]:
# Create stop-words remover object
stop_remover = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
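
`StopWordsRemover` drops tokens that appear in a stop-word list. By default it uses Spark's English list (available through `StopWordsRemover.loadDefaultStopWords('english')`) and compares case-insensitively. The sketch below swaps in a tiny hypothetical stop-word set instead of Spark's real list:

```python
# Hypothetical, tiny stop-word set; Spark's default English list is much larger
STOP_WORDS = {"a", "and", "in", "so", "the", "to", "until"}

def remove_stop_words(tokens, stop_words=STOP_WORDS, case_sensitive=False):
    """Keep tokens that are not stop words (case-insensitive by default)."""
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]

print(remove_stop_words(["go", "until", "jurong", "point"]))  # ['go', 'jurong', 'point']
```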

In [33]:
# Create CountVectorizer object
count_vec = CountVectorizer(inputCol='stop_token', outputCol='count_vec')
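
`CountVectorizer` first learns a vocabulary from the whole corpus, ordered by how often each term occurs, then turns each token list into a sparse vector of counts whose size is the vocabulary size (13423 for this dataset, judging by the `count_vec` output). A plain-Python sketch using a hypothetical `(size, {index: count})` tuple in place of Spark's `SparseVector`; breaking frequency ties alphabetically is this sketch's own choice:

```python
from collections import Counter

def fit_vocabulary(docs):
    """Vocabulary ordered by total term frequency (ties broken alphabetically in this sketch)."""
    totals = Counter(t for doc in docs for t in doc)
    return sorted(totals, key=lambda t: (-totals[t], t))

def count_vectorize(doc, vocab):
    """Return (size, {index: count}), a plain-Python stand-in for a SparseVector."""
    index = {term: i for i, term in enumerate(vocab)}
    counts = Counter(index[t] for t in doc if t in index)
    return len(vocab), dict(sorted(counts.items()))

# Hypothetical toy corpus of already-cleaned token lists
docs = [["free", "entry", "free"], ["ok", "lar"], ["free", "ok"]]
vocab = fit_vocabulary(docs)
print(vocab)                            # ['free', 'ok', 'entry', 'lar']
print(count_vectorize(docs[0], vocab))  # (4, {0: 2, 2: 1})
```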

In [34]:
# Create IDF object
idf = IDF(inputCol='count_vec', outputCol='tf_idf')
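
Spark's `IDF` uses the smoothed weight `idf(t) = log((m + 1) / (df(t) + 1))`, where `m` is the number of documents and `df(t)` is the number of documents containing term `t`, and then multiplies each term count by that weight. A plain-Python sketch on hypothetical count vectors in a `(size, {index: count})` form:

```python
import math

def fit_idf(count_vectors):
    """Compute one idf weight per index with the smoothed formula log((m + 1) / (df + 1))."""
    m = len(count_vectors)
    size = count_vectors[0][0]
    df = [0] * size
    for _, counts in count_vectors:
        for i in counts:
            df[i] += 1  # document frequency: number of docs containing term i
    return [math.log((m + 1) / (d + 1)) for d in df]

def tf_idf(count_vector, idf):
    """Multiply each term count by its idf weight."""
    size, counts = count_vector
    return size, {i: c * idf[i] for i, c in counts.items()}

# Hypothetical count vectors over a 3-term vocabulary
vectors = [(3, {0: 2, 1: 1}), (3, {0: 1}), (3, {2: 4})]
idf = fit_idf(vectors)
print(tf_idf(vectors[0], idf))
```

A term that appears in every document gets weight `log(1) = 0`, so it contributes nothing to the TF-IDF vector.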

In [38]:
# VectorAssembler
clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')
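
`VectorAssembler` concatenates its input columns in order, so each `features` vector is the 13423-entry TF-IDF vector with `length` appended at index 13423, which is why the outputs show a size of 13424. A plain-Python sketch with a hypothetical `(size, {index: value})` sparse representation:

```python
def assemble(sparse_vec, extra_value):
    """Append one numeric value after a (size, {index: value}) sparse vector."""
    size, values = sparse_vec
    new_values = dict(values)
    if extra_value != 0:
        # The new slot sits right after the last index of the original vector
        new_values[size] = float(extra_value)
    return size + 1, new_values

print(assemble((3, {0: 0.58, 1: 0.69}), 111))  # (4, {0: 0.58, 1: 0.69, 3: 111.0})
```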

### Create a pipeline to preprocess in a single step
We'll chain all of these stages together with the `Pipeline` class, which lets us run the whole preprocessing process in a single step.
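
Conceptually, fitting a pipeline walks through the stages in order: an estimator (such as `CountVectorizer` or `IDF`) is fit on the data produced by the earlier stages and replaced by its fitted model, and each stage transforms the data for the next one. The result applies all fitted stages with a single `transform` call. A stripped-down plain-Python sketch of that control flow; `MiniPipeline`, `Lowercase`, and `VocabIndexer` are hypothetical classes, not part of PySpark:

```python
class MiniPipeline:
    """Hypothetical, simplified stand-in for pyspark.ml.Pipeline (not the real API)."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators learn from the data seen so far; transformers are used as-is
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class Lowercase:
    """A transformer: nothing to learn."""
    def transform(self, docs):
        return [d.lower() for d in docs]


class VocabIndexer:
    """An estimator: fit() learns a vocabulary and returns a fitted model."""
    def fit(self, docs):
        vocab = sorted({w for d in docs for w in d.split()})
        return VocabIndexerModel(vocab)


class VocabIndexerModel:
    def __init__(self, vocab):
        self.index = {w: i for i, w in enumerate(vocab)}

    def transform(self, docs):
        return [[self.index[w] for w in d.split() if w in self.index] for d in docs]


model = MiniPipeline([Lowercase(), VocabIndexer()]).fit(["Free entry", "ok FREE"])
print(model.transform(["free OK", "unknown"]))  # [[1, 2], []]
```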

In [45]:
# Create the pipeline
data_prep_pipeline = Pipeline(stages=[
    ham_spam_to_numeric,
    tokenizer,
    stop_remover,
    count_vec,
    idf,
    clean_up
])

### Preprocess
Fit the pipeline on the data, then use the fitted pipeline to transform it into a `clean_data` DataFrame.

In [46]:
# Fit every stage of the pipeline on our data (returns a fitted PipelineModel)
cleaner = data_prep_pipeline.fit(data)

In [49]:
# Apply the fitted stages to add the feature columns (no predictions yet)
clean_data = cleaner.transform(data)

In [53]:
# Show clean_data
clean_data.show()

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|          stop_token|           count_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,301,...|(13423,[0,24,301,...|(13424,[0,24,301,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,8

In [50]:
# Show columns
clean_data.columns



['class',
 'text',
 'length',
 'label',
 'token_text',
 'stop_token',
 'count_vec',
 'tf_idf',
 'features']

In [67]:
# Keep only the label and features columns
final_data = clean_data.select('label', 'features')

In [68]:
# Show data
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,301,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,140...|
|  0.0|(13424,[10,53,102...|
|  0.0|(13424,[127,185,4...|
|  1.0|(13424,[1,47,121,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,117...|
|  1.0|(13424,[8,16,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,95,221...|
|  0.0|(13424,[555,1797,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,44...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



## Train and evaluate the model

In [58]:
# Split into train and test
train_data, test_data = clean_data.randomSplit([0.7, 0.3])
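
`randomSplit` normalizes the weights and assigns rows to splits at random, so the 70/30 proportions are only approximate. Because no `seed` is passed here, the split (and therefore the final score) changes between runs; passing a seed, e.g. `randomSplit([0.7, 0.3], seed=42)`, makes it reproducible. A plain-Python sketch of the idea, not Spark's actual per-partition sampler; `random_split` is a hypothetical name:

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if r < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300; exact counts depend on the seed
```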

In [69]:
# Declare the ML method we'll use: a Naive Bayes classifier (by default it reads the 'features' and 'label' columns)
nb = NaiveBayes()
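
With no arguments, `NaiveBayes()` uses the multinomial model (`modelType='multinomial'`) with additive smoothing of 1.0. Multinomial Naive Bayes needs non-negative feature values, which TF-IDF weights and message lengths are. Below is a plain-Python sketch of multinomial Naive Bayes with Laplace smoothing on small dense toy vectors; the data and function names are hypothetical. The per-class log scores it compares are the same kind of values shown in the `rawPrediction` column of the results.

```python
import math
from collections import defaultdict

def train_multinomial_nb(X, y, smoothing=1.0):
    """Return (log_prior, log_likelihood) per class for non-negative feature vectors."""
    n_features = len(X[0])
    docs_per_class = defaultdict(int)
    feature_sums = defaultdict(lambda: [0.0] * n_features)
    for x, label in zip(X, y):
        docs_per_class[label] += 1
        for j, v in enumerate(x):
            feature_sums[label][j] += v
    classes = sorted(docs_per_class)
    n_docs = len(X)
    # Smoothed class priors
    log_prior = {
        c: math.log((docs_per_class[c] + smoothing) / (n_docs + len(classes) * smoothing))
        for c in classes
    }
    # Smoothed per-class feature probabilities
    log_likelihood = {}
    for c in classes:
        total = sum(feature_sums[c]) + n_features * smoothing
        log_likelihood[c] = [math.log((s + smoothing) / total) for s in feature_sums[c]]
    return log_prior, log_likelihood

def predict(x, log_prior, log_likelihood):
    """Pick the class with the highest log posterior (up to a constant)."""
    scores = {
        c: log_prior[c] + sum(v * w for v, w in zip(x, log_likelihood[c]))
        for c in log_prior
    }
    return max(scores, key=scores.get)

# Hypothetical toy features: [count of "free", count of "ok", count of "win"]
X = [[2, 0, 1], [3, 0, 2], [0, 2, 0], [0, 3, 0]]
y = [1.0, 1.0, 0.0, 0.0]  # 1.0 = spam, 0.0 = ham
prior, lik = train_multinomial_nb(X, y)
print(predict([1, 0, 1], prior, lik))  # 1.0
print(predict([0, 1, 0], prior, lik))  # 0.0
```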

In [70]:
# Fit the model with our train_data
spam_detector = nb.fit(train_data)

In [71]:
# Make predictions with our test_data
test_results = spam_detector.transform(test_data)

In [72]:
# Show results
test_results.show()

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|class|                text|length|label|          token_text|          stop_token|           count_vec|              tf_idf|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  ham| &lt;#&gt;  mins ...|    51|  0.0|[, &lt;#&gt;, , m...|[, &lt;#&gt;, , m...|(13423,[3,6,41,20...|(13423,[3,6,41,20...|(13424,[3,6,41,20...|[-296.04288785039...|[1.0,1.5372845189...|       0.0|
|  ham| &lt;DECIMAL&gt; ...|   132|  0.0|[, &lt;decimal&gt...|[, &lt;decimal&gt...|(13423,[3,84,115,...|(13423,[3,84,115,...|(13424,[3,84,115,...|[-880.43960646866...|[1.0,7.6178288312...|       0.0|


In [73]:
# Create evaluator object (with no metricName set, it reports weighted F1, not accuracy)
acc_eval = MulticlassClassificationEvaluator()

In [74]:
# Evaluate
acc = acc_eval.evaluate(test_results)
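
Accuracy and weighted F1 can differ noticeably on imbalanced data like this, where ham far outnumbers spam; `MulticlassClassificationEvaluator(metricName='accuracy')` returns accuracy instead of the default. A plain-Python sketch of both metrics on hypothetical `(label, prediction)` pairs, where weighted F1 averages per-class F1 scores weighted by each true class's share of rows:

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of (label, prediction) pairs that match."""
    return sum(label == pred for label, pred in pairs) / len(pairs)

def weighted_f1(pairs):
    """Per-class F1 averaged with weights equal to each true class's share of rows."""
    support = Counter(label for label, _ in pairs)
    total = len(pairs)
    score = 0.0
    for c, n in support.items():
        tp = sum(1 for label, pred in pairs if label == c and pred == c)
        predicted = sum(1 for _, pred in pairs if pred == c)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += (n / total) * f1
    return score

# Hypothetical results: 8 ham (0.0), 2 spam (1.0), and a model that predicts ham for everything
pairs = [(0.0, 0.0)] * 8 + [(1.0, 0.0)] * 2
print(accuracy(pairs))     # 0.8
print(weighted_f1(pairs))  # about 0.711
```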

In [75]:
# Print out the results
print(f'ACC of NB Model: {acc}')

ACC of NB Model: 0.9144625934257442


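Not bad! The model scores about 0.91 on the test set. Keep in mind that this number is the evaluator's default metric, weighted F1, rather than accuracy, even though the cell labels it "ACC".

The model predicts whether a text message is spam or not with good overall performance.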