In [10]:
import os
import re
import csv

import numpy as np
import pandas as pd

In [11]:
#import dataset
#give the proper csv file path
dataset = pd.read_csv("1spam.csv", encoding="latin-1")
dataset

Unnamed: 0,text,label
0,"Go until jurong point, crazy.. Available only ...",ham
1,Ok lar... Joking wif u oni...,ham
2,Free entry in 2 a wkly comp to win FA Cup fina...,spam
3,U dun say so early hor... U c already then say...,ham
4,"Nah I don't think he goes to usf, he lives aro...",ham
5,FreeMsg Hey there darling it's been 3 week's n...,spam
6,Even my brother is not like to speak with me. ...,ham
7,As per your request 'Melle Melle (Oru Minnamin...,ham
8,WINNER!! As a valued network customer you have...,spam
9,Had your mobile 11 months or more? U R entitle...,spam


In [12]:
# instance per label
dataset.groupby('label').label.count()

label
ham     4825
spam     747
Name: label, dtype: int64

In [13]:
# fix random seed for reproducibility
np.random.seed(7)

from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col 
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.classification import *

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

In [14]:
# Pre-Processng - cleaning and converting the data them into the right format

# Remove Non-Alphanumeric Characters
for row in range(len(dataset)):
    line = dataset.iloc[row,0]
    dataset.iloc[row,0] = re.sub("[^a-zA-Z0-9]", " ", line)
    
# Convert to Lowercase & SQL Dataframe
dataset_lower = dataset.apply(lambda x: x.astype(str).str.lower())

# Convert panda dataframe to sql dataframe
sql_dataset = sqlCtx.createDataFrame(dataset_lower)

# Tokenize Text into Words
Tokenizer = RegexTokenizer(inputCol="text",outputCol="words", pattern="\\W")

# countTokens = udf(lambda words: len(words), IntegerType())
regexTokenized = Tokenizer.transform(sql_dataset).select("words","label")

# Remove Stopwords
remove = StopWordsRemover(inputCol="words", outputCol="cleaned")
removed = remove.transform(regexTokenized).select("cleaned","label")

#  Stemming of words
# Conver sql dataframe back to pandas dataframe 
from nltk.stem import SnowballStemmer

stemming_removed = removed.toPandas()
stemmer = SnowballStemmer('english')
stemming_removed["words"] = stemming_removed["cleaned"].apply(lambda x: [stemmer.stem(y) for y in x])

# Convert back to sql dataframe
stemmed_dataset = sqlCtx.createDataFrame(stemming_removed).select(["label","words"])

# map the data to numeric values
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

indexer = StringIndexer(inputCol = "label", outputCol = "bin_label")
indexed = indexer.fit(stemmed_dataset).transform(stemmed_dataset)

# Split Data to Training and Test Set
# Split data into training(50%) and test(50%)
train, test = indexed.randomSplit([0.5, 0.5], seed=50)
train.cache()
test.cache()

DataFrame[label: string, words: array<string>, bin_label: double]

In [15]:
# stemmed dataset
stemmed_dataset.show(5)
indexed.groupBy("bin_label","label").count().orderBy('bin_label').show()

# datasets details
print("Number of Training Samples: "+ str(train.count()))
print("Number of Test Samples: "+ str(test.count()))



+-----+--------------------+
|label|               words|
+-----+--------------------+
|  ham|[go, jurong, poin...|
|  ham|[ok, lar, joke, w...|
| spam|[free, entri, 2, ...|
|  ham|[u, dun, say, ear...|
|  ham|[nah, think, goe,...|
+-----+--------------------+
only showing top 5 rows

+---------+-----+-----+
|bin_label|label|count|
+---------+-----+-----+
|      0.0|  ham| 4825|
|      1.0| spam|  747|
+---------+-----+-----+

Number of Training Samples: 2759
Number of Test Samples: 2813


In [16]:
# apply Term Frequency, Inverse Document Frequecy Method to determine importance of a word 

tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures = 10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=2)

train_tf = tf.transform(train)
test_tf = tf.transform(test)

idfModel = idf.fit(train_tf)
tfidf_train = idfModel.transform(train_tf).select("bin_label", "features")

In [17]:
from pyspark.ml.linalg import DenseVector

rescaled_train = tfidf_train.rdd.map(lambda lp: (lp.bin_label, DenseVector(lp.features.toArray()))) \
                                     .toDF(['label','features'])
rescaled_test = test_tf.rdd.map(lambda lp: (lp.bin_label, DenseVector(lp.rawFeatures.toArray()))) \
                                .toDF(['label','features'])

In [18]:
from pyspark.ml.classification import NaiveBayes
from pyspark.mllib.evaluation import MulticlassMetrics

# naive bayes classifier is used for the text classifiction
nb = NaiveBayes(smoothing=0.01, modelType="multinomial")
model = nb.fit(rescaled_train)

test_prob = model.transform(rescaled_test)
test_pred = test_prob.select('prediction','label').rdd.map(lambda lp: (lp.prediction,lp.label))
test_metrics = MulticlassMetrics(test_pred)

train_prob = model.transform(rescaled_train)
train_pred = train_prob.select('prediction','label').rdd.map(lambda lp: (lp.prediction,lp.label))
train_metrics = MulticlassMetrics(train_pred)
print("Training Accuracy: " + str(train_metrics.accuracy*100)+ " %" )

print("Testing Accuracy: " + str(test_metrics.accuracy*100)+ " %" )


Training Accuracy: 98.80391446176151 %
Testing Accuracy: 97.0494134376111 %
