In [1]:
#import findspark
#findspark.init('/home/nahle/spark-2.1.0-bin-hadoop2.7')
#import pyspark
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook under the application name "NN";
# getOrCreate() hands back an already-active session when one exists.
spark = (
    SparkSession.builder
    .appName("NN")
    .getOrCreate()
)

## Getting the data

In [3]:
# Load the SMS Spam Collection: one record per line, "<label>\t<message>".
# The SparkContext is taken from the `spark` session built in the first cell
# instead of relying on an implicit `sc` global that only some environments define.
raw_lines = spark.sparkContext.textFile('/FileStore/tables/SMSSpamCollection')

# Split on the first tab only, so a tab inside the message text is preserved,
# and drop lines without any tab (indexing field 1 would raise IndexError).
mydata = (
    raw_lines
    .map(lambda line: line.split('\t', 1))
    .filter(lambda fields: len(fields) == 2)
    .map(lambda fields: (fields[0], fields[1]))
)

# Two string columns: the ham/spam label and the raw message text.
data = mydata.toDF(['label', 'message'])

### We transform the ham/spam labels into a 0/1 class index with StringIndexer

In [5]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import CountVectorizer
# Encode the string "label" column as a numeric index stored in "categoryIndex".
stringIndexer = (
    StringIndexer()
    .setInputCol("label")
    .setOutputCol("categoryIndex")
)
# Learn the label -> index mapping from the data, then append the index column.
# NOTE: `data` is rebound here, so it gains the "categoryIndex" column.
string_indexor = stringIndexer.fit(data)
data = string_indexor.transform(data)

In [6]:
# `udf` is imported here because the notebook's only other import of
# pyspark.sql.functions happens in a later cell; without this line a fresh
# "Run All" fails with NameError on `udf`.
from pyspark.sql.functions import udf

# Split each message into tokens, using runs of non-word characters as separators.
regex_tokenizer = RegexTokenizer(inputCol='message', outputCol='words', pattern='\\W')
regex_df = regex_tokenizer.transform(data)

# Minimal (label, words) view, with the numeric index exposed as "label".
final_data = regex_df.select("categoryIndex", "words").withColumnRenamed("categoryIndex", "label")

# UDF returning the number of tokens in an array column; reused by later cells.
count_words = udf(lambda words: len(words), IntegerType())
regex_tokenized_counts = regex_df.withColumn('freq', count_words('words'))

regex_tokenized_counts.show()
final_data.show(45, truncate=False)

### We filter out common stop words with StopWordsRemover

In [8]:
from pyspark.ml.feature import StopWordsRemover
# Remove stop words from "words", writing the remaining tokens to "tokens".
remover = StopWordsRemover(outputCol='tokens', inputCol='words')
tokens_filtered = remover.transform(regex_tokenized_counts)

# Record how many tokens are left after filtering.
cleanDF = tokens_filtered.withColumn(
    'count_tokens',
    count_words('tokens'),
)

# Bag-of-words features: fit the vocabulary on the filtered tokens and
# vectorise every message into the "features" column.
count_vec = CountVectorizer(
    inputCol='tokens',
    outputCol='features',
    minDF=1,
)
model = count_vec.fit(cleanDF)
data = model.transform(cleanDF)

# Displays the pre-vectorisation frame (no "features" column).
cleanDF.show(truncate=True)

### Rename columns

In [10]:
from pyspark.sql.functions import *
# withColumnRenamed returns a new DataFrame. The result is only displayed as the
# cell output and is not assigned, so `data` keeps its original column names
# (including "categoryIndex") after this cell.
(
    data
    .withColumnRenamed("label", "type")
    .withColumnRenamed("categoryIndex", "label")
)

# Logistic regression

In [12]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import RegressionMetrics

# Logistic regression estimator reading the "features" vector and the "label" column.
lr = LogisticRegression(
    labelCol='label',
    featuresCol='features',
)

