In [1]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [36]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, Word2Vec
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
from pyspark.ml.classification import NaiveBayes

In [3]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext

# Load relevant objects
sc = SparkContext('local')
log_txt = sc.textFile("spam.txt")

In [4]:
spark = SparkSession(sc)

In [5]:
header = 'class,text'


In [6]:
log_txt = log_txt.filter(lambda line: line != header)

temp_var = log_txt.map(lambda k: k.split("\t"))

In [7]:
log_df=temp_var.toDF(header.split(","))
log_df.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [8]:
log_df.columns

['class', 'text']

In [9]:
log_df.dropna()

DataFrame[class: string, text: string]

In [10]:
log_df.count()

5574

In [11]:
log_df = log_df.withColumn(log_df.columns[0], when(col(log_df.columns[0]) == "spam", 1).otherwise(0))
log_df.show(5)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|    0|Go until jurong p...|
|    0|Ok lar... Joking ...|
|    1|Free entry in 2 a...|
|    0|U dun say so earl...|
|    0|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [12]:
log_df.filter(log_df[log_df.columns[0]] == 1).select(log_df.columns[0]).count()

747

In [13]:
747/log_df.count() *100

13.40150699677072

As we see only the spam and ham are present in the dataset. but the dataset is not balanced. so we need to balance the dataset.
by using imbalanced-learn library we can do that later but for now we will use the following code.

In [14]:
tokenizer = Tokenizer(inputCol=log_df.columns[1], outputCol="words")
wordsData = tokenizer.transform(log_df)

In [15]:
wordsData.show(5)

+-----+--------------------+--------------------+
|class|                text|               words|
+-----+--------------------+--------------------+
|    0|Go until jurong p...|[go, until, juron...|
|    0|Ok lar... Joking ...|[ok, lar..., joki...|
|    1|Free entry in 2 a...|[free, entry, in,...|
|    0|U dun say so earl...|[u, dun, say, so,...|
|    0|Nah I don't think...|[nah, i, don't, t...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [19]:
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(wordsData)

result = model.transform(wordsData)
# for row in tqdm(result.collect()):
#     text, vector = row
#     print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))


In [24]:
def word2Vector(text):
    return model.getVectors(text)

In [20]:
result.columns

['class', 'text', 'words', 'result']

In [22]:
result.select('result').show(5)

+--------------------+
|              result|
+--------------------+
|[2.35824286937713...|
|[-0.0904866193110...|
|[-0.1330838936846...|
|[-0.1861538268964...|
|[-0.2342496712047...|
+--------------------+
only showing top 5 rows



In [25]:
train_df,test_df = result.select(log_df.columns[0], 'result').randomSplit([0.8,0.2], seed=140)

In [38]:
NB = NaiveBayes(featuresCol = 'result', labelCol = log_df.columns[0],  modelType="gaussian")
NB_model = NB.fit(train_df)
predictions = NB_model.transform(test_df)

In [39]:
predictions.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|class|              result|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[-0.6811982262879...|[-8.9007451315644...|[1.0,2.4436754384...|       0.0|
|    0|[-0.6442804962396...|[-7.3109603886818...|[0.99999999999999...|       0.0|
|    0|[-0.5820461862853...|[-10.463172004258...|[1.0,1.5437837586...|       0.0|
|    0|[-0.5557772426732...|[-0.4076377662523...|[0.99999999990504...|       0.0|
|    0|[-0.5451446071267...|[1.25950925190698...|[0.99999999700923...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [43]:
RFpredictB = predictions[predictions.prediction==0].toPandas()
RFpredictM = predictions[predictions.prediction==1].toPandas()

result = pd.concat([RFpredictM,RFpredictB])
true_labels=(test_df.select("class")).toPandas()
predicted_labels=result["prediction"]

print("-- naive bayes --")
print("------------------------------------------------------------------------")
print("Classification Report\n",classification_report(true_labels, predicted_labels))
print("------------------------------------------------------------------------")
print("Confusion matrix\n",confusion_matrix(true_labels,predicted_labels),"\n\n")
nbc=confusion_matrix(true_labels,predicted_labels)

-- naive bayes --
------------------------------------------------------------------------
Classification Report
               precision    recall  f1-score   support

           0       0.87      0.88      0.87      1028
           1       0.00      0.00      0.00       141

    accuracy                           0.78      1169
   macro avg       0.43      0.44      0.44      1169
weighted avg       0.76      0.78      0.77      1169

------------------------------------------------------------------------
Confusion matrix
 [[908 120]
 [141   0]] 




In the confusion matrix, the first row is the true negative, the second row is the true positive, the third row is the false positive and the fourth row is the false negative.
we can see that spam is not classified correctly.