###### Here I'm using findspark to indicate where the spark module is located

In [1]:
import findspark
findspark.init(r'C:\Users\q1011812\Downloads\spark-3.0.1-bin-hadoop3.2\spark-3.0.1-bin-hadoop3.2')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [3]:
from pyspark.sql.functions import col, udf, length
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import (Tokenizer,
                                RegexTokenizer,
                                HashingTF,
                                IDF, 
                                StringIndexer,
                                CountVectorizer,
                                StopWordsRemover,
                                VectorAssembler)

In [4]:
dataset = spark.read.csv('smsspamcollection/SMSSpamCollection', 
                         inferSchema = True, sep = '\t')
dataset.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [5]:
dataset.show(1)#,truncate = False)

+---+--------------------+
|_c0|                 _c1|
+---+--------------------+
|ham|Go until jurong p...|
+---+--------------------+
only showing top 1 row



In [6]:
dataset = dataset.withColumnRenamed('_c0', 'Class').withColumnRenamed('_c1', 'Message')

In [7]:
dataset = dataset.withColumn('length', length(dataset['Message']))

In [8]:
dataset.groupBy('Class').mean().show()

+-----+-----------------+
|Class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



# Tokenizer

In [9]:
tokenizer = Tokenizer(inputCol = 'Message', outputCol = 'token')
stop_words = StopWordsRemover(inputCol = 'token', outputCol = 'stopToken')
count_vec = CountVectorizer(inputCol = 'stopToken', outputCol = 'c_vec')
idf = IDF(inputCol = 'c_vec', outputCol = 'tf-idf')
ham_spam = StringIndexer(inputCol = 'Class', outputCol ='label')

In [10]:
assembler = VectorAssembler(inputCols = ['label', 'length'], outputCol = 'features')

In [11]:
from pyspark.ml.classification import NaiveBayes

In [12]:
nb = NaiveBayes()

In [13]:
from pyspark.ml import Pipeline

In [14]:
data_pipe_prep = Pipeline(stages = [ham_spam,
                                    tokenizer,
                                   stop_words,
                                   count_vec,
                                   idf,
                                   assembler])

In [15]:
cleaner = data_pipe_prep.fit(dataset)
cleaned_data = cleaner.transform(dataset)

In [16]:
dataset = cleaned_data.select(['label', 'features'])

In [17]:
training_data, test_data = dataset.randomSplit([0.8,0.2])

In [18]:
classifier = nb.fit(training_data)

In [19]:
predictions = classifier.transform(test_data)

In [20]:
predictions.show(4)

+-----+---------+--------------------+--------------------+----------+
|label| features|       rawPrediction|         probability|prediction|
+-----+---------+--------------------+--------------------+----------+
|  0.0|[0.0,2.0]|[-0.1465519639127...|[0.86537655796558...|       0.0|
|  0.0|[0.0,3.0]|[-0.1465556110254...|[0.86621461190048...|       0.0|
|  0.0|[0.0,3.0]|[-0.1465556110254...|[0.86621461190048...|       0.0|
|  0.0|[0.0,5.0]|[-0.1465629052507...|[0.86787748652361...|       0.0|
+-----+---------+--------------------+--------------------+----------+
only showing top 4 rows



In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [22]:
acc_eval = MulticlassClassificationEvaluator()
accuracy = acc_eval.evaluate(predictions)

In [23]:
print(accuracy)

1.0
