<a href="https://colab.research.google.com/github/MohamedMostafaSal/UnstructuredProjects/blob/main/Spark/Span%20mails%20Classifier%20using%20Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


##### **Good luck with taking your exam. Keep working and make your dreams all come true. Seeing the results of all of your hard work will make this struggle worth it. We’re all thinking of you.** 
<b><font color='blue'>AI-PRO Spark Team ITI</font></b>

# NLP Using PySpark

## Objective:
- The objective of this project is to create a <b>spam filter using a NaiveBayes classifier</b>.
- The model must reach an <b>f1_score > 0.9</b>.
- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

## To perform this task, follow these guiding steps:

### Create a spark session and import the required libraries

In [None]:
!pwd
! wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvzf spark-3.0.1-bin-hadoop3.2.tgz
!pip install findspark

/content
--2022-07-19 08:47:37--  https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 224062525 (214M) [application/x-gzip]
Saving to: ‘spark-3.0.1-bin-hadoop3.2.tgz.2’

                 sp   6%[>                   ]  13.48M  1.56MB/s    eta 2m 12s ^C
spark-3.0.1-bin-hadoop3.2/
spark-3.0.1-bin-hadoop3.2/RELEASE
spark-3.0.1-bin-hadoop3.2/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/
spark-3.0.1-bin-hadoop3.2/examples/src/main/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/

In [None]:
import os
import findspark

os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
findspark.init()

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

In [None]:
spark = SparkSession.builder.appName('FinalExam').getOrCreate()

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [None]:
df = spark.read.load("SMSSpamCollection", format="csv", sep="\t", inferSchema="true")

### Print the schema

In [None]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [None]:
df2 = df.withColumnRenamed('_c0', "class")
df3 = df2.withColumnRenamed('_c1', "text")

In [None]:
df3.show(5)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [None]:
df3.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [None]:
df3.show(10, truncate = False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column that contains the length of the text column

In [None]:
df4 = df3.withColumn('textLength', fn.length(df3.text))

df4.show(10)

+-----+--------------------+----------+
|class|                text|textLength|
+-----+--------------------+----------+
|  ham|Go until jurong p...|       111|
|  ham|Ok lar... Joking ...|        29|
| spam|Free entry in 2 a...|       155|
|  ham|U dun say so earl...|        49|
|  ham|Nah I don't think...|        61|
| spam|FreeMsg Hey there...|       147|
|  ham|Even my brother i...|        77|
|  ham|As per your reque...|       160|
| spam|WINNER!! As a val...|       157|
| spam|Had your mobile 1...|       154|
+-----+--------------------+----------+
only showing top 10 rows



### Show the new dataframe

In [None]:
df4.show(5)

+-----+--------------------+----------+
|class|                text|textLength|
+-----+--------------------+----------+
|  ham|Go until jurong p...|       111|
|  ham|Ok lar... Joking ...|        29|
| spam|Free entry in 2 a...|       155|
|  ham|U dun say so earl...|        49|
|  ham|Nah I don't think...|        61|
+-----+--------------------+----------+
only showing top 5 rows



### Get the average text length for each class (give alias name to the average length column)

In [None]:
df4.groupBy('class').agg(fn.avg(df4.textLength).alias('Avg. Length')).show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform your raw text into a TF-IDF model:
- For more information about TF-IDF, see the following link <b>(not needed for the test)</b>:
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.
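
The last two steps can be illustrated without Spark. The following is a minimal plain-Python sketch, assuming raw term counts as the term frequency (what CountVectorizer produces) and the smoothed formula `log((m + 1) / (df + 1))` documented for Spark ML's `IDF`. The toy corpus and the helper names `idf` and `tf_idf` are hypothetical and only serve the illustration.

```python
import math
from collections import Counter

# Toy corpus of already-tokenized messages (hypothetical data).
docs = [
    ["free", "entry", "win", "free"],
    ["ok", "see", "you"],
    ["win", "cash", "now"],
]

n_docs = len(docs)

# Document frequency: the number of documents that contain each term.
doc_freq = Counter(term for doc in docs for term in set(doc))


def idf(term):
    # Smoothed inverse document frequency: log((m + 1) / (df + 1)).
    return math.log((n_docs + 1) / (doc_freq[term] + 1))


def tf_idf(doc):
    # Term frequency is the raw count of the term in the document.
    counts = Counter(doc)
    return {term: count * idf(term) for term, count in counts.items()}


weights = tf_idf(docs[0])
for term, weight in sorted(weights.items()):
    print(f"{term}: {weight:.4f}")
```

A term that appears in many documents ("win" here) gets a lower weight than a term confined to one document, which is the point of the IDF factor.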

In [None]:
import nltk
nltk.download('punkt')
from nltk import word_tokenize
text ="please help me ignore punctuation like . or , but at the same time don't ignore if it looks like a url i.e. google.com or google.co.uk. Sometimes I also want conditions where I see an equals sign between words such as myname=shecode"
#word_tokenize(text)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
from pyspark.ml.feature import Tokenizer
# Split the lowercased 'text' column on whitespace into a 'words' array column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tk = tokenizer.transform(df4)
tk.show(5, truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                       |textLength|words                                                                                                                                                                                   |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------
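
As a rough illustration of what the Tokenizer stage does, here is a plain-Python approximation: it lowercases the text and splits on whitespace, so punctuation stays attached to the words. The function name `simple_tokenize` is hypothetical, and edge cases such as repeated spaces may be handled differently by Spark.

```python
def simple_tokenize(text):
    # Lowercase, then split on whitespace; punctuation is left attached.
    return text.lower().split()


tokens = simple_tokenize("Ok lar... Joking wif u oni...")
print(tokens)
```

Because tokens such as `lar...` keep their punctuation, they end up as separate vocabulary entries later in the CountVectorizer step.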

In [None]:
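from pyspark.ml.feature import StopWordsRemover
# Scratch cell: this remover is replaced in the next cell.
# StopWordsRemover expects an array-of-strings column, so it reads 'words', not 'text'.
remover = StopWordsRemover(stopWords=["b"])
remover.setInputCol("words")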

StopWordsRemover_ea629538a1ed

In [None]:
remover = StopWordsRemover(inputCol='words', outputCol='words_clean')
sw = remover.transform(tk)
sw.show(10, truncate = False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |textLength|words                                                                                                                                                                                      |words_clean                                                                                        
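
The effect of the StopWordsRemover stage can be sketched in plain Python as a simple filter. The stop-word set below is a tiny hypothetical list, not Spark's default English list, and the comparison is case-insensitive to mirror the stage's default `caseSensitive=False`.

```python
# Tiny hypothetical stop-word list; Spark ships a much longer default list.
STOP_WORDS = {"i", "so", "in", "a", "to", "the"}


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    # Keep only tokens whose lowercase form is not a stop word.
    return [t for t in tokens if t.lower() not in stop_words]


cleaned = remove_stop_words(["free", "entry", "in", "2", "a", "wkly", "comp"])
print(cleaned)
```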

In [None]:
from pyspark.ml.feature import CountVectorizer
countVectorizer = CountVectorizer(inputCol = "words_clean" , outputCol="vectors")
#cv.setOutputCol("vectors")
#cv.setInputCol("words_clean")
cv = countVectorizer.fit(sw).transform(sw)
cv.show(10, truncate = False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |textLength|words                                                                                                    

In [None]:
from pyspark.ml.feature import IDF
idf = IDF(inputCol="vectors", outputCol="tf_idf")
id = idf.fit(cv).transform(cv)
id.show(10, truncate = False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

- Convert the <b>class column</b> to an index using <b>StringIndexer</b>.
- Create a feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [None]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="class", outputCol="label", stringOrderType="frequencyDesc")
stringIndexer.setHandleInvalid("error")
si = stringIndexer.fit(id).transform(id)
si.show(10)

+-----+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+
|class|                text|textLength|               words|         words_clean|             vectors|              tf_idf|label|
+-----+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+
|  ham|Go until jurong p...|       111|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|  0.0|
|  ham|Ok lar... Joking ...|        29|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|  0.0|
| spam|Free entry in 2 a...|       155|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|  1.0|
|  ham|U dun say so earl...|        49|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,80,1...|  0.0|
|  ham|Nah I don't think...|        61|[nah, i, don't, t...|[nah, think, goes...|(13423,[3
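
The `frequencyDesc` ordering explains why `ham` maps to 0.0 and `spam` to 1.0: the most frequent label receives the lowest index. A minimal plain-Python sketch of that rule follows; the function name `frequency_desc_index` is hypothetical, and breaking ties alphabetically is an assumption about the tie-breaking behaviour.

```python
from collections import Counter


def frequency_desc_index(labels):
    # Most frequent label gets index 0.0; ties are broken alphabetically.
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


mapping = frequency_desc_index(["ham", "ham", "spam", "ham", "ham", "spam"])
print(mapping)
```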

In [None]:
#input -> tf_idf, textLength
#Label -> indexed
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(inputCols=['tf_idf', 'textLength'], outputCol='features')
vector_assembler.transform(si).show(10)

+-----+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|class|                text|textLength|               words|         words_clean|             vectors|              tf_idf|label|            features|
+-----+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|  ham|Go until jurong p...|       111|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|  0.0|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|        29|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|  0.0|(13424,[0,24,297,...|
| spam|Free entry in 2 a...|       155|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|  1.0|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|        49|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,
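
The feature vector has size 13424 because the assembler appends the scalar `textLength` to the 13423-dimensional TF-IDF vector. The sketch below mimics that concatenation on a sparse vector represented as a `(size, {index: value})` pair; this representation, the function name `assemble`, and the TF-IDF weights are hypothetical.

```python
def assemble(sparse_vec, scalar):
    # Append one scalar as a new last dimension of a sparse vector.
    size, entries = sparse_vec
    combined = dict(entries)
    if scalar != 0:
        combined[size] = float(scalar)  # zero values stay implicit
    return (size + 1, combined)


tf_idf_vec = (13423, {7: 2.1, 11: 1.4})  # hypothetical weights
features = assemble(tf_idf_vec, 111)
print(features)
```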

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
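
With default parameters, Spark's NaiveBayes is a multinomial model with smoothing 1.0. The following is a textbook sketch of multinomial Naive Bayes with Laplace smoothing in plain Python, not Spark's implementation (for example, the class prior here is unsmoothed). The toy data and the function names are hypothetical.

```python
import math
from collections import Counter, defaultdict


def train_multinomial_nb(docs, labels, smoothing=1.0):
    # Estimate log priors and smoothed per-class log term likelihoods.
    vocab = {term for doc in docs for term in doc}
    class_counts = Counter(labels)
    term_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        term_counts[label].update(doc)

    log_prior = {c: math.log(n / len(labels)) for c, n in class_counts.items()}
    log_likelihood = {}
    for c in class_counts:
        total = sum(term_counts[c].values()) + smoothing * len(vocab)
        log_likelihood[c] = {
            t: math.log((term_counts[c][t] + smoothing) / total) for t in vocab
        }
    return log_prior, log_likelihood


def predict(doc, log_prior, log_likelihood):
    # Pick the class with the highest log posterior; unseen terms are ignored.
    scores = {
        c: log_prior[c]
        + sum(log_likelihood[c][t] for t in doc if t in log_likelihood[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)


docs = [
    ["free", "win", "cash"],
    ["win", "prize", "free"],
    ["see", "you", "later"],
    ["ok", "see", "you"],
]
labels = ["spam", "spam", "ham", "ham"]

log_prior, log_likelihood = train_multinomial_nb(docs, labels)
prediction = predict(["free", "cash", "now"], log_prior, log_likelihood)
print(prediction)
```

Smoothing keeps a term that never occurred in one class from driving that class's probability to zero.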

## Pipeline
### Create a pipeline that contains all the steps, from the Tokenizer to the NaiveBayes classifier.

In [None]:
from pyspark.ml import Pipeline
pipy = Pipeline(stages=[stringIndexer, tokenizer, remover, countVectorizer, idf,  vector_assembler, nb])
#pipy = Pipeline().setStages(stringIndexer + [tokenizer, remover, countVectorizer, idf, vector_assembler, nb])

### Split your data into train and test sets with ratios 0.7 and 0.3 respectively.

In [None]:
train_df, test_df = df4.randomSplit([.7,.3],seed=42)
print(f"There are {train_df.count()} rows in the training set, and {test_df.count()} in the test set")

There are 3981 rows in the training set, and 1593 in the test set
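
The split above is about 71/29 rather than exactly 70/30. One way to see why, assuming each row is assigned independently at random (which is how `randomSplit` is understood to behave), is the plain-Python sketch below. The function name `bernoulli_split` is hypothetical, and it will not reproduce Spark's exact row counts.

```python
import random


def bernoulli_split(rows, train_fraction=0.7, seed=42):
    # Assign each row to train with probability train_fraction, else to test.
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


train_rows, test_rows = bernoulli_split(range(5574))
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable, but the sizes only approximate the requested ratios.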


In [None]:
train_df.write.parquet("train.parquet",  mode = 'overwrite')
test_df.write.parquet("test.parquet",  mode = 'overwrite')

In [None]:
train_df = spark.read.parquet('train.parquet')
test_df = spark.read.parquet('test.parquet')

In [None]:
train_df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- textLength: integer (nullable = true)



In [None]:
test_df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- textLength: integer (nullable = true)



### Fit your Pipeline model to the training data

In [None]:
model=pipy.fit(train_df)
model

PipelineModel_7150b1fecdfd

In [None]:
model.transform(test_df).select('label', 'prediction').show(10)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows



### Perform predictions on the test DataFrame

In [None]:
pred = model.transform(test_df)

### Print the schema of the prediction dataframe

In [None]:
pred.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- textLength: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [None]:
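from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# The evaluator's default metric is already "f1"; setting it explicitly makes the intent clear.
f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = f1_eval.evaluate(pred)
print("F1 score of the model on the test set: {}".format(f1))

F1 score of the model on the test set: 0.9727502290227267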
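
The number reported by MulticlassClassificationEvaluator is, by default, an F1 score averaged over the classes and weighted by each class's share of the true labels. A minimal plain-Python sketch of that weighted F1 follows; the function name `weighted_f1` and the toy labels are hypothetical.

```python
from collections import Counter


def weighted_f1(labels, predictions):
    # Per-class F1, averaged with weights equal to each class's label share.
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for cls in support:
        pairs = list(zip(labels, predictions))
        true_pos = sum(1 for y, p in pairs if y == cls and p == cls)
        false_pos = sum(1 for y, p in pairs if y != cls and p == cls)
        false_neg = sum(1 for y, p in pairs if y == cls and p != cls)
        precision = true_pos / (true_pos + false_pos) if true_pos + false_pos else 0.0
        recall = true_pos / (true_pos + false_neg) if true_pos + false_neg else 0.0
        denom = precision + recall
        f1_cls = 2 * precision * recall / denom if denom else 0.0
        score += f1_cls * support[cls] / total
    return score


labels = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
predictions = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]
score = weighted_f1(labels, predictions)
print(round(score, 4))
```

On an imbalanced dataset such as this one, the weighted score is dominated by the majority class (`ham`), so it is worth checking the per-class F1 for `spam` as well.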


# GOOD LUCK
<b><font color='GREEN'>AI-PRO Spark Team ITI</font></b>
