In [1]:
# Load the SMS spam dataset; this notebook is meant to run on a Databricks cluster.
import pyspark
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise create one named 'nlp'.
spark = SparkSession.builder.appName('nlp').getOrCreate()

# The file is tab-separated; the columns arrive as _c0 / _c1 and are renamed
# to 'class' and 'text' so later cells can refer to them by name.
# NOTE(review): the path has no leading slash -- confirm it resolves to the
# intended DBFS location on the target cluster.
data = (
    spark.read
    .csv("FileStore/tables/SMSSpamCollection", inferSchema=True, sep='\t')
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)
data.show()


In [2]:
from pyspark.sql.functions import length

# Add a 'length' column holding the character length of each 'text' value;
# it is used later as an extra model feature alongside the TF-IDF vector.
data = data.withColumn(
    'length',
    length(data['text']),
)
data.show()


In [3]:
from pyspark.ml.feature import (
    CountVectorizer,
    IDF,
    StopWordsRemover,
    StringIndexer,
    Tokenizer,
)

# Per-class averages of the numeric columns (the original author notes a
# pretty clear difference between the classes here).
data.groupBy('class').mean().show()

# Feature-engineering stages, listed in the order the pipeline applies them.
# Each text stage reads the output column of the one before it.
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')

In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector  # NOTE(review): not referenced in the visible cells

# Merge the TF-IDF vector and the 'length' column into a single 'features'
# vector column.
clean_up = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)


In [5]:
from pyspark.ml import Pipeline

# Chain label indexing, tokenizing, stop-word removal, term counting,
# IDF weighting and feature assembly into a single preprocessing pipeline.
data_prep_pipe = Pipeline(
    stages=[ham_spam_to_num, tokenizer, stopremove, count_vec, idf, clean_up]
)

# NOTE(review): the pipeline is fitted on the full dataset before the split,
# so the fitted stages (vocabulary, IDF weights) also see rows that end up in
# the test set. Consider splitting first and fitting on the training rows only.
cleaner = data_prep_pipe.fit(data)

# Keep only the two columns the classifier needs.
clean_data = cleaner.transform(data).select(['label', 'features'])
clean_data.show()

# Roughly 70/30 train/test split. The fixed seed keeps the split (and every
# metric computed downstream) the same across Restart & Run All.
(training, testing) = clean_data.randomSplit([0.7, 0.3], seed=42)


In [6]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree with default hyperparameters; the column names are the
# library defaults, spelled out so the link to the prepared data is visible.
dct = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# Fit on the training split, then score the held-out split.
spam_predictor = dct.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# MulticlassClassificationEvaluator defaults to the F1 metric, so accuracy is
# requested explicitly to make the reported number match the printed message.
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

# Compare the 'prediction' column against 'label' on the test predictions.
acc = acc_eval.evaluate(test_results)
print("Accuracy of DecisionTreeClassifier model at predicting spam was: {}".format(acc))