In [233]:
!pip install pyspark



# NLP Using PySpark

## Objective:
- The objective from this project is to create a <b>Spam filter using NaiveBayes classifier</b>.
- It is required to obtain <b>f1_scored > 0.9</b>.
- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

## To perform this task follow the following guiding steps:

### Create a spark session and import the required libraries

In [234]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [235]:
totalDF = spark.read.format("csv")\
                .option("header", "false")\
                .option("delimiter", "\t")\
                .option("inferSchema", "true").load('/content/SMSSpamCollection')

In [236]:
totalDF.show(5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



### Print the schema

In [237]:
totalDF.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [238]:
totalDF = totalDF.withColumnRenamed("_c0", "class")
totalDF = totalDF.withColumnRenamed("_c1", "text")

totalDF.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [239]:
totalDF.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [240]:
totalDF.show(10, truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column contains the length of the text column

In [241]:
from pyspark.sql.functions import length
totalDF = totalDF.withColumn('length', length(totalDF.text))

### Show the new dataframe

In [242]:
totalDF.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Get the average text length for each class (give alias name to the average length column)

In [243]:
from pyspark.sql.functions import avg

lengthDF = totalDF.groupBy('class').agg(avg('length').alias("Avg. Length"))
lengthDF.show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform you raw text in to tf_idf model :
- For more information about TF-IDF check the following link: <b>(Not needed for the test)</b>
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.

In [244]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")


- Convert the <b>class column</b> to index using <b>StringIndexer</b>
- Create feature column from the <b>TF-IDF</b> and <b>lenght</b> columns.

In [245]:
from pyspark.ml.feature import StringIndexer
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')

In [246]:
from pyspark.ml.feature import VectorAssembler
clean = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [247]:
from pyspark.ml.classification import NaiveBayes
clf = NaiveBayes()

## Pipeline
### Create a pipeline model contains all the steps starting from the Tokenizer to the NaiveBays classifier.

In [248]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[ham_spam_to_num, tokenizer, stopremove, count_vec, idf, clean, clf])

### Split your data to trian and test data with ratios 0.7 and 0.3 respectively.

In [249]:
trainDF, testDF = totalDF.randomSplit([0.7, 0.3])

### Fit your Pipeline model to the training data

In [250]:
spamPred = pipeline.fit(trainDF)

### Perform predictions on tests dataframe

In [251]:
predictions = spamPred.transform(testDF)

In [252]:
predictions.show()

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|class|                text|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  ham| &lt;DECIMAL&gt; ...|   132|  0.0|[, &lt;decimal&gt...|[, &lt;decimal&gt...|(10805,[3,92,108,...|(10805,[3,92,108,...|(10806,[3,92,108,...|[-779.06627846623...|[1.0,1.9791742347...|       0.0|
|  ham|"Response" is one...|   154|  0.0|["response", is, ...|["response", one,...|(10805,[2,7,27,61...|(10805,[2,7,27,61...|(10806,[2,7,27,61...|[-764.02824100286...|[1.0,1.4416368212...|       0.0|


### Print the schema of the prediction dataframe

In [253]:
predictions.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [254]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [255]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(predictions)
print("F1-Score of the model at predicting spam was: {}".format(acc))

F1-Score of the model at predicting spam was: 0.9724817278921944


# GOOD LUCK
