# NLP Using PySpark

## Objective:
- The objective of this project is to create a <b>spam filter using a NaiveBayes classifier</b>.
- The model is required to reach an <b>f1_score > 0.9</b>.
- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

## To perform this task, follow these guiding steps:

### Create a Spark session and import the required libraries

In [13]:
# Install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# Unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# Set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# Install findspark using pip
!pip install -q findspark

# Spark for Python
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b3b17ca0776427c4eba6dfc2ccbad5fe3d0664a727c117578f9a485f56429d33
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [14]:
import findspark
findspark.init()
import pyspark

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sp = spark.sparkContext

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [137]:
df = spark.read.text("/content/SMSSpamCollection")
df.show()

+--------------------+
|               value|
+--------------------+
|ham	Go until juro...|
|ham	Ok lar... Jok...|
|spam	Free entry i...|
|ham	U dun say so ...|
|ham	Nah I don't t...|
|spam	FreeMsg Hey ...|
|ham	Even my broth...|
|ham	As per your r...|
|spam	WINNER!! As ...|
|spam	Had your mob...|
|ham	I'm gonna be ...|
|spam	SIX chances ...|
|spam	URGENT! You ...|
|ham	I've been sea...|
|ham	I HAVE A DATE...|
|spam	XXXMobileMov...|
|ham	Oh k...i'm wa...|
|ham	Eh u remember...|
|ham	Fine if that...|
|spam	England v Ma...|
+--------------------+
only showing top 20 rows



### Print the schema

In [61]:
df.printSchema()


root
 |-- value: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [62]:
from pyspark.sql import functions as f

In [138]:
# Each line is "<label><TAB><message>": split on the tab to get the two columns.
df = df.withColumn('class', f.split(df['value'], '\t')[0])
df = df.withColumn('text', f.split(df['value'], '\t')[1])
df = df.drop("value")

In [141]:
df.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [66]:
df.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [68]:
df.show(10,False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column that contains the length of the text column

In [71]:
df = df.withColumn('length', f.length('text') )

### Show the new dataframe

In [72]:
df.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Get the average text length for each class (give alias name to the average length column)

In [73]:
# Alias the aggregated column, as the step above asks.
df.groupby("class").agg(f.avg("length").alias("avg_length")).show()

+-----+-----------------+
|class|       avg_length|
+-----+-----------------+
|  ham|71.47192873420344|
| spam|138.6760374832664|
+-----+-----------------+



## Feature Transformations

### In this part you transform your raw text into a TF-IDF model:
- For more information about TF-IDF, see the following link <b>(not needed for the test)</b>:
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.
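
To make the last step concrete, here is a minimal plain-Python sketch of the weighting that TF-IDF applies to raw term counts. It assumes the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)) given in the Spark MLlib documentation, where m is the number of documents and df(t) is the number of documents containing term t. The three toy documents are invented for illustration and are not taken from the dataset.

```python
import math
from collections import Counter

# Toy, already-tokenized documents (hypothetical, for illustration only).
docs = [
    ["free", "entry", "win"],
    ["ok", "lar"],
    ["free", "win", "win"],
]

m = len(docs)
# Document frequency: in how many documents each term appears.
doc_freq = Counter(term for doc in docs for term in set(doc))
# Smoothed inverse document frequency (assumed formula, see the lead-in).
idf = {term: math.log((m + 1) / (n + 1)) for term, n in doc_freq.items()}
# TF-IDF: raw count of the term in the document times the term's IDF.
tfidf = [
    {term: count * idf[term] for term, count in Counter(doc).items()}
    for doc in docs
]

for row in tfidf:
    print(row)
```

Terms that occur in many documents get a small IDF and are down-weighted, while rare terms keep most of their count.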

In [103]:
from pyspark.ml.feature import Tokenizer,  StopWordsRemover, CountVectorizer, IDF, StringIndexer


In [142]:
indexer = StringIndexer(inputCol="class", outputCol="label").fit(df)
df_ind = indexer.transform(df).select("label","text")
df_ind.show()

+-----+--------------------+
|label|                text|
+-----+--------------------+
|  0.0|Go until jurong p...|
|  0.0|Ok lar... Joking ...|
|  1.0|Free entry in 2 a...|
|  0.0|U dun say so earl...|
|  0.0|Nah I don't think...|
|  1.0|FreeMsg Hey there...|
|  0.0|Even my brother i...|
|  0.0|As per your reque...|
|  1.0|WINNER!! As a val...|
|  1.0|Had your mobile 1...|
|  0.0|I'm gonna be home...|
|  1.0|SIX chances to wi...|
|  1.0|URGENT! You have ...|
|  0.0|I've been searchi...|
|  0.0|I HAVE A DATE ON ...|
|  1.0|XXXMobileMovieClu...|
|  0.0|Oh k...i'm watchi...|
|  0.0|Eh u remember how...|
|  0.0|Fine if thats th...|
|  1.0|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [143]:
tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
df_words_token = tokenizer.transform(df_ind).select('label', 'words_token')
df_words_token.show()

+-----+--------------------+
|label|         words_token|
+-----+--------------------+
|  0.0|[go, until, juron...|
|  0.0|[ok, lar..., joki...|
|  1.0|[free, entry, in,...|
|  0.0|[u, dun, say, so,...|
|  0.0|[nah, i, don't, t...|
|  1.0|[freemsg, hey, th...|
|  0.0|[even, my, brothe...|
|  0.0|[as, per, your, r...|
|  1.0|[winner!!, as, a,...|
|  1.0|[had, your, mobil...|
|  0.0|[i'm, gonna, be, ...|
|  1.0|[six, chances, to...|
|  1.0|[urgent!, you, ha...|
|  0.0|[i've, been, sear...|
|  0.0|[i, have, a, date...|
|  1.0|[xxxmobilemoviecl...|
|  0.0|[oh, k...i'm, wat...|
|  0.0|[eh, u, remember,...|
|  0.0|[fine, if, thats...|
|  1.0|[england, v, mace...|
+-----+--------------------+
only showing top 20 rows
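
As a rough plain-Python picture of this step: Tokenizer lowercases the text and splits it on whitespace, so punctuation stays attached to the words (note `lar...` and `winner!!` in the output above). The helper name `simple_tokenize` is hypothetical, and Spark may handle edge cases such as repeated spaces differently.

```python
def simple_tokenize(text):
    """Lowercase the text and split it on whitespace (approximation of Tokenizer)."""
    return text.lower().split()

# Second message from the dataset, as shown by df.show(10, False).
tokens = simple_tokenize("Ok lar... Joking wif u oni...")
print(tokens)
```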



In [144]:
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_clean = remover.transform(df_words_token).select('label', 'words_clean')
df_clean.show()


+-----+--------------------+
|label|         words_clean|
+-----+--------------------+
|  0.0|[go, jurong, poin...|
|  0.0|[ok, lar..., joki...|
|  1.0|[free, entry, 2, ...|
|  0.0|[u, dun, say, ear...|
|  0.0|[nah, think, goes...|
|  1.0|[freemsg, hey, da...|
|  0.0|[even, brother, l...|
|  0.0|[per, request, 'm...|
|  1.0|[winner!!, valued...|
|  1.0|[mobile, 11, mont...|
|  0.0|[gonna, home, soo...|
|  1.0|[six, chances, wi...|
|  1.0|[urgent!, won, 1,...|
|  0.0|[searching, right...|
|  0.0|[date, sunday, wi...|
|  1.0|[xxxmobilemoviecl...|
|  0.0|[oh, k...i'm, wat...|
|  0.0|[eh, u, remember,...|
|  0.0|[fine, thats, wa...|
|  1.0|[england, v, mace...|
+-----+--------------------+
only showing top 20 rows
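
Conceptually, the remover filters each token list against a stop-word list, ignoring case by default. The sketch below mimics that in plain Python. `STOP_WORDS` is a tiny hypothetical subset (Spark's default English list is much longer), and `remove_stop_words` is an illustrative helper, not a Spark function.

```python
# Tiny hypothetical stop-word list; the real default list is much longer.
STOP_WORDS = {"i", "have", "a", "on", "to", "in"}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens found in the stop-word list, comparing case-insensitively."""
    return [t for t in tokens if t.lower() not in stop_words]

# Illustrative prefix of the "I HAVE A DATE ON ..." message after tokenizing.
cleaned = remove_stop_words(["i", "have", "a", "date", "on", "sunday"])
print(cleaned)
```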



In [148]:
cv = CountVectorizer(inputCol="words_clean", outputCol="vectors").fit(df_clean)
df_vectorived = cv.transform(df_clean).select('label', 'vectors')
df_vectorived.show()

+-----+--------------------+
|label|             vectors|
+-----+--------------------+
|  0.0|(13464,[7,11,31,6...|
|  0.0|(13464,[0,24,296,...|
|  1.0|(13464,[2,13,19,3...|
|  0.0|(13464,[0,69,80,1...|
|  0.0|(13464,[36,134,31...|
|  1.0|(13464,[10,67,139...|
|  0.0|(13464,[10,53,103...|
|  0.0|(13464,[125,184,4...|
|  1.0|(13464,[1,46,118,...|
|  1.0|(13464,[0,1,13,27...|
|  0.0|(13464,[18,43,120...|
|  1.0|(13464,[8,17,37,8...|
|  1.0|(13464,[13,30,46,...|
|  0.0|(13464,[39,95,217...|
|  0.0|(13464,[552,1690,...|
|  1.0|(13464,[30,109,11...|
|  0.0|(13464,[82,214,37...|
|  0.0|(13464,[0,2,49,13...|
|  0.0|(13464,[0,74,105,...|
|  1.0|(13464,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows
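
Each printed vector is sparse: the vocabulary size (13464 here), then the indices of the terms present in the message, then their counts. The plain-Python sketch below builds the same kind of representation on toy data. The documents are invented, and breaking frequency ties alphabetically is an assumption of this sketch rather than documented Spark behaviour.

```python
from collections import Counter

# Toy, already-cleaned documents (hypothetical).
docs = [["free", "win", "win"], ["ok", "free"], ["ok", "ok", "lar"]]

# Vocabulary ordered by total corpus frequency, most frequent first.
# Ties are broken alphabetically here (an assumption of this sketch).
totals = Counter(term for doc in docs for term in doc)
vocab = sorted(totals, key=lambda t: (-totals[t], t))
index = {term: i for i, term in enumerate(vocab)}

def to_sparse(doc):
    """Return (vocab_size, sorted indices, counts), mirroring the printed layout."""
    counts = Counter(index[t] for t in doc)
    indices = sorted(counts)
    return len(vocab), indices, [float(counts[i]) for i in indices]

sparse = [to_sparse(doc) for doc in docs]
for vec in sparse:
    print(vec)
```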



In [150]:
idf = IDF(inputCol="vectors", outputCol="features")
idfModel = idf.fit(df_vectorived)
rescaledData = idfModel.transform(df_vectorived).select("label","features")
rescaledData.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13464,[7,11,31,6...|
|  0.0|(13464,[0,24,296,...|
|  1.0|(13464,[2,13,19,3...|
|  0.0|(13464,[0,69,80,1...|
|  0.0|(13464,[36,134,31...|
|  1.0|(13464,[10,67,139...|
|  0.0|(13464,[10,53,103...|
|  0.0|(13464,[125,184,4...|
|  1.0|(13464,[1,46,118,...|
|  1.0|(13464,[0,1,13,27...|
|  0.0|(13464,[18,43,120...|
|  1.0|(13464,[8,17,37,8...|
|  1.0|(13464,[13,30,46,...|
|  0.0|(13464,[39,95,217...|
|  0.0|(13464,[552,1690,...|
|  1.0|(13464,[30,109,11...|
|  0.0|(13464,[82,214,37...|
|  0.0|(13464,[0,2,49,13...|
|  0.0|(13464,[0,74,105,...|
|  1.0|(13464,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [105]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [106]:
nb = NaiveBayes()
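
With default parameters, NaiveBayes uses the multinomial model with a smoothing value of 1.0. The following plain-Python sketch shows the textbook version of that idea on invented toy data. It is not Spark's implementation, and details such as how the class prior is smoothed may differ. All names in it (`train`, `log_posterior`, `predict`) are hypothetical.

```python
import math
from collections import Counter, defaultdict

# Toy labelled token lists (hypothetical).
train = [
    (["free", "win", "prize"], "spam"),
    (["free", "entry"], "spam"),
    (["ok", "see", "you"], "ham"),
    (["see", "you", "soon"], "ham"),
]

vocab = {w for words, _ in train for w in words}
class_docs = Counter(label for _, label in train)
word_counts = defaultdict(Counter)
for words, label in train:
    word_counts[label].update(words)

def log_posterior(words, label, smoothing=1.0):
    """Unnormalized log P(label | words) with additive (Laplace) smoothing."""
    total = sum(word_counts[label].values())
    score = math.log(class_docs[label] / len(train))  # log prior
    for w in words:
        if w in vocab:  # words never seen in training are ignored
            score += math.log(
                (word_counts[label][w] + smoothing)
                / (total + smoothing * len(vocab))
            )
    return score

def predict(words):
    """Pick the label with the highest log posterior."""
    return max(class_docs, key=lambda label: log_posterior(words, label))

print(predict(["free", "prize"]), predict(["see", "you"]))
```

Smoothing keeps a word that never appeared in one class from forcing that class's probability to zero.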

## Pipeline
### Create a pipeline that contains all the steps, from the Tokenizer to the NaiveBayes classifier.

In [107]:
from pyspark.ml import Pipeline


In [151]:
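# Note: `cv` was already fitted on the full DataFrame above, so its vocabulary
# also includes words that occur only in the test split.
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, nb])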


### Split your data into train and test sets with ratios 0.7 and 0.3, respectively.

In [152]:
seed = 0
trainDF, testDF = df_ind.randomSplit([0.7,0.3],seed)
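
Note that a random split assigns each row by chance, so the 0.7/0.3 ratios hold only approximately, while a fixed seed makes the split reproducible. The plain-Python sketch below illustrates that behaviour. `bernoulli_split` is a hypothetical helper and only approximates what randomSplit does internally.

```python
import random

def bernoulli_split(rows, train_fraction=0.7, seed=0):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train, test = bernoulli_split(rows)
# Sizes are close to 700/300 but not guaranteed to match exactly.
print(len(train), len(test))
```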

### Fit your Pipeline model to the training data

In [153]:
model = pipeline.fit(trainDF)

### Perform predictions on the test DataFrame

In [154]:
prediction = model.transform(testDF)

### Print the schema of the prediction dataframe

In [156]:
prediction.printSchema()

root
 |-- label: double (nullable = false)
 |-- text: string (nullable = true)
 |-- words_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.
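
For reference, the evaluator's "f1" metric is a weighted F1: the per-class F1 scores averaged with weights proportional to how often each class occurs. The plain-Python sketch below spells out that computation on invented labels. `weighted_f1` is a hypothetical helper, not part of PySpark.

```python
from collections import Counter

def weighted_f1(y_true, y_pred):
    """Average the per-class F1 scores, weighted by class frequency in y_true."""
    support = Counter(y_true)
    pairs = list(zip(y_true, y_pred))
    total = 0.0
    for label in sorted(support):
        tp = sum(t == label and p == label for t, p in pairs)
        fp = sum(t != label and p == label for t, p in pairs)
        fn = sum(t == label and p != label for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * support[label] / len(y_true)
    return total

# Toy labels: class 0 has F1 = 2/3, class 1 has F1 = 1/2.
score = weighted_f1([0, 0, 0, 1, 1], [0, 0, 1, 1, 0])
print(score)
```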


In [157]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [158]:
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")


In [161]:
# "f1" is also the evaluator's default metric; compute it once and display it.
f1 = evaluatorMulti.evaluate(prediction, {evaluatorMulti.metricName: "f1"})
f1

0.9105008637125755