# NLP Using PySpark - Spam Detection

## Objective:
- The objective of this project is to create a <b>spam filter using a NaiveBayes classifier</b>.
- The goal is to obtain an <b>F1 score > 0.9</b>.
- We'll use the SMS Spam Collection dataset from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

## To perform this task, follow these steps:

### Create a spark session and import the required libraries

In [None]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.getOrCreate()

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [2]:
df = spark.read.csv("SMSSpamCollection.csv", header=False, sep='\t')

### Print the schema

In [3]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and the second column to 'text'

In [4]:
from pyspark.sql.types import IntegerType  # return type for the length UDF below

df = df.withColumnRenamed("_c0","class").withColumnRenamed("_c1","text")
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
df.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



## Clean and Prepare the Data

### Create a new feature column that contains the length of the text column

In [6]:
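def calc_len(txt):
    # Number of characters in the message; pass nulls through as null instead of failing.
    return len(txt) if txt is not None else None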

In [7]:
lenUDF = fn.udf(calc_len, IntegerType())

In [8]:
df1 = df.withColumn('length', lenUDF('text'))

In [9]:
# Alternatively, use the built-in length() function, which avoids the overhead of a Python UDF
df2 = df.withColumn('length', fn.length(df.text))

### Show the new dataframe

In [10]:
df1.show(10)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
+-----+--------------------+------+
only showing top 10 rows



In [11]:
df2.show(10)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
+-----+--------------------+------+
only showing top 10 rows



### Get the average text length for each class (give the average length column an alias)

In [12]:
df1.groupBy('class').agg(fn.avg('length').alias('Avg. Length')).show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+
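Spam messages here are on average nearly twice as long as ham messages (about 139 vs. 71 characters), which suggests length is a useful feature; it is added to the feature vector later. As a plain-Python sketch of what `groupBy('class').agg(fn.avg('length'))` computes, the hypothetical helper `average_length_by_class` below averages text lengths per label over a few toy messages (not taken from the dataset):

```python
from collections import defaultdict


def average_length_by_class(rows):
    """Mean character length of the text for each label, like groupBy('class').agg(avg('length'))."""
    totals = defaultdict(int)  # sum of text lengths per label
    counts = defaultdict(int)  # number of messages per label
    for label, text in rows:
        totals[label] += len(text)
        counts[label] += 1
    return {label: totals[label] / counts[label] for label in totals}


# Toy (label, text) pairs, not taken from the dataset.
toy = [("ham", "ok see u"), ("ham", "hello"), ("spam", "WIN a FREE prize now")]
print(average_length_by_class(toy))  # {'ham': 6.5, 'spam': 20.0}
```

Unlike Spark's `avg`, this sketch does not skip null texts.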



## Feature Transformations

### In this part, you transform the raw text into a TF-IDF representation:
- For more information about TF-IDF, check the following link:
https://en.wikipedia.org/wiki/Tf%E2%80%93idf
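Spark MLlib's `IDF` uses a smoothed variant of the formula on that page (as given in the Spark MLlib feature-extraction guide). With $|D|$ the number of documents, $\mathrm{DF}(t, D)$ the number of documents containing term $t$, $\mathrm{TF}(t, d)$ the raw count of $t$ in document $d$ (here produced by `CountVectorizer`), and the natural logarithm:

$$
\mathrm{IDF}(t, D) = \log \frac{|D| + 1}{\mathrm{DF}(t, D) + 1}, \qquad
\mathrm{TFIDF}(t, d, D) = \mathrm{TF}(t, d) \cdot \mathrm{IDF}(t, D)
$$

A term that occurs in every document gets $\log 1 = 0$, so very common words contribute nothing to the features.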

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> that splits the text column into words.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the <b>Tokenizer</b> output.
4. Create a <b>CountVectorizer</b> on the tokens that remain after removing the <b>stop words</b>.
5. Compute the <b>TF-IDF</b> features by applying <b>IDF</b> to the <b>CountVectorizer</b> output.

In [13]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, VectorAssembler

In [14]:
# text tokenization
tokenizer = Tokenizer()

tokenizer.setInputCol('text')
tokenizer.setOutputCol("Tokenized_words")

Tokenizer_8bd12eefc5c4
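Spark's `Tokenizer` lowercases the input and splits it on whitespace. To see what lands in `Tokenized_words`, here is a pure-Python approximation (the name `spark_like_tokenize` is hypothetical). It assumes, as Spark's source does, that the split uses Java's `String.split` on the regex `\s`: every single whitespace character is a separator, so a double space yields an empty token, and trailing empty tokens are dropped. Punctuation stays attached to words.

```python
import re


def spark_like_tokenize(text):
    """Approximate Spark's Tokenizer: lowercase, then split on each whitespace character."""
    tokens = re.split(r"\s", text.lower())
    # Assumption: like Java's String.split (used by Spark), discard trailing empty strings.
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


print(spark_like_tokenize("WIN a FREE  prize now!"))
# ['win', 'a', 'free', '', 'prize', 'now!']
```

If empty tokens or attached punctuation are a problem, `RegexTokenizer` with a pattern such as `\W+` is one alternative; its default `minTokenLength=1` also filters out empty tokens.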

In [15]:
# Removing stop words
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display default list
print(stopwords[:10])

remover.setInputCol("Tokenized_words")
remover.setOutputCol("Tokenized_words_no_stop_words")

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your']


StopWordsRemover_8f2f49756de5
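`StopWordsRemover` drops every token found in its stop-word list, and by default (`caseSensitive=False`) the match ignores case. A minimal sketch of that filter, using only the ten default entries printed above as a stand-in for the full list (`remove_stop_words` is a hypothetical name):

```python
# The first ten entries of Spark's default English list, as printed above.
STOP_WORDS = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your']


def remove_stop_words(tokens, stop_words=STOP_WORDS, case_sensitive=False):
    """Drop tokens found in stop_words; matching ignores case unless case_sensitive=True."""
    if case_sensitive:
        stop = set(stop_words)
        return [t for t in tokens if t not in stop]
    stop = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in stop]


print(remove_stop_words(['You', 'won', 'your', '', 'prize']))
# ['won', '', 'prize']
```

The empty string is not a stop word, so any empty tokens produced by the `Tokenizer` survive this step and become a vocabulary term of their own.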

In [16]:
# Count vectorization
cv = CountVectorizer()
cv.setInputCol("Tokenized_words_no_stop_words")
cv.setOutputCol("vectors")

CountVectorizer_482fa74caf1d
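`CountVectorizer` is an estimator. Fitting it builds a vocabulary ordered by term frequency across the corpus, keeping at most `vocabSize` terms (default 2^18) that appear in at least `minDF` documents (default 1). The fitted model then turns each token list into a sparse vector of counts. The sketch below mimics that in plain Python. The helper names are hypothetical, the `(size, {index: count})` tuple stands in for a Spark `SparseVector`, and breaking frequency ties alphabetically is an assumption made for deterministic output.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size=1 << 18, min_df=1):
    """Build a vocabulary from token lists, most frequent terms first."""
    term_freq = Counter()  # total occurrences of each term in the corpus
    doc_freq = Counter()   # number of documents containing each term
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    # Assumption: ties are broken alphabetically so the output is deterministic.
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]


def count_vector(tokens, vocab):
    """Map tokens to (size, {index: count}), ignoring out-of-vocabulary terms."""
    index = {t: i for i, t in enumerate(vocab)}
    counts = Counter(index[t] for t in tokens if t in index)
    return len(vocab), dict(sorted(counts.items()))


docs = [["win", "prize", "win"], ["call", "prize"], ["win"]]
vocab = fit_vocabulary(docs)
print(vocab)                                        # ['win', 'prize', 'call']
print(count_vector(["call", "win", "zzz"], vocab))  # (3, {0: 1, 2: 1})
```

Because the vocabulary is learned during `fit`, words that only appear in the test messages are ignored at prediction time.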

In [17]:
# calculate scores
idf = IDF()
idf.setInputCol('vectors')
idf.setOutputCol('tfidf_features')

IDF_5ab72570b9c8
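`IDF` is also an estimator: fitting it counts, for each vocabulary index, how many documents contain that term, and the fitted model multiplies each raw count by the smoothed weight $\log\frac{|D|+1}{\mathrm{DF}+1}$. Spark additionally sets the weight to 0 for terms seen in fewer than `minDocFreq` documents (default 0). A plain-Python sketch with hypothetical helper names, using dicts `{index: count}` in place of Spark vectors (the result is stored as `toy_idf` so it does not overwrite the `idf` stage defined above):

```python
import math


def fit_idf(count_vectors, num_features, min_doc_freq=0):
    """Compute Spark-style smoothed IDF weights from sparse count vectors ({index: count})."""
    m = len(count_vectors)
    doc_freq = [0] * num_features
    for vec in count_vectors:
        for j, c in vec.items():
            if c > 0:
                doc_freq[j] += 1
    # Terms seen in fewer than min_doc_freq documents get an IDF of 0.
    return [0.0 if d < min_doc_freq else math.log((m + 1) / (d + 1)) for d in doc_freq]


def tfidf(vec, idf_weights):
    """Scale each raw count by its term's IDF weight."""
    return {j: c * idf_weights[j] for j, c in vec.items()}


vectors = [{0: 2, 1: 1}, {1: 1, 2: 1}, {0: 1, 1: 1}]
toy_idf = fit_idf(vectors, num_features=3)
print(toy_idf)                       # index 1 occurs in all 3 documents, so its weight is 0.0
print(tfidf(vectors[0], toy_idf))
```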

- Convert the <b>class column</b> to a numeric index using <b>StringIndexer</b>.
- Create a feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [18]:
stringIndexer = StringIndexer(inputCols=['class'],
                              outputCols=['class_index'],
                              handleInvalid='skip')
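With the default `stringOrderType='frequencyDesc'`, `StringIndexer` gives index 0.0 to the most frequent label. Ham is the majority class in the SMS Spam Collection, so ham is expected to map to 0.0 and spam to 1.0. A plain-Python sketch (hypothetical helper names); the alphabetical tie-break follows what the Spark 3.x documentation describes for equal frequencies, and dropping unseen labels mirrors `handleInvalid='skip'`, which drops such rows:

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each label to a float index, most frequent label first (stringOrderType='frequencyDesc')."""
    counts = Counter(labels)
    # Ties in frequency are broken alphabetically.
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


def index_labels(labels, mapping):
    """Apply the mapping; like handleInvalid='skip', drop rows whose label was not seen during fitting."""
    return [mapping[label] for label in labels if label in mapping]


mapping = fit_string_indexer(["ham", "spam", "ham", "ham"])
print(mapping)                                            # {'ham': 0.0, 'spam': 1.0}
print(index_labels(["spam", "unknown", "ham"], mapping))  # [1.0, 0.0]
```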

In [19]:
vecAssembler = VectorAssembler(inputCols=['tfidf_features', 'length'], outputCol='features')
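`VectorAssembler` concatenates its input columns in the order listed, so each `features` vector holds all TF-IDF entries followed by the message length as the last entry. A plain-Python sketch of that concatenation, using a hypothetical `assemble` helper and a `(size, {index: value})` tuple as a stand-in for a Spark sparse vector:

```python
def assemble(vec_size, vec_values, *scalars):
    """Concatenate a sparse vector (size, {index: value}) with trailing numeric columns.

    Mirrors VectorAssembler(inputCols=['tfidf_features', 'length']): the scalar
    columns are appended after the vector, in the order given.
    """
    values = dict(vec_values)
    for offset, x in enumerate(scalars):
        if x != 0:  # sparse representation: store only non-zero entries
            values[vec_size + offset] = float(x)
    return vec_size + len(scalars), values


print(assemble(3, {0: 0.58}, 29))  # (4, {0: 0.58, 3: 29.0})
```

Note that `length` is on a different scale from the TF-IDF weights, which matters for a multinomial model that treats every feature value like a count.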

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [20]:
from pyspark.ml.classification import NaiveBayes

In [21]:
model = NaiveBayes(featuresCol='features',
                   labelCol='class_index',
                   predictionCol='prediction')
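With default parameters, Spark's `NaiveBayes` uses `modelType='multinomial'` with additive smoothing `smoothing=1.0`, and the multinomial model requires non-negative feature values (TF-IDF weights and lengths both qualify). The sketch below is the textbook multinomial Naive Bayes with Laplace smoothing on dense features, not Spark's code, so details may differ. Function names and toy data are hypothetical.

```python
import math


def fit_multinomial_nb(X, y, num_classes, smoothing=1.0):
    """Fit multinomial Naive Bayes with additive (Laplace) smoothing.

    X: list of dense, non-negative feature rows; y: integer class labels 0..num_classes-1.
    Returns log class priors and per-class log feature probabilities.
    """
    n_features = len(X[0])
    class_count = [0] * num_classes
    feature_sum = [[0.0] * n_features for _ in range(num_classes)]
    for row, label in zip(X, y):
        class_count[label] += 1
        for j, v in enumerate(row):
            feature_sum[label][j] += v
    n = len(X)
    log_prior = [math.log((class_count[c] + smoothing) / (n + num_classes * smoothing))
                 for c in range(num_classes)]
    log_theta = []
    for c in range(num_classes):
        denom = sum(feature_sum[c]) + n_features * smoothing
        log_theta.append([math.log((s + smoothing) / denom) for s in feature_sum[c]])
    return log_prior, log_theta


def predict(row, log_prior, log_theta):
    """Return the class with the highest log posterior (up to a constant)."""
    scores = [lp + sum(v * lt for v, lt in zip(row, theta))
              for lp, theta in zip(log_prior, log_theta)]
    return max(range(len(scores)), key=scores.__getitem__)


# Toy data: feature 0 is frequent in class 0, feature 1 in class 1.
X = [[3, 0], [2, 1], [0, 3], [1, 2]]
y = [0, 0, 1, 1]
log_prior, log_theta = fit_multinomial_nb(X, y, num_classes=2)
print(predict([4, 0], log_prior, log_theta), predict([0, 4], log_prior, log_theta))  # 0 1
```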

## Pipeline
### Create a pipeline that contains all the steps, from the Tokenizer to the NaiveBayes classifier.

In [22]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, stringIndexer, vecAssembler, model])
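`Pipeline.fit` runs the stages in order. Transformers (`Tokenizer`, `StopWordsRemover`, `VectorAssembler`) are used as-is. Estimators (`CountVectorizer`, `IDF`, `StringIndexer`, `NaiveBayes`) are fit on the output of the stages before them. The result is a `PipelineModel` made of fitted transformers, which is why the vocabulary and IDF weights are learned from the training split only. The toy sketch below imitates that control flow with hypothetical stand-in classes, not the real `pyspark.ml` ones.

```python
class Transformer:
    """Hypothetical stand-in for pyspark.ml.Transformer: maps rows to new rows."""

    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Hypothetical stand-in for pyspark.ml.Estimator: fit() returns a Transformer."""

    def fit(self, rows):
        raise NotImplementedError


def fit_pipeline(stages, rows):
    """Fit stages in order; return the list of fitted transformers (the 'PipelineModel')."""
    last_estimator = max((i for i, s in enumerate(stages) if isinstance(s, Estimator)), default=-1)
    fitted = []
    for i, stage in enumerate(stages):
        fitted_stage = stage.fit(rows) if isinstance(stage, Estimator) else stage
        fitted.append(fitted_stage)
        if i < last_estimator:  # later estimators need this stage's output to fit on
            rows = fitted_stage.transform(rows)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, as PipelineModel.transform does."""
    for fitted_stage in fitted:
        rows = fitted_stage.transform(rows)
    return rows


class Split(Transformer):
    def transform(self, rows):
        return [{**r, "tokens": r["text"].split()} for r in rows]


class VocabModel(Transformer):
    def __init__(self, vocab):
        self.index = {t: i for i, t in enumerate(vocab)}

    def transform(self, rows):
        return [{**r, "ids": [self.index[t] for t in r["tokens"] if t in self.index]} for r in rows]


class Vocab(Estimator):
    def fit(self, rows):
        # The vocabulary is learned from the training rows only.
        return VocabModel(sorted({t for r in rows for t in r["tokens"]}))


toy_model = fit_pipeline([Split(), Vocab()], [{"text": "b a"}, {"text": "c b"}])
print(transform_pipeline(toy_model, [{"text": "c a z"}])[0]["ids"])  # [2, 0]
```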

### Split the data into training and test sets with ratios 0.7 and 0.3, respectively.

In [23]:
train_df, test_df = df1.randomSplit([.7,.3],seed=42)
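`randomSplit` normalizes the weights and assigns each row to a split at random, so the split sizes are only approximately 70/30. With a fixed seed they are reproducible, although they may still change if the DataFrame's partitioning changes. A plain-Python sketch of per-row weighted assignment (the `random_split` helper is hypothetical and does not reproduce Spark's partition-wise sampler):

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # normalize, as randomSplit does for weights not summing to 1
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # The last split also catches any floating-point shortfall in the final bound.
        k = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[k].append(row)
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300; exact sizes depend on the seed
```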

### Fit the pipeline to the training data

In [24]:
# fit() returns a fitted PipelineModel; give it its own name rather than overwriting the Pipeline
pipeline_model = pipeline.fit(train_df)

22/07/26 13:41:29 WARN DAGScheduler: Broadcasting large task binary with size 1044.6 KiB
22/07/26 13:41:30 WARN DAGScheduler: Broadcasting large task binary with size 1026.4 KiB

### Perform predictions on the test DataFrame

In [25]:
pred = pipeline_model.transform(test_df)

### Print the schema of the prediction dataframe

In [26]:
pred.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- Tokenized_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Tokenized_words_no_stop_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)
 |-- tfidf_features: vector (nullable = true)
 |-- class_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>F1 score</b>.

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [28]:
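# metricName='f1' is the default: the F1 of each class weighted by its share of the labels
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='class_index', metricName='f1')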

In [29]:
print(f'F1-score = {evaluator.evaluate(pred)}')

22/07/26 13:41:31 WARN DAGScheduler: Broadcasting large task binary with size 1229.4 KiB


F1-score = 0.9727502290227267
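The score above clears the 0.9 target. The evaluator's default `metricName='f1'` is a weighted F1: each class's F1, $2\,TP / (2\,TP + FP + FN)$, is weighted by that class's share of the true labels, so the majority ham class counts most. A plain-Python sketch of that computation on toy labels (the `weighted_f1` helper is hypothetical):

```python
from collections import Counter


def weighted_f1(labels, predictions):
    """F1 per class, weighted by each class's share of the true labels."""
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for c, n_c in support.items():
        tp = sum(1 for t, p in zip(labels, predictions) if t == c and p == c)
        fp = sum(1 for t, p in zip(labels, predictions) if t != c and p == c)
        false_neg = n_c - tp  # true members of c that were predicted as something else
        denom = 2 * tp + fp + false_neg
        f1 = 2 * tp / denom if denom else 0.0
        score += (n_c / total) * f1
    return score


# Toy labels: 0.0 = ham, 1.0 = spam (matching the StringIndexer output).
y_true = [0.0, 0.0, 0.0, 1.0]
y_pred = [0.0, 0.0, 1.0, 1.0]
print(weighted_f1(y_true, y_pred))
```

To see the F1 of the spam class alone, Spark 3.x's evaluator also accepts `metricName='fMeasureByLabel'` together with `metricLabel=1.0`.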
