# **Spam Filter using Naive Bayes**

In [1]:
#installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5b3638ea1ea1b3a221771b93e87bf31cc9271de1ab928bb75f61dc7cb2258edd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
#creating a spark session
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *
from pyspark.sql.types import *

## **Reading Data**

In [22]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
#reading the readme file
readme=spark.read.option("sep", "\t").option("header", "False").csv("/content/drive/MyDrive/readme")
#spark.read.text('/content/readme',lineSep='\n')

#showing it
readme.show(n=75,truncate=500)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                

In [6]:
#Creating the schema
schema=StructType([
    StructField('Label',StringType(),True),
    StructField('Text',StringType(),True)
    ])

In [7]:
#reading the data
df_messages = spark.read.option("sep", "\t").option("header", "False").schema(schema).csv("/content/drive/MyDrive/SMSSpamCollection")
df_messages.show()

+-----+--------------------+
|Label|                Text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



### Print the schema

In [8]:
#printing schema
df_messages.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Text: string (nullable = true)



In [9]:
#Showing data
df_messages.show(n=10,truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Label|Text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## **Data Preparation**

In [10]:
#creating length column to find the length of the message ==> it includes spaces as well as characteres
df_messages_length=df_messages.withColumn('Length',length(df_messages['Text']))

#showing it
df_messages_length.show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Label|Text                                                                                                                                                                                                |Length|
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                                                     |111   |
|ham  |Ok lar... Joking wif u oni...                                                                                                                    

In [11]:
#Average text length for each class
df_messages_length.groupBy('Label').agg(avg('Length').alias('Avg_length')).show()

+-----+-----------------+
|Label|       Avg_length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+




*   Notably the average text length for spam messages is almost twice that length for ham messages

In [12]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer,VectorAssembler

In [13]:
#An object to create tokens
tokenizer=Tokenizer(inputCol='Text',outputCol='SplittedWords')

#An object to remove stop words
remover=StopWordsRemover(inputCol='SplittedWords',outputCol='filtered')

#An object to apply count vectorizer
cv = CountVectorizer(inputCol="filtered", outputCol="countvector")

#tf-idf encoding
idf = IDF(inputCol="countvector", outputCol="encoded")

#converting label column to 0s and 1s
strind=StringIndexer(inputCol='Label',outputCol='Class_encoded')


In [15]:
#creating a vector assembler of length and TF-IDF encodings to be used for modelling
vecAssem=VectorAssembler(inputCols=['Length','encoded'],outputCol='features')

## **Model and Pipline**


In [16]:
from pyspark.ml.classification import NaiveBayes

In [17]:
nb=NaiveBayes(featuresCol='features',labelCol='Class_encoded')

In [18]:
#in order to create the pipeline we need to define the stages
stages=[tokenizer,remover,cv,idf,vecAssem,strind,nb]

from pyspark.ml import Pipeline

pl=Pipeline(stages=stages)

In [25]:
#After splitting data to train and test , we save it to a file to make sure it wouldn't change due to using different partitions in different times
train_messages,test_messages=df_messages_length.randomSplit([0.7,0.3],seed=42)
train_path='/content/drive/MyDrive/spam data/Train'
test_path='/content/drive/MyDrive/spam data/Test'

#Run the following two lines only for the first time we train the model==>

# train_messages.write.mode("overwrite").parquet(train_path)
# test_messages.write.mode("overwrite").parquet(test_path)

traindf=spark.read.parquet(train_path,header=True,inferSchema=True)
testdf=spark.read.parquet(test_path,header=True,inferSchema=True)

In [26]:
#fittng the pipline using training data
pl_model=pl.fit(df_messages_length)

#transform to predict both train and test data
pred_train=pl_model.transform(traindf)

pred_test=pl_model.transform(testdf)

In [27]:
#Showing predictions schema
pred_test.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- Length: integer (nullable = true)
 |-- SplittedWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- countvector: vector (nullable = true)
 |-- encoded: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- Class_encoded: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [29]:
#showing predictions
pred_test.show()

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+
|Label|                Text|Length|       SplittedWords|            filtered|         countvector|             encoded|            features|Class_encoded|       rawPrediction|         probability|prediction|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+
|  ham| &lt;DECIMAL&gt; ...|   132|[, &lt;decimal&gt...|[, &lt;decimal&gt...|(13423,[3,84,115,...|(13423,[3,84,115,...|(13424,[0,4,85,11...|          0.0|[-832.97860312094...|[1.0,1.4423445091...|       0.0|
|  ham| said kiss, kiss,...|   133|[, said, kiss,, k...|[, said, kiss,, k...|(13423,[3,92,215,...|(13423,[3,92,215,...|(13424,[0,4,93,21...|          0.0|[-983.80575205

In [34]:
#calculating f1-score
f1_score = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='Class_encoded',metricName='f1')

#for train
print(f"f1_score for train data = {f1_score.evaluate(pred_train)}")

#for test
print(f"f1_score for test data = {f1_score.evaluate(pred_test)}")


f1_score for train data = 0.9964943012425527
f1_score for test data = 0.9943889843628761
