In [1]:
# Load every row of the SMS spam table and give its two columns the names
# the rest of the notebook uses: 'flag' -> 'class', 'message' -> 'text'.
df = (
    sqlContext.sql("select * from 09_smsspamcollection_8f651")
    .withColumnRenamed('flag', 'class')
    .withColumnRenamed('message', 'text')
)
# Print the resulting schema as a sanity check on the renames.
df.printSchema()

In [2]:
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Add a 'length' column holding the character count of each message's text.
text_col = df['text']
df = df.withColumn('length', length(text_col))

# Show the per-class mean of the numeric columns
# (presumably only 'length' at this point -- confirm against the schema).
df.groupBy('class').mean().show()

In [4]:
# --- Feature-engineering stages (assembled into a Pipeline in the next cell) ---

# Label side: index the string 'class' column into a numeric 'label' column.
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')

# Text side: text -> token_text -> stop_tokens -> c_vec -> tf_idf
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="token_text",
)
stopremove = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_tokens',
)
count_vec = CountVectorizer(
    inputCol='stop_tokens',
    outputCol='c_vec',
)
idf = IDF(
    inputCol="c_vec",
    outputCol="tf_idf",
)

# Final stage: concatenate the TF-IDF vector and the message length into a
# single 'features' vector. Despite its name, `df_clean` is a VectorAssembler
# stage, not a DataFrame.
df_clean = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)

In [5]:
# Chain the preparation stages in the order their input columns become available.
prep_stages = [ham_spam_to_num, tokenizer, stopremove, count_vec, idf, df_clean]
data_prep_pipe = Pipeline(stages=prep_stages)

# NOTE(review): the pipeline is fit on all of `df`, i.e. before the train/test
# split performed later in the notebook, so the fitted stages see rows that end
# up in the test set. Consider splitting first and fitting on training rows only.
cleaner = data_prep_pipe.fit(df)

# Apply the fitted stages and keep only the two columns the classifier needs.
clean_data = cleaner.transform(df).select(['label', 'features'])

In [6]:
# Randomly split the prepared rows ~70/30 into training and test sets.
# A fixed seed is passed so the split (and every metric derived from it)
# does not change from one run to the next on the same input data.
SPLIT_SEED = 42
(training, testing) = clean_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [7]:
# Naive Bayes classifier with library-default hyperparameters. The column names
# are spelled out (they equal the defaults) to make explicit that the model
# consumes the 'features' / 'label' columns produced by the prep pipeline.
nb = NaiveBayes(featuresCol='features', labelCol='label')

# Fit on the training split only.
spam_predictor = nb.fit(training)

In [8]:
# Score the held-out rows with the fitted model.
test_results = spam_predictor.transform(testing)

# Request accuracy explicitly: MulticlassClassificationEvaluator defaults to
# metricName="f1", so the bare constructor would report an F1 score under the
# "Accuracy" label printed below.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))