# NLP Using PySpark

## Objective:
- The objective from this project is to create a <b>Spam filter using NaiveBayes classifier</b>.
- It is required to obtain <b>f1_scored > 0.9</b>.
- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

## To perform this task follow the following guiding steps:

### Create a spark session and import the required libraries

In [1]:
!pip install pyspark
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
!apt install unzip
!unzip /content/smsspamcollection.zip 
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 49 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 66.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=4e6b7407c851c2837f1fe5dde368d2527ff060c56588e4d9a3f0ba014576401c
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
--2022-07-24 20:46:38--  https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
Resolvin

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [15]:
sampleDF = spark.read.csv('/content/SMSSpamCollection',inferSchema=True,samplingRatio=0.001)

In [16]:
myschema = sampleDF.schema
myschema

StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True)])

In [17]:
df = spark.read.csv('/content/SMSSpamCollection',header=True,schema=myschema,sep='\t')

### Print the schema

In [18]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [19]:
df1 = df.withColumnRenamed("_c0",'class')\
        .withColumnRenamed("_c1",'text')
df1.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [20]:
df1.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
+-----+--------------------+
only showing top 10 rows



In [22]:
df1.show(10,truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's     |
|ham  |U dun say so early hor... U c already then say...                                                                                                   

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column contains the length of the text column

In [30]:
from pyspark.sql.functions import length
import pyspark.sql.functions as fn
from pyspark.sql.functions import *


In [28]:
df2=df1.withColumn("length",length(df1.text))

### Show the new dataframe

In [29]:
df2.show(10)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
+-----+--------------------+------+
only showing top 10 rows



+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Get the average text length for each class (give alias name to the average length column)

In [31]:
df2.createOrReplaceTempView('table')

In [41]:
spark.sql('''
SELECT class,AVG(length) as AvgLenght
from table
GROUP BY class

''').show()

+-----+-----------------+
|class|        AvgLenght|
+-----+-----------------+
|  ham|71.44612515540821|
| spam|138.6706827309237|
+-----+-----------------+



+-----+-----------------+
|class|      Avg. Lenght|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform you raw text in to tf_idf model :
- For more information about TF-IDF check the following link: <b>(Not needed for the test)</b>
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.

In [74]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,HashingTF, IDF,StringIndexer,VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline


In [94]:
from typing import Pattern
tokenizer=Tokenizer(inputCol="text",outputCol="textTokenized")
stopWordsRemover=StopWordsRemover(inputCol="textTokenized",outputCol="CleanWords")
countVectorizer=CountVectorizer(inputCol="CleanWords",outputCol="Vectors")
# hashingTF=HashingTF(inputCol="Vectors", outputCol="rawfeatures")
iDF=IDF(inputCol="Vectors", outputCol="TfIDFFeatures")

In [96]:
a=tokenizer.transform(df2)
a=stopWordsRemover.transform(a)
aa=countVectorizer.fit(a)
z=aa.transform(a)
zz=iDF.fit(z)
zz.transform(z).show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

- Convert the <b>class column</b> to index using <b>StringIndexer</b>
- Create feature column from the <b>TF-IDF</b> and <b>lenght</b> columns.

In [105]:
stringIndexer=StringIndexer(inputCol="class",outputCol="label")
vectorAssembler=VectorAssembler(inputCols=["TfIDFFeatures","length"],outputCol="features")

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [106]:
nb = NaiveBayes()


## Pipeline
### Create a pipeline model contains all the steps starting from the Tokenizer to the NaiveBays classifier.

In [107]:
myStage=[tokenizer,stopWordsRemover,countVectorizer,iDF,stringIndexer,vectorAssembler,nb]

In [108]:
pl = Pipeline(stages=myStage)

### Split your data to trian and test data with ratios 0.7 and 0.3 respectively.

In [111]:
trainDF, testDF = df2.randomSplit([0.7, 0.3], seed=42)


### Fit your Pipeline model to the training data

In [112]:
pipelineModel= pl.fit(trainDF)

### Perform predictions on tests dataframe

In [113]:
predDF = pipelineModel.transform(testDF)

### Print the schema of the prediction dataframe

In [114]:
predDF.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- textTokenized: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- CleanWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Vectors: vector (nullable = true)
 |-- TfIDFFeatures: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [115]:
predDF.select('text','prediction').show()

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
| &lt;DECIMAL&gt; ...|       0.0|
| said kiss, kiss,...|       0.0|
| what number do u...|       0.0|
|"Gimme a few" was...|       0.0|
|"Response" is one...|       0.0|
|"SYMPTOMS" when U...|       0.0|
|"Speak only when ...|       0.0|
|&lt;#&gt;  great ...|       0.0|
|&lt;#&gt;  w jett...|       0.0|
|&lt;#&gt; , that'...|       0.0|
|&lt;#&gt; ISH MIN...|       0.0|
|(I should add tha...|       0.0|
|(No promises on w...|       0.0|
|(That said can yo...|       0.0|
|* Am on a train b...|       0.0|
|* Thought I didn'...|       0.0|
|* Was a nice day ...|       0.0|
|* Will have two m...|       0.0|
|, ,  and  picking...|       0.0|
|, how's things? J...|       0.0|
+--------------------+----------+
only showing top 20 rows



In [116]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [121]:
f1_score = MulticlassClassificationEvaluator(metricName='f1')

In [122]:
f1_score.evaluate(predDF)

0.97498871245162