In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [5]:
# install findspark using pip
!pip install -q findspark

In [6]:
import findspark

In [7]:
findspark.init()
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession.builder.appName('SMSSpamCollection').getOrCreate()

In [15]:
df = spark.read.options(inferSchema='True',delimiter='\t').csv("SMSSpamCollection.csv")
# df = spark.read.options(inferSchema='True', ).csv("spam.csv")

## Rename the columns
df = df.withColumnRenamed("_c0", "class").withColumnRenamed("_c1", "text")
# df = df.withColumnRenamed("_v1", "class").withColumnRenamed("_v2", "text")
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Create a new length feature (new column w/ the length of the text)

In [17]:
import pyspark.sql.functions as F

df = df.withColumn("textLength", F.length("text"))
df.show()

+-----+--------------------+----------+
|class|                text|textLength|
+-----+--------------------+----------+
|  ham|Go until jurong p...|       111|
|  ham|Ok lar... Joking ...|        29|
| spam|Free entry in 2 a...|       155|
|  ham|U dun say so earl...|        49|
|  ham|Nah I don't think...|        61|
| spam|FreeMsg Hey there...|       147|
|  ham|Even my brother i...|        77|
|  ham|As per your reque...|       160|
| spam|WINNER!! As a val...|       157|
| spam|Had your mobile 1...|       154|
|  ham|I'm gonna be home...|       109|
| spam|SIX chances to wi...|       136|
| spam|URGENT! You have ...|       155|
|  ham|I've been searchi...|       196|
|  ham|I HAVE A DATE ON ...|        35|
| spam|XXXMobileMovieClu...|       149|
|  ham|Oh k...i'm watchi...|        26|
|  ham|Eh u remember how...|        81|
|  ham|Fine if thats th...|        56|
| spam|England v Macedon...|       155|
+-----+--------------------+----------+
only showing top 20 rows



### What do you notice ?

#### Visual inspection shows that the average length of the spam messages is longer.

In [18]:
## We can confirm this below
df.groupby('class').mean().show()

+-----+-----------------+
|class|  avg(textLength)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



### Create feature transformers

In [19]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, NGram
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


### Use Pipeline to create a data pre-processing pipeline as follows 

In [20]:
## Use VectorAssembler to create an assembler of tf_idf feature with length 
## (column should be called ‘features’)

## Change to True to use NGRAM
use_ngram = False

# Indexing class column to a numeric label (output column should be called ‘label’)
data_to_num = StringIndexer(inputCol='class', outputCol='label')

# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")

# # • Ngrams (try with and without ngrams ; n=2)
if use_ngram:
  ngram = NGram(n=2, inputCol = "token_text", outputCol="ngrams")
  # • Count vertorization
  count_vec = CountVectorizer(inputCol='ngrams', outputCol='count_vec_stop')

  # • Stop words removal
  # stop_remove = StopWordsRemover(inputCol='ngrams', outputCol='stop_tokens')
else:
  # • Stop words removal
  stop_remove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
  # • Count vertorization
  count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='count_vec_stop')


# • IDF
idf = IDF(inputCol="count_vec_stop", outputCol="tf_idf")

# • Vector assembling
vec_assembler = VectorAssembler(inputCols=['tf_idf', 'textLength'], outputCol='features')

### Transform the data DataFrame through the pipeline (last column should be called ‘features’)

In [21]:
from pyspark.ml import Pipeline

spam_pipe = Pipeline(stages=[data_to_num, tokenizer, stop_remove, count_vec, idf, vec_assembler])
spam_cleaner = spam_pipe.fit(df)
spam_data_clean = spam_cleaner.transform(df)

#### Import NaiveBayse model from pyspark.ml.classification

In [22]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(featuresCol='features', labelCol='label')

In [23]:
# Create a training DataFrame by selection the “label” and “features” column

In [24]:
spam_data_clean = spam_data_clean.select(['label','features'])
spam_data_clean.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [25]:
## Use random split to split the training data
(train_data, test_data) = spam_data_clean.randomSplit([0.8, 0.2])

## Train your model (fit method)
predictor = nb.fit(train_data)


In [26]:
## Apply your model to test data
test_prediction = predictor.transform(test_data)
test_prediction.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[-797.10741000107...|[1.0,2.3171984914...|       0.0|
|  0.0|(13424,[0,1,4,50,...|[-830.27854819841...|[1.0,1.0080865887...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-884.32728572833...|[1.0,4.1959570815...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-216.63456340940...|[1.0,7.1110318817...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-687.99565244781...|[1.0,9.5171867055...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-670.92985171347...|[1.0,7.7938013580...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-751.11958677349...|[1.0,1.6237466631...|       0.0|
|  0.0|(13424,[0,1,27,88...|[-1532.9239280363...|[0.34398256936550...|       1.0|
|  0.0|(13424,[0,1,498,5...|[-320.05306471727...|[0.99999999999835...|       0.0|
|  0.0|(13424,[0

### Evaluate accuracy using MulticlassClassificationEvaluator from pyspark.ml.evaluation

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

mcc_eval_acc = MulticlassClassificationEvaluator()
model_acc = mcc_eval_acc.evaluate(test_prediction)

print("Model Accuracy: {:.2f}".format(model_acc))

Model Accuracy: 0.94


## Results
#### Model Accuracy was 92% without NGrams
#### Model Accuracy was 62% with NGrams
