<a href="https://colab.research.google.com/github/Anas-Rabea/ITI/blob/main/Spark_and_Python_for_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![image.png](attachment:image.png)

##### **Good luck with taking your exam. Keep working and make your dreams all come true. Seeing the results of all of your hard work will make this struggle worth it. We’re all thinking of you.** 
<b><font color='blue'>AI-PRO Spark Team ITI</font></b>

# NLP Using PySpark

## Objective:
- The objective of this project is to create a <b>spam filter using a NaiveBayes classifier</b>.
- It is required to obtain <b>f1_score > 0.9</b>.
- We'll use a dataset from UCI Repository. SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

## To perform this task, follow these guiding steps:

### Create a spark session and import the required libraries

In [None]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# extract the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installed Java and Spark folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


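# install findspark and pyspark using pip
# (pip installs the latest pyspark release, which may not match the Spark 3.0.0 archive above)
!pip install -q findspark
!pip install pyspark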

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=d6dfe007775fbf0d1519cc08e8a048151efe01c0594c2897e43f75b6461928fd
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [None]:
df = spark.read.csv("/content/SMSSpamCollection" , sep='\t', inferSchema=True)


In [None]:
df.show(5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



### Print the schema

In [None]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [None]:
df.columns

['_c0', '_c1']

In [None]:
df = df.withColumnRenamed('_c0',"class")
df = df.withColumnRenamed('_c1',"text")
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [None]:
df.show(10)  # truncate=True by default

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [None]:
df.show(10, truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column that contains the length of the text column

In [None]:
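# import only the functions used below; a wildcard import shadows Python builtins such as sum and max
from pyspark.sql.functions import avg, length
df = df.withColumn("length", length(df['text']))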

### Show the new dataframe

In [None]:
df.show(5)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
+-----+--------------------+------+
only showing top 5 rows



### Get the average text length for each class (give alias name to the average length column)

In [None]:
df.groupBy("class") \
  .agg(avg("length").alias("Avg. Length")).show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform your raw text into a TF-IDF model:
- For more information about TF-IDF check the following link: <b>(Not needed for the test)</b>
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.
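
To make the last two steps concrete, here is a minimal pure-Python sketch of what a count vector followed by IDF weighting computes. The toy corpus and the helper name `tf_idf_weights` are illustrative assumptions, not part of the assignment. The smoothed formula `log((N + 1) / (df + 1))` is the one Spark documents for its `IDF` estimator; the real pipeline works on sparse vectors rather than dictionaries.

```python
import math
from collections import Counter

# Toy corpus (made up for illustration), already tokenized and stop-word filtered.
docs = [
    ["free", "entry", "win", "free"],
    ["ok", "see", "you"],
    ["win", "cash", "now"],
]
n_docs = len(docs)

# Document frequency: in how many documents does each term appear?
doc_freq = Counter(term for doc in docs for term in set(doc))

# Smoothed inverse document frequency: log((N + 1) / (df + 1)).
idf_by_term = {term: math.log((n_docs + 1) / (df + 1)) for term, df in doc_freq.items()}

def tf_idf_weights(doc):
    """Multiply the raw count of each term in one document by its IDF."""
    counts = Counter(doc)
    return {term: count * idf_by_term[term] for term, count in counts.items()}

weights = tf_idf_weights(docs[0])
print(weights)
```

A term that occurs in many documents ("win" here) gets a smaller weight than a term that is specific to one document ("entry"), which is the point of the IDF step.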

In [None]:
from pyspark.ml.feature import RegexTokenizer
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern=" ")
tokenized = tokenizer.transform(df)

In [None]:
tokenized.show(5)


+-----+--------------------+------+--------------------+
|class|                text|length|               words|
+-----+--------------------+------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|
|  ham|Nah I don't think...|    61|[nah, i, don't, t...|
+-----+--------------------+------+--------------------+
only showing top 5 rows
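
As a rough illustration of what the `RegexTokenizer` call above does with `pattern=" "`, the sketch below lowercases a message, splits it on single spaces, and drops empty tokens. The function `regex_tokenize` is a hypothetical stand-in written for this note, not Spark's implementation.

```python
import re

def regex_tokenize(text, pattern=" ", min_token_length=1):
    """Lowercase the text, split on `pattern`, and drop tokens shorter than the minimum."""
    pieces = re.split(pattern, text.lower())
    return [piece for piece in pieces if len(piece) >= min_token_length]

sample = "Ok lar... Joking wif u oni..."
tokens = regex_tokenize(sample)
print(tokens)
```

Because the split is on spaces only, punctuation stays attached to the words (for example `lar...`), which matches the `words` column shown above.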



In [None]:
from pyspark.ml.feature import StopWordsRemover


remover = StopWordsRemover(inputCol='words', outputCol='words_clean')
df_nostop_words = remover.transform(tokenized)

In [None]:
df_nostop_words.show(5)

+-----+--------------------+------+--------------------+--------------------+
|class|                text|length|               words|         words_clean|
+-----+--------------------+------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|[u, dun, say, ear...|
|  ham|Nah I don't think...|    61|[nah, i, don't, t...|[nah, think, goes...|
+-----+--------------------+------+--------------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import CountVectorizer
count_vect = CountVectorizer(inputCol = "words_clean" , outputCol="count_vect")
model = count_vect.fit(df_nostop_words)

In [None]:
from pyspark.ml.feature import IDF
tf_idf = IDF(inputCol="count_vect", outputCol="tf_idf")

- Convert the <b>class column</b> to index using <b>StringIndexer</b>
- Create a feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
# string_idx = StringIndexer(inputCol='class',outputCol='class_index')
string_idx = StringIndexer(inputCol='class',outputCol='label')

vec_assembler = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')
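
The sketch below shows, in plain Python, how a frequency-ordered string index can be built. With its default ordering, `StringIndexer` gives index 0.0 to the most frequent value, which is why `ham` becomes 0.0 and `spam` becomes 1.0 in this dataset. The helper `fit_string_index` and the toy sample are assumptions made for illustration, and the alphabetical tie-break follows what recent Spark versions document.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: float(index) for index, value in enumerate(ordered)}

classes = ["ham", "ham", "spam", "ham", "ham", "spam"]  # toy sample, not the real data
mapping = fit_string_index(classes)
labels = [mapping[c] for c in classes]
print(mapping)
print(labels)
```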


## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [None]:
from pyspark.ml.classification import NaiveBayes
# Use defaults
clf = NaiveBayes(labelCol='label')
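
For intuition about what the classifier learns, here is a simplified textbook version of multinomial Naive Bayes with additive smoothing of 1.0, which are Spark's default model type and smoothing value. The functions `train_nb` and `predict_nb` and the tiny training set are hypothetical; Spark's own implementation operates on the feature vectors and differs in its details.

```python
import math
from collections import Counter, defaultdict

def train_nb(docs, labels, smoothing=1.0):
    """Estimate log priors and smoothed per-class log likelihoods from token lists."""
    vocab = sorted({term for doc in docs for term in doc})
    class_counts = Counter(labels)
    term_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        term_counts[label].update(doc)
    log_prior = {c: math.log(n / len(labels)) for c, n in class_counts.items()}
    log_likelihood = {}
    for c in class_counts:
        total = sum(term_counts[c].values()) + smoothing * len(vocab)
        log_likelihood[c] = {
            term: math.log((term_counts[c][term] + smoothing) / total) for term in vocab
        }
    return log_prior, log_likelihood

def predict_nb(nb_model, doc):
    """Return the class with the highest log posterior; unseen terms are ignored."""
    log_prior, log_likelihood = nb_model
    scores = {
        c: log_prior[c] + sum(log_likelihood[c][t] for t in doc if t in log_likelihood[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)

# Made-up training messages, already tokenized.
train_docs = [["free", "win", "cash"], ["win", "prize", "free"],
              ["see", "you", "later"], ["ok", "see", "you"]]
train_labels = ["spam", "spam", "ham", "ham"]

nb_model = train_nb(train_docs, train_labels)
prediction = predict_nb(nb_model, ["free", "cash", "now"])
print(prediction)
```

Smoothing keeps a term that never appeared in one class from driving that class's probability to zero.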

## Pipeline
### Create a pipeline model that contains all the steps, starting from the Tokenizer to the NaiveBayes classifier.

In [None]:
mystage = [string_idx , tokenizer , remover , model , tf_idf , vec_assembler]

In [None]:
from pyspark.ml import Pipeline 
pipe = Pipeline(stages=mystage)
cleaner = pipe.fit(df)

In [None]:
clean_data = cleaner.transform(df)
clean_data = clean_data.select(['label','features'])


In [None]:
clean_data.show(5)


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13423,[6,10,30,6...|
|  0.0|(13423,[0,23,296,...|
|  1.0|(13423,[2,12,18,2...|
|  0.0|(13423,[0,69,79,1...|
|  0.0|(13423,[35,131,31...|
+-----+--------------------+
only showing top 5 rows



### Split your data into train and test data with ratios 0.7 and 0.3 respectively.

In [None]:
(training,testing) = clean_data.randomSplit([0.7,0.3])


In [None]:
training.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13423,[0,1,2,6,7...|
|  0.0|(13423,[0,1,2,40,...|
|  0.0|(13423,[0,1,4,14,...|
|  0.0|(13423,[0,1,4,19,...|
|  0.0|(13423,[0,1,6,7,1...|
+-----+--------------------+
only showing top 5 rows



### Fit your Pipeline model to the training data

In [None]:
spam_predictor = clf.fit(training)


### Perform predictions on the test DataFrame

In [None]:
test_results = spam_predictor.transform(testing)
test_results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13423,[0,1,2,12,...|[-604.84669918667...|[1.0,1.0827898161...|       0.0|
|  0.0|(13423,[0,1,3,49,...|[-846.85657711868...|[1.0,3.0918626624...|       0.0|
|  0.0|(13423,[0,1,8,134...|[-551.27119275508...|[0.99999962795326...|       0.0|
|  0.0|(13423,[0,1,20,26...|[-752.78372417465...|[1.0,3.1677785172...|       0.0|
|  0.0|(13423,[0,1,26,34...|[-1467.3422208330...|[0.99999999999999...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



### Print the schema of the prediction dataframe

In [None]:
test_results.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# "f1" is the evaluator's default metric: the F1 score weighted across both classes
f1_eval = MulticlassClassificationEvaluator(metricName="f1")
f1 = f1_eval.evaluate(test_results)
print(f1)

0.9239386208076041


In [None]:
# predictionAndLabels = testing.map(lambda lp: (float(spam_predictor.predict(lp.features)), lp.label))
# from pyspark.mllib.evaluation import MulticlassMetrics
# metrics = MulticlassMetrics(test_results)


In [None]:
test_results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13423,[0,1,2,12,...|[-604.84669918667...|[1.0,1.0827898161...|       0.0|
|  0.0|(13423,[0,1,3,49,...|[-846.85657711868...|[1.0,3.0918626624...|       0.0|
|  0.0|(13423,[0,1,8,134...|[-551.27119275508...|[0.99999962795326...|       0.0|
|  0.0|(13423,[0,1,20,26...|[-752.78372417465...|[1.0,3.1677785172...|       0.0|
|  0.0|(13423,[0,1,26,34...|[-1467.3422208330...|[0.99999999999999...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
from sklearn.metrics import f1_score
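# sklearn's f1_score defaults to average='binary': the F1 of the positive class (spam = 1.0) only
preds = test_results.select('label', 'prediction').toPandas()
print("sklearn F1 Score evaluator : ", f1_score(preds['label'], preds['prediction']))
# It is lower than the Spark score above because Spark's "f1" metric is weighted across both classes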


sklearn F1 Score evaluator :  0.7434944237918216
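
The gap between the Spark score and the sklearn score comes from averaging: Spark's `f1` metric is the F1 of every class weighted by class frequency, while sklearn's default reports the F1 of the positive class alone. The sketch below reproduces both calculations in plain Python on made-up labels (8 ham, 2 spam); the helper names and the toy predictions are assumptions for illustration only.

```python
def f1_for_class(y_true, y_pred, positive):
    """F1 score treating `positive` as the positive class and everything else as negative."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == positive and p == positive for t, p in pairs)
    fp = sum(t != positive and p == positive for t, p in pairs)
    fn = sum(t == positive and p != positive for t, p in pairs)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of the true labels."""
    return sum(
        f1_for_class(y_true, y_pred, c) * y_true.count(c) / len(y_true)
        for c in sorted(set(y_true))
    )

# Toy labels: 0.0 = ham, 1.0 = spam. One ham is flagged as spam and one spam is missed.
y_true = [0.0] * 8 + [1.0] * 2
y_pred = [0.0] * 7 + [1.0] + [1.0, 0.0]

spam_f1 = f1_for_class(y_true, y_pred, positive=1.0)  # what sklearn's default reports
overall_f1 = weighted_f1(y_true, y_pred)              # what Spark's "f1" metric reports
print(spam_f1, overall_f1)
```

On imbalanced data the weighted score is dominated by the large ham class, so it can sit well above the spam-only score even when many spam messages are misclassified.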


f1_score is: 0.9664707489549014


# GOOD LUCK
<b><font color='GREEN'>AI-PRO Spark Team ITI</font></b>

![image-3.png](attachment:image-3.png)