In [60]:
import os

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col,udf,length
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [13]:
# Start (or reuse) the Spark session for this notebook, registered as 'NLP'.
spark = (
    SparkSession.builder
    .appName('NLP')
    .getOrCreate()
)

In [34]:
# Location of the tab-separated SMS spam file. Set the SMS_SPAM_PATH
# environment variable to read a different copy; the original local path
# stays as the default so existing setups keep working unchanged.
sms_path = os.environ.get('SMS_SPAM_PATH',
                          '/home/sai/ex/ML/nlp/sms/SMSSpamCollection')

# No header option is passed, so Spark assigns the default column names
# _c0 and _c1; they are renamed in a later cell.
data = spark.read.csv(sms_path, inferSchema=True, sep='\t')

In [35]:
# Preview the raw rows (first 20, long values truncated).
data.show(n=20, truncate=True)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [36]:
# Replace Spark's default column names with descriptive ones.
data = (
    data
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)

In [37]:
# Confirm the renamed columns (first 20 rows, long values truncated).
data.show(n=20, truncate=True)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [39]:
# Add a 'length' column holding the length of each message's text.
data = data.withColumn('length', length(col('text')))

In [40]:
# Inspect the new 'length' column (first 20 rows, long values truncated).
data.show(n=20, truncate=True)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [44]:
# Text-feature stages, each reading the previous stage's output column:
# text -> words -> s_words -> cv_words -> tf_idf.
token = Tokenizer(
    inputCol='text',
    outputCol='words',
)
stop = StopWordsRemover(
    inputCol='words',
    outputCol='s_words',
)
cv = CountVectorizer(
    inputCol='s_words',
    outputCol='cv_words',
)
idf = IDF(
    inputCol='cv_words',
    outputCol='tf_idf',
)

# Encode the string 'class' column as a numeric 'label' column.
class_index = StringIndexer(
    inputCol='class',
    outputCol='label',
)

In [46]:
# Assembler stage (not a DataFrame, despite the name): merges the TF-IDF
# vector and the message length into a single 'features' vector column.
final_data = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)

In [48]:
# Naive Bayes classifier with its default settings; it is fitted on the
# training split in a later cell.
cls = NaiveBayes()

In [52]:
# Feature-preparation pipeline only; the classifier `cls` is deliberately not
# one of the stages and is fitted separately.
pipeline = Pipeline(
    stages=[
        token,
        stop,
        cv,
        idf,
        class_index,
        final_data,
    ]
)

In [53]:
# Fit the feature pipeline, then apply it to the same rows.
# NOTE(review): the pipeline is fitted on the full dataset before the
# train/test split, so test rows contribute to the CountVectorizer vocabulary
# and the IDF weights. Consider splitting first and fitting on the training
# rows only.
feature_model = pipeline.fit(data)
inp_df = feature_model.transform(data)

In [54]:
# List the columns present after the feature pipeline has run.
inp_df.columns

['class',
 'text',
 'length',
 'words',
 's_words',
 'cv_words',
 'tf_idf',
 'label',
 'features']

In [55]:
# Keep only the two columns the classifier needs.
input_ = inp_df.select(['label', 'features'])

In [56]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the
# reported accuracy, repeatable across runs on the same input.
tr_data, ts_data = input_.randomSplit([0.8, 0.2], seed=42)

In [57]:
# Train the Naive Bayes classifier on the training split.
model = cls.fit(tr_data)

In [58]:
# Score the held-out test split; the result carries the rawPrediction,
# probability and prediction columns alongside label and features.
pred = model.transform(ts_data)

In [59]:
# Preview the scored test rows (first 20, long values truncated).
pred.show(n=20, truncate=True)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13459,[0,1,3,10,...|[-607.48993633988...|[1.0,1.7710771654...|       0.0|
|  0.0|(13459,[0,1,5,15,...|[-765.06328026590...|[1.0,4.0837720904...|       0.0|
|  0.0|(13459,[0,1,8,16,...|[-661.57453413789...|[1.0,1.5417836669...|       0.0|
|  0.0|(13459,[0,1,15,19...|[-1369.6027972019...|[1.0,4.5056936840...|       0.0|
|  0.0|(13459,[0,1,15,33...|[-217.33520256923...|[1.0,1.0159779333...|       0.0|
|  0.0|(13459,[0,1,19,21...|[-840.30321427917...|[1.0,6.8125096391...|       0.0|
|  0.0|(13459,[0,1,439,6...|[-303.53976094608...|[0.99999999999999...|       0.0|
|  0.0|(13459,[0,1,510,5...|[-324.79249627414...|[0.99999999996801...|       0.0|
|  0.0|(13459,[0,1,896,1...|[-97.898273123397...|[0.99999998379716...|       0.0|
|  0.0|(13459,[0

In [61]:
# Evaluator configured to report plain accuracy for the predictions.
eval_met = MulticlassClassificationEvaluator(
    metricName='accuracy',
)

In [63]:
# Compute accuracy on the test predictions, then report it.
accuracy = eval_met.evaluate(pred)
print('accuracy of model is', accuracy)

accuracy of model is 0.9071883530482256
