In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('NLP').getOrCreate()

data2 = spark.read.csv('/users/f/desktop/smsspamdata', inferSchema = True, sep = '\t') #As our data is separated by tabs we should read the data with the separation '\t'.

data2.show()



23/01/16 00:06:30 WARN Utils: Your hostname, fs-MacBook-Air-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.9 instead (on interface en0)
23/01/16 00:06:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/16 00:06:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [5]:
data2 = data2.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1','text')#Rename the headers 

data2.show()

from pyspark.sql.functions import length 

data2 = data2.withColumn('length',length(data2['text']))#We need the lenghth of the data 

data2.show()

data2.groupBy('class').mean().show()#separated by groups (average)

from pyspark.ml.feature import (Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer)

tokenizer = Tokenizer(inputCol = "text", outputCol = "token_text")

stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol="stop_tokens")

count_vec = CountVectorizer(inputCol = 'stop_tokens', outputCol = 'c_vec')

idf = IDF(inputCol = 'c_vec', outputCol = 'tearmf_inversf')

ham_spam_to_numbers = StringIndexer(inputCol = 'class', outputCol = "label")#converts strings to numbers to make the spark understand 

from pyspark.ml.feature import VectorAssembler 

clean_up_data = VectorAssembler(inputCols=['tearmf_inversf', 'length'],outputCol = 'features')

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()

from pyspark.ml import Pipeline 

data_prep_pipeline = Pipeline(stages = [ham_spam_to_numbers, tokenizer, stop_remove, count_vec, idf, clean_up_data])

cleaner = data_prep_pipeline.fit(data2)

clean_data2 = cleaner.transform(data2)

clean_data2 = clean_data2.select('label','features')

clean_data2.show()

training_data2, test_data2 = clean_data2.randomSplit([0.7, 0.3])

spam_detection2 = nb.fit(training_data2)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think..

                                                                                

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



                                                                                

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows

23/01/16 00:08:34 WARN DAGScheduler: Broadcasting large task binary with size 1249.2 KiB


[Stage 18:>                                                         (0 + 1) / 1]

23/01/16 00:08:35 WARN DAGScheduler: Broadcasting large task binary with size 1238.9 KiB


                                                                                

In [6]:
spam_detection2 = nb.fit(training_data2)

23/01/16 00:08:42 WARN DAGScheduler: Broadcasting large task binary with size 1249.2 KiB


[Stage 21:>                                                         (0 + 1) / 1]

23/01/16 00:08:43 WARN DAGScheduler: Broadcasting large task binary with size 1238.9 KiB


                                                                                

In [7]:
test_results = spam_detection2.transform(test_data2)

In [8]:
test_results.show()

23/01/16 00:08:50 WARN DAGScheduler: Broadcasting large task binary with size 1471.5 KiB
23/01/16 00:08:50 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-623.85247706018...|[0.99999999969724...|       0.0|
|  0.0|(13424,[0,1,2,41,...|[-1064.9155610426...|[1.0,6.6055065206...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-570.96306112847...|[1.0,6.0933697824...|       0.0|
|  0.0|(13424,[0,1,4,50,...|[-829.50716496939...|[1.0,3.9289456697...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-798.47593807070...|[1.0,5.4311585026...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-543.90306091825...|[1.0,5.0288793754...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1358.3710992445...|[1.0,

                                                                                

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [13]:
acc_eval2 = MulticlassClassificationEvaluator()
acc2 = acc_eval2.evaluate(test_results)
print("Accuracy of model at predicting spam is: {}".format(acc2))

23/01/16 00:09:25 WARN DAGScheduler: Broadcasting large task binary with size 1475.9 KiB
Accuracy of model at predicting spam is: 0.9247471326859881


In [15]:
datax = [('Nah I dont think he goes to usf, he lives around here thoug','NULL')]

In [16]:
datax

[('Nah I dont think he goes to usf, he lives around here thoug', 'NULL')]

In [17]:
columns = ["text","length"]

In [18]:
dfx = spark.createDataFrame(data=datax, schema = columns)

In [19]:
dfx.show()

[Stage 33:>                                                         (0 + 1) / 1]

+--------------------+------+
|                text|length|
+--------------------+------+
|Nah I dont think ...|  NULL|
+--------------------+------+



                                                                                

In [20]:
datax = dfx.withColumn('length',length(dfx['text']))#We need the lenghth of the data

In [21]:
datax.show()

+--------------------+------+
|                text|length|
+--------------------+------+
|Nah I dont think ...|    59|
+--------------------+------+



In [22]:
tokenizer2 = Tokenizer(inputCol = "text", outputCol = "token_text2")

stop_remove2 = StopWordsRemover(inputCol = 'token_text2', outputCol="stop_tokens2")

count_vec2 = CountVectorizer(inputCol = 'stop_tokens2', outputCol = 'c_vec2')

idf2 = IDF(inputCol = 'c_vec2', outputCol = 'tf_idf')

In [23]:
clean_up_datax = VectorAssembler(inputCols=['tf_idf','length'],outputCol = 'features')

In [24]:
data_prep_pipex = Pipeline(stages=[tokenizer2,stop_remove2,count_vec2,idf2,clean_up_datax])

In [25]:
cleanx = data_prep_pipex.fit(datax)

In [26]:
cleandatax = cleanx.transform(datax)

In [27]:
cleandatax.show()

+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------+
|                text|length|         token_text2|        stop_tokens2|              c_vec2|              tf_idf|      features|
+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------+
|Nah I dont think ...|    59|[nah, i, dont, th...|[nah, dont, think...|(8,[0,1,2,3,4,5,6...|(8,[0,1,2,3,4,5,6...|(9,[8],[59.0])|
+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------+



In [28]:
cleandatax2 = cleandatax.select('features')

In [29]:
predictionx = spam_detection2.transform(cleandatax)

In [30]:
predictionx

DataFrame[text: string, length: int, token_text2: array<string>, stop_tokens2: array<string>, c_vec2: vector, tf_idf: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [None]:
predictionx.show()