# NLP Using PySpark

## Objective:
- The objective from this project is to create a <b>Spam filter using NaiveBayes classifier</b>.

- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection


### Creating a spark session and importing the required libraries

In [276]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import  Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, VectorAssembler, HashingTF, IDF

In [277]:
spark = SparkSession.builder.getOrCreate()
spark

### Reading the data into a DataFrame

In [278]:
df = spark.read.csv('SMSSpamCollection',sep="	")

In [279]:
#printing the schema
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [280]:
#Renaming the first column to 'class' and second column to 'text'
df_renamed = df.withColumnRenamed("_c0", "class") \
         .withColumnRenamed("_c1","text")

In [281]:
df_renamed.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



In [282]:
df_renamed.show(10 ,truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Cleaning and Preparing the Data




In [283]:
### Creating a new feature column contains the length of the text column
df_with_length = df_renamed.withColumn("Name_Length", length("text"))

In [284]:
df_with_length.show(5)

+-----+--------------------+-----------+
|class|                text|Name_Length|
+-----+--------------------+-----------+
|  ham|Go until jurong p...|        111|
|  ham|Ok lar... Joking ...|         29|
| spam|Free entry in 2 a...|        155|
|  ham|U dun say so earl...|         49|
|  ham|Nah I don't think...|         61|
+-----+--------------------+-----------+
only showing top 5 rows



In [285]:
#calculating the avg lenth to each class
df_avg = df_with_length.groupby('class').avg()
df_avg.show()

+-----+-----------------+
|class| avg(Name_Length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### Performing the following steps to obtain TF-IDF:

1. Creating a <b>Tokenizer</b> from the text column.
2. Creating a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
3. Creating a <b>CountVectorizer</b> after removing the <b>stop words</b>.
4. Creating the <b>TF-IDF</b> from the <b>CountVectorizer</b>.

In [286]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopword_remover = StopWordsRemover(inputCol="words", outputCol="FilteredText")
cv = CountVectorizer(inputCol="FilteredText", outputCol="TF")
idf = IDF(inputCol="TF", outputCol="TFIDF")

#### then performing the following:

  - Converting the <b>class column</b> to index using <b>StringIndexer</b>
  - Creating feature column from the <b>TF-IDF</b> and <b>lenght</b> columns.

In [287]:
string_indexer = StringIndexer(inputCol="class", outputCol="label")
vectorAssm = VectorAssembler(inputCols=['label','TFIDF'] , outputCol='features')

## The Model
- Creating a <b>NaiveBayes</b> classifier

In [288]:
naive_clf = NaiveBayes()

## Pipeline
### Creating a pipeline model contains all the steps starting from the Tokenizer to the NaiveBays classifier.

In [289]:
Pl = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, string_indexer, vectorAssm, naive_clf])

### Splitting the data to trian and test data with ratios 0.7 and 0.3 respectively.

In [290]:
traindf, testdf = df_with_length.randomSplit( [.7, .3],seed=42)
traindf.count(), testdf.count()

(3981, 1593)

### Fitting the Pipeline model to the training data

In [291]:
fitted_pipline = Pl.fit(traindf)

###  predicting on tests dataframe

In [292]:
 predictions_df = fitted_pipline.transform(testdf)

In [293]:
#predictions_df schema
predictions_df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- Name_Length: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- FilteredText: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- TF: vector (nullable = true)
 |-- TFIDF: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- calculating the <b>f1_score</b> using <b>MulticlassClassificationEvaluator</b>

In [294]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")


In [295]:
evaluator.evaluate(predictions_df)

0.9728509810000032