In [1]:
import findspark
findspark.init('/home/bowen/spark-2.4.4-bin-hadoop2.7/')
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('nlp_spam_detection').getOrCreate()

In [3]:
df = spark.read.csv('SMSSpamCollection', inferSchema=True, sep='\t')

In [4]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [5]:
df = df.withColumnRenamed('_c0', 'label')

In [6]:
df = df.withColumnRenamed('_c1', 'text')

In [13]:
df.printSchema()

root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [19]:
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, IDF, StringIndexer, VectorAssembler
from pyspark.sql.functions import length 

In [9]:
df = df.withColumn('length', length(df['text']))

In [11]:
df.groupBy('label').mean().show()

+-----+-----------------+
|label|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [20]:
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stop_remover = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
cv = CountVectorizer(inputCol='stop_token', outputCol='count_vec')
idf = IDF(inputCol='count_vec',outputCol='tf_idf')
label_numeric = StringIndexer(inputCol='label', outputCol='label_num')
assembler = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')

In [31]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol='label_num')

In [24]:
#use pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[label_numeric, tokenizer, stop_remover, cv, idf, assembler])
cleaner = pipeline.fit(df)
final_df = cleaner.transform(df)

In [28]:
dataset = final_df.select(['label_num', 'features'])
dataset.head(1)

[Row(label_num=0.0, features=SparseVector(13424, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 4.2072, 72: 4.322, 344: 5.4072, 625: 5.918, 731: 6.1411, 1409: 6.6801, 1598: 6.8343, 4485: 7.5274, 6440: 7.9329, 8092: 7.9329, 8838: 7.9329, 11344: 7.9329, 12979: 7.9329, 13423: 111.0}))]

In [29]:
train_set, test_set = dataset.randomSplit([0.8, 0.2])

In [32]:
#use Bayes model
spam_detector = nb.fit(train_set)

In [33]:
results = spam_detector.transform(test_set)

In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator(labelCol='label_num')
acc = acc_eval.evaluate(results)
print ('Bayes: ', acc)

Bayes:  0.9340036966088454


In [47]:
from pyspark.ml.classification import RandomForestClassifier
spam_detector_rtc = RandomForestClassifier(labelCol='label_num')
rtc_model = spam_detector_rtc.fit(train_set)
rtc_results = rtc_model.transform(test_set)
acc_eval = MulticlassClassificationEvaluator(labelCol='label_num')
acc = acc_eval.evaluate(rtc_results)
print ('Random Forests: ', acc)

Random Forests:  0.8330587931687025


In [48]:
from pyspark.ml.classification import LogisticRegression
spam_detector_lgr = LogisticRegression(labelCol='label_num')
lgr_model = spam_detector_lgr.fit(train_set)
lgr_results = lgr_model.transform(test_set)
acc_eval = MulticlassClassificationEvaluator(labelCol='label_num')
acc = acc_eval.evaluate(lgr_results)
print ('Logistic Regression: ', acc)

Logistic Regression:  0.9722194526491276