# Modelling frame: keep only the feature vector and expose the numeric
# "categoryIndex" under the name "label".
modelData = data.selectExpr(
    'features',
    'categoryIndex as label',
)

In [13]:
# 70/30 train/test split. A fixed seed keeps the split, and therefore every
# metric computed from it, reproducible across runs.
trainData, testData = modelData.randomSplit([0.7, 0.3], seed=42)

lrModel = lr.fit(trainData)  # fit the model
trainResults = lrModel.evaluate(trainData)  # evaluation on the training set

summary = lrModel.summary  # training summary of lrModel
train_res = summary.predictions.select("label", "prediction")
train_res.show(n=5)  # first predictions

# The mllib metric classes read each record positionally:
# BinaryClassificationMetrics expects (score, label) and RegressionMetrics
# expects (prediction, observation), so the prediction must come first.
train_rdd = train_res.select("prediction", "label").rdd
metrics = BinaryClassificationMetrics(train_rdd)
metrics2 = RegressionMetrics(train_rdd)
print("R2 score ={}:" .format(metrics2.r2))  # R2 score


An R2 score of 100% on the training set indicates that the model explains all the variability of the response data around its mean.

In [15]:
# Score the held-out test split with the fitted logistic regression model.
testResults = lrModel.evaluate(testData)

# Keep only the true label and the predicted class for side-by-side comparison.
test_res = testResults.predictions.select(["label", "prediction"])
test_res.show(n=10)


In [16]:
# The mllib metric classes read each record positionally:
# BinaryClassificationMetrics expects (score, label) and RegressionMetrics
# expects (prediction, observation). `test_res` is ordered (label, prediction),
# so reorder the columns before converting to an RDD.
test_rdd = test_res.select("prediction", "label").rdd
metricss = BinaryClassificationMetrics(test_rdd)
metricss2 = RegressionMetrics(test_rdd)
print(metricss2.r2)  # R2 score on the test set

The R2 score is lower than before, but that is because it was 100% on our training set, so there may be a little overfitting.

# Neural Network

In [19]:
#Training a Neural Network
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The input layer must have exactly as many units as the feature vectors have
# entries. Read that size from the data instead of hard-coding the vocabulary
# size, so the cell keeps working if the tokenisation or dataset changes.
num_features = len(trainData.first()["features"])

# Network shape: input -> 5 hidden -> 4 hidden -> 2 output classes.
layers = [num_features, 5, 4, 2]

# Fixed seed so weight initialisation (and the reported scores) are reproducible.
classifier = MultilayerPerceptronClassifier(maxIter=100,
                                            layers=layers,
                                            blockSize=128,
                                            seed=42)

# NOTE: this rebinds `model`, which an earlier cell used for the fitted CountVectorizer.
model = classifier.fit(trainData)
preds = model.transform(testData)
preds.show(20)

In [20]:
# Narrow the neural-network output to the predicted class and the true label.
predictionsAndLabels = preds.select('prediction', 'label')
predictionsAndLabels.show()

In [21]:
#Classification metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# F1 score of the neural network on the test predictions.
# The evaluator reads the default "label" and "prediction" columns.
# (The former explainParam('metricName') call was dropped: it ran mid-cell and its
# return value was discarded, so it had no effect.)
evaluator_NN = MulticlassClassificationEvaluator(metricName='f1')
evaluation_NN = evaluator_NN.evaluate(predictionsAndLabels)
print('NeuralNetwork F1 = %g' % evaluation_NN)

# Naive Bayes

In [23]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Multinomial Naive Bayes with smoothing 1.0, trained on the same split
# as the other models.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)
model_Naive = nb.fit(trainData)

# Predict on the held-out test split and preview a few rows.
predictions_Naive = model_Naive.transform(testData)
predictions_Naive.show(5)

# Accuracy = share of test rows whose "prediction" equals "label".
evaluator_Naive = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator_Naive.evaluate(predictions_Naive)
print("Test set accuracy = " + str(accuracy))