# NLP Using PySpark

### Create a Spark session and import the required libraries

In [3]:
! pip install pyspark



In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [3]:
df = spark.read.csv("/content/SMSSpamCollection.csv", sep='\t')

### Print the schema

In [4]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [5]:
df = df.withColumnRenamed("_c0","class")
df = df.withColumnRenamed("_c1","text")

In [6]:
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows of the DataFrame
- Show once with truncate=True and once with truncate=False

In [7]:
df.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [8]:
df.show(10,truncate=True)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows
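
The two outputs above are identical because `truncate=True` is the default for `show`. As a rough plain-Python illustration of the shortening rule (not Spark's actual code; `truncate_cell` is a hypothetical helper, and the sketch assumes a width of at least 4), cells longer than 20 characters are cut to 17 characters followed by `...`:

```python
def truncate_cell(value: str, width: int = 20) -> str:
    """Mimic how show(truncate=True) shortens a cell (illustrative only)."""
    if len(value) <= width:
        return value
    # Keep width - 3 characters and append a three-dot marker.
    return value[: width - 3] + "..."

print(truncate_cell("Go until jurong point, crazy.."))
print(truncate_cell("ham"))
```

The first call reproduces the `Go until jurong p...` cell shown in the table, while short values such as `ham` pass through unchanged.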



In [9]:
df.show(10,truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column containing the length of the text column

In [10]:
from pyspark.sql.functions import col,length,avg

In [11]:
df = df.withColumn("length", length(col("text")))

In [12]:
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



### Show the new DataFrame

In [13]:
df.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Get the average text length for each class (give the average length column an alias)

In [14]:
df.groupBy('class').agg(avg('length').alias('Avg. Length')).show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.
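
To make the steps above concrete, here is a minimal plain-Python sketch of what each stage computes on a toy corpus. It is not Spark code: the corpus, the stop-word list, and all variable names are made up for illustration, and the vocabulary order is arbitrary (Spark orders it by term frequency). The IDF line uses the smoothed formula given in the Spark ML documentation, `log((m + 1) / (df(t) + 1))`.

```python
import math
from collections import Counter

# Toy corpus and stop-word list; both are made up for illustration.
docs = ["Free entry win", "Ok lar joking", "Win a free prize"]
stop_words = {"a", "ok"}

# 1) Tokenizer: lowercase the text, then split on whitespace.
tokens = [d.lower().split() for d in docs]

# 2) StopWordsRemover: drop tokens that appear in the stop-word list.
filtered = [[t for t in doc if t not in stop_words] for doc in tokens]

# 3) CountVectorizer: per-document term counts over a shared vocabulary.
vocab = sorted({t for doc in filtered for t in doc})
counts = [Counter(doc) for doc in filtered]

# 4) IDF: idf(t) = log((m + 1) / (doc_freq(t) + 1)), m = number of documents.
m = len(docs)
doc_freq = {t: sum(1 for doc in filtered if t in doc) for t in vocab}
idf = {t: math.log((m + 1) / (doc_freq[t] + 1)) for t in vocab}

# TF-IDF: scale each raw count by the term's IDF weight.
tfidf = [{t: c[t] * idf[t] for t in c} for c in counts]
print(tfidf[0])
```

Terms that occur in many documents (here `free` and `win`) receive a smaller weight than terms that occur in only one.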

In [15]:
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover, StringIndexer, Tokenizer, VectorAssembler

In [16]:
tokenizer = Tokenizer(inputCol='text',outputCol="text_Tok")

In [17]:
remover = StopWordsRemover(inputCol='text_Tok', outputCol="text_RemovedStopWords")

In [18]:
countvectorizer = CountVectorizer(inputCol='text_RemovedStopWords', outputCol="text_CountVectorizer")

In [19]:
from pyspark.ml.feature import IDF
idf = IDF(inputCol='text_CountVectorizer',outputCol="text_idf")

- Convert the <b>class</b> column to a numeric index using <b>StringIndexer</b>.
- Create a feature column from the <b>TF-IDF</b> and <b>length</b> columns.
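
As a rough plain-Python illustration of these two steps (toy data, hypothetical names, not Spark code): with its default ordering, `StringIndexer` assigns index 0.0 to the most frequent label, and `VectorAssembler` concatenates its input columns into a single vector.

```python
from collections import Counter

labels = ["ham", "ham", "spam", "ham", "spam"]  # toy labels for illustration

# StringIndexer (default ordering): the most frequent label gets index 0.0.
ordered = [label for label, _ in Counter(labels).most_common()]
label_to_index = {label: float(i) for i, label in enumerate(ordered)}

# VectorAssembler: append the scalar length to the TF-IDF vector.
def assemble(tfidf_vector, length):
    return list(tfidf_vector) + [float(length)]

features = assemble([0.0, 0.29, 0.69], 111)  # made-up TF-IDF values
print(label_to_index, features)
```

In this toy data `ham` is the more frequent label, so it maps to 0.0 and `spam` to 1.0. Whether that holds for the real data depends on the class counts in the training set.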

In [20]:
stringIndexer = StringIndexer(inputCol='class', outputCol='class_index',handleInvalid='skip')

In [21]:
vecAssembler = VectorAssembler(inputCols=['text_idf', 'length'],outputCol='features')

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.
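
As documented for Spark ML, the default `NaiveBayes` parameters are `modelType="multinomial"` and `smoothing=1.0`. The sketch below is a textbook multinomial Naive Bayes with additive smoothing in plain Python, meant only to show the idea; it is not Spark's implementation, and the function names and toy data are assumptions made for illustration.

```python
import math

def train_multinomial_nb(X, y, smoothing=1.0):
    """Textbook multinomial Naive Bayes; X holds non-negative feature rows."""
    n_features = len(X[0])
    model = {}
    for c in sorted(set(y)):
        rows = [x for x, label in zip(X, y) if label == c]
        totals = [sum(r[j] for r in rows) for j in range(n_features)]
        denom = sum(totals) + smoothing * n_features
        log_prior = math.log(len(rows) / len(X))
        # Smoothed log-likelihood of each feature given the class.
        log_probs = [math.log((t + smoothing) / denom) for t in totals]
        model[c] = (log_prior, log_probs)
    return model

def predict(model, x):
    # Choose the class with the highest unnormalized log posterior.
    scores = {c: prior + sum(xi * w for xi, w in zip(x, log_probs))
              for c, (prior, log_probs) in model.items()}
    return max(scores, key=scores.get)

# Toy data: columns are counts for ["free", "win", "home"]; 1.0 means spam.
X = [[2, 1, 0], [1, 2, 0], [0, 0, 2], [0, 1, 3]]
y = [1.0, 1.0, 0.0, 0.0]
model = train_multinomial_nb(X, y)
print(predict(model, [1, 1, 0]))  # a message containing "free" and "win"
```

The multinomial model expects non-negative feature values, which TF-IDF weights and text lengths satisfy.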

In [22]:
from pyspark.ml.classification import NaiveBayes

In [23]:
NB = NaiveBayes(featuresCol='features',
                labelCol='class_index',
                predictionCol='prediction')

## Pipeline
### Create a pipeline containing all the steps, from the Tokenizer to the NaiveBayes classifier

In [24]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, remover, countvectorizer, idf, stringIndexer, vecAssembler, NB])

### Split the data into train and test sets with ratios 0.7 and 0.3, respectively

In [25]:
trainDF, testDF = df.randomSplit([.7,.3],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 3981 rows in the training set, and 1593 in the test set
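
The counts above are close to, but not exactly, 70% and 30% of the 5574 rows (3981 is about 71.4%). That is expected because `randomSplit` assigns rows at random rather than cutting at a fixed position. A conceptual plain-Python sketch of that behavior follows; `random_split` is a hypothetical helper, not Spark's algorithm, and it will not reproduce Spark's exact counts for the same seed.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.7, 1.0].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split catches any floating-point rounding slack.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(5574)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, but the split sizes still only approximate the requested ratios.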


In [26]:
trainDF.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham| &lt;#&gt;  in mc...|    36|
|  ham| &lt;#&gt;  mins ...|    51|
|  ham| and  picking the...|    41|
|  ham| came to look at ...|   103|
|  ham| gonna let me kno...|    95|
|  ham| says that he's q...|   200|
|  ham|"Happy valentines...|   147|
|  ham|"Its Ur luck to L...|   155|
|  ham|"Life is nothing ...|   159|
|  ham|"The world suffer...|   129|
|  ham|"Wen u miss someo...|   143|
|  ham|&lt;#&gt;  am I t...|    45|
|  ham|&lt;#&gt;  is fas...|   461|
|  ham|&lt;#&gt; %of ppl...|   327|
|  ham|'An Amazing Quote...|   141|
|  ham|'Wnevr i wana fal...|   155|
|  ham|(And my man carlo...|    66|
|  ham|(You didn't hear ...|    28|
|  ham|      * Am on my way|    14|
|  ham|* Was really good...|    69|
+-----+--------------------+------+
only showing top 20 rows



In [27]:
testDF.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham| &lt;DECIMAL&gt; ...|   132|
|  ham| said kiss, kiss,...|   133|
|  ham| what number do u...|    36|
|  ham|"Gimme a few" was...|    41|
|  ham|"Response" is one...|   154|
|  ham|"SYMPTOMS" when U...|   139|
|  ham|"Speak only when ...|    80|
|  ham|&lt;#&gt;  great ...|    85|
|  ham|&lt;#&gt;  w jett...|    37|
|  ham|&lt;#&gt; , that'...|    48|
|  ham|&lt;#&gt; ISH MIN...|    45|
|  ham|(I should add tha...|   132|
|  ham|(No promises on w...|    60|
|  ham|(That said can yo...|    43|
|  ham|* Am on a train b...|    56|
|  ham|* Thought I didn'...|    27|
|  ham|* Was a nice day ...|   140|
|  ham|* Will have two m...|    67|
|  ham|, ,  and  picking...|   169|
|  ham|, how's things? J...|    38|
+-----+--------------------+------+
only showing top 20 rows



### Fit your Pipeline model to the training data

In [28]:
pipeline_model = pipeline.fit(trainDF)

### Generate predictions on the test DataFrame

In [29]:
pred = pipeline_model.transform(testDF)

### Print the schema of the prediction dataframe

In [30]:
pred.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- text_Tok: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text_RemovedStopWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text_CountVectorizer: vector (nullable = true)
 |-- text_idf: vector (nullable = true)
 |-- class_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>F1 score</b>.

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [32]:
Multiclass_Evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',
                                                         labelCol='class_index',
                                                         metricName='f1')

In [33]:
print("f1_score is: " , Multiclass_Evaluator.evaluate(pred))

f1_score is:  0.9727502290227267
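
According to the Spark documentation, the `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure: the F1 score is computed per class and averaged with weights proportional to each class's share of the true labels. A minimal plain-Python sketch of that calculation, using made-up labels and a hypothetical `weighted_f1` helper, looks like this:

```python
def weighted_f1(labels, predictions):
    """Average per-class F1, weighted by each class's share of the true labels."""
    total = 0.0
    pairs = list(zip(labels, predictions))
    for c in set(labels):
        tp = sum(1 for l, p in pairs if l == c and p == c)
        fp = sum(1 for l, p in pairs if l != c and p == c)
        fn = sum(1 for l, p in pairs if l == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        total += f1 * labels.count(c) / len(labels)
    return total

# Toy example: one ham message (0.0) is misclassified as spam (1.0).
labels      = [0.0, 0.0, 0.0, 1.0, 1.0]
predictions = [0.0, 0.0, 1.0, 1.0, 1.0]
print(weighted_f1(labels, predictions))
```

Because the weights follow class frequency, a high score on an imbalanced dataset can be driven mostly by the majority class, so it is worth checking per-class precision and recall as well.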
