#### Prepared By: Gaurav Nepal
# NLP Code Along questions

For this code along we will build a spam filter!

We'll use a classic dataset for this, the SMS Spam Collection from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

#### Load and read the dataset, and have Spark infer the data types

Import all the PySpark functions needed for this program:

In [377]:
from pyspark.sql import SparkSession
# split from pyspark.sql.functions splits each line of the DataFrame on a delimiter (a tab, below):
from pyspark.sql.functions import split
# length from pyspark.sql.functions returns the number of characters in a string column:
from pyspark.sql.functions import length

### Creating a DataFrame

Start the Spark session with the app name "spamfilter".

In [378]:
spark = SparkSession.builder.appName('spamfilter').getOrCreate()

##### Read the data file "SMSSpamCollection"

In [379]:
df = spark.read.text('SMSSpamCollection')  # read the text file with spark.read.text
df.show(10)

+--------------------+
|               value|
+--------------------+
|ham	Go until juro...|
|ham	Ok lar... Jok...|
|spam	Free entry i...|
|ham	U dun say so ...|
|ham	Nah I don't t...|
|spam	FreeMsg Hey ...|
|ham	Even my broth...|
|ham	As per your r...|
|spam	WINNER!! As ...|
|spam	Had your mob...|
+--------------------+
only showing top 10 rows



In [380]:
# Let's check its schema
df.printSchema()

root
 |-- value: string (nullable = true)



#### Splitting the Data
As shown above, there is only one column, 'value', and each whole record is stored in it. Let's split that single column into separate columns.

In [381]:
new_df = df.select(split(df.value, "\t"))  # split each line on its tab so we can build two columns later
new_df.show(10)

+--------------------+
|     split(value, 	)|
+--------------------+
|[ham, Go until ju...|
|[ham, Ok lar... J...|
|[spam, Free entry...|
|[ham, U dun say s...|
|[ham, Nah I don't...|
|[spam, FreeMsg He...|
|[ham, Even my bro...|
|[ham, As per your...|
|[spam, WINNER!! A...|
|[spam, Had your m...|
+--------------------+
only showing top 10 rows
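
To see what the tab split does to a single record, here is a minimal plain-Python sketch (it is not Spark code). The helper name `parse_record` and the two sample lines are made up for illustration; they only mimic the "label<TAB>message" layout shown above.

```python
def parse_record(line):
    """Split one raw line into (label, message) on the first tab only."""
    label, text = line.rstrip("\n").split("\t", 1)
    return label, text


# Made-up sample lines in the same "label<TAB>message" layout as the file.
raw_lines = [
    "ham\tsee you at noon",
    "spam\tWIN a prize now\tcall today",  # message that itself contains a tab
]

records = [parse_record(line) for line in raw_lines]
for label, text in records:
    print(repr(label), repr(text))
```

Splitting on the first tab only keeps a message intact even if it contains a tab of its own. A plain split on every tab, as in the cell above, would produce more than two array elements for such a line.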



Now let's turn the split array into two columns named class and text.

In [382]:
# Create two columns, 'class' and 'text', by splitting on the tab '\t', and store the result in 'data'
data = (
    df.select(split(df.value, "\t"))
    .rdd.flatMap(lambda x: x)  # each Row holds one array, so flatMap yields the [class, text] list
    .toDF(schema=["class", "text"])
)

data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [383]:
# Let's check its schema
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



The schema is now set correctly.

## Clean and Prepare the Data

#### Create a new length feature

Use the length function to find the number of characters in each message.

In [384]:
# withColumn() creates a new column named 'length'
data = data.withColumn('length', length('text'))

In [396]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



#### Print the groupBy mean of length per class

In [397]:
# group by class and compute the mean length for each class
data.groupBy('class').agg({'length':'mean'}).show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.47192873420344|
| spam|138.6760374832664|
+-----+-----------------+
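
In the output above, spam messages average roughly twice the length of ham messages, which is why length is worth keeping as a feature. The grouped mean itself can be sketched in plain Python as follows. This is an illustration only, not how Spark computes it, and the three rows are made up.

```python
from collections import defaultdict

# Made-up (class, text) rows standing in for the DataFrame.
rows = [
    ("ham", "see you at noon"),
    ("ham", "ok"),
    ("spam", "WIN a prize now, call today"),
]

# class -> [sum of message lengths, number of messages]
totals = defaultdict(lambda: [0, 0])
for label, text in rows:
    totals[label][0] += len(text)  # same idea as the 'length' column
    totals[label][1] += 1

mean_length = {label: total / count for label, (total, count) in totals.items()}
print(mean_length)
```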



## Feature Transformations

In this part you transform your raw text into a TF-IDF model:

- Chain the transformers Tokenizer, StopWordsRemover, CountVectorizer and IDF on the text column to produce a final column named 'tf_idf'.
- Use the StringIndexer transformer to convert the class column into an output column named 'label'.
- Use VectorAssembler with 'tf_idf' and 'length' as input columns to create an output column named 'features'.
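
The text steps listed above can be sketched in plain Python to show what each stage contributes. This is a simplified illustration under stated assumptions, not Spark's implementation: the stop-word set and the three documents are made up, and the IDF line uses the smoothed formula given in the Spark ML feature docs, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term.

```python
import math
from collections import Counter

STOP_WORDS = {"a", "the", "to", "you"}  # tiny made-up list, not Spark's default

docs = ["win a prize", "call to win", "see you at the game"]  # made-up messages

# Tokenizer: lowercase and split on whitespace.
# StopWordsRemover: drop the stop words.
tokens = [[w for w in d.lower().split() if w not in STOP_WORDS] for d in docs]

# CountVectorizer: term counts per document.
counts = [Counter(t) for t in tokens]

# IDF: log((m + 1) / (df + 1)); iterating a Counter yields each term once per document.
m = len(docs)
doc_freq = Counter(term for c in counts for term in c)
idf = {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}

# TF-IDF: term count scaled by the term's IDF weight.
tf_idf = [{term: n * idf[term] for term, n in c.items()} for c in counts]
for row in tf_idf:
    print(row)
```

A term that appears in many documents ("win" here) gets a smaller weight than a term that appears in only one ("prize").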

### Use a pipeline to fit and transform

Example (yours may differ):

In [398]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StringIndexer, CountVectorizer, StopWordsRemover, IDF, VectorAssembler, HashingTF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes

References:
- https://spark.apache.org/docs/2.2.0/mllib-naive-bayes.html
- http://spark.apache.org/docs/latest/ml-pipeline.html
- http://spark.apache.org/docs/latest/ml-features

In [435]:
tokenizer = Tokenizer(inputCol="text", outputCol="token_txt")
stop_remover = StopWordsRemover(inputCol='token_txt', outputCol='stop_token')
count_vector = CountVectorizer(inputCol='stop_token', outputCol='count_vector') # estimator; fitting it produces a CountVectorizerModel
idf = IDF(inputCol='count_vector', outputCol='output_idf')
ham_spam_n = StringIndexer(inputCol='class', outputCol='label')

In [436]:
nb = NaiveBayes()

In [428]:
# Define the assembler before the pipeline that uses it as its last stage
cleaning_data = VectorAssembler(inputCols=['output_idf', 'length'], outputCol='features')

In [419]:
data_pipeline = Pipeline(stages=[ham_spam_n, tokenizer, stop_remover, count_vector, idf, cleaning_data])
cleaner = data_pipeline.fit(data)
clean_data = cleaner.transform(data)

In [429]:
clean_data = clean_data.select(['label', 'features'])

In [430]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13465,[7,11,31,6...|
|  0.0|(13465,[0,24,296,...|
|  1.0|(13465,[2,13,19,3...|
|  0.0|(13465,[0,69,80,1...|
|  0.0|(13465,[36,134,31...|
|  1.0|(13465,[10,67,139...|
|  0.0|(13465,[10,53,103...|
|  0.0|(13465,[125,184,4...|
|  1.0|(13465,[1,46,118,...|
|  1.0|(13465,[0,1,13,27...|
|  0.0|(13465,[18,43,120...|
|  1.0|(13465,[8,17,37,8...|
|  1.0|(13465,[13,30,46,...|
|  0.0|(13465,[39,95,217...|
|  0.0|(13465,[552,1690,...|
|  1.0|(13465,[30,109,11...|
|  0.0|(13465,[82,214,37...|
|  0.0|(13465,[0,2,49,13...|
|  0.0|(13465,[0,74,105,...|
|  1.0|(13465,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows
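
Two things in this output are worth unpacking: how 'ham' and 'spam' became 0.0 and 1.0, and what the `(size,[indices],[values])` notation in the features column means. The plain-Python sketch below illustrates both under stated assumptions. It mimics StringIndexer's default ordering (most frequent label gets index 0.0) and shows a sparse vector with one extra trailing slot for the message length. The helper name `assemble` and all sample values are made up.

```python
from collections import Counter

# --- Label indexing: most frequent label gets 0.0 (StringIndexer's default order) ---
labels = ["ham", "ham", "spam", "ham", "spam"]  # made-up class column
ranked = [label for label, _ in Counter(labels).most_common()]
label_index = {label: float(i) for i, label in enumerate(ranked)}
print(label_index)


# --- Sparse vectors: (size, indices, values) stores only the non-zero slots ---
def assemble(indices, values, vocab_size, length):
    """Append `length` as one extra trailing slot after the vocab_size text slots."""
    return (vocab_size + 1, indices + [vocab_size], values + [float(length)])


# A made-up message whose TF-IDF weights are non-zero at vocabulary slots 0 and 3.
features = assemble([0, 3], [0.29, 0.69], vocab_size=5, length=11)
print(features)
```

Because ham is the more common class in this dataset, it is the one that ends up as label 0.0 in the table above.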



### Detect Spam or Ham

Now use your TF-IDF data to classify messages as spam or ham.

Feel free to use any classifier model.

Your results may differ, because randomSplit is not seeded here.
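
The example below uses Naive Bayes. As a rough idea of what that classifier does, here is a minimal plain-Python sketch of multinomial Naive Bayes with additive smoothing of 1.0 (Spark's NaiveBayes defaults to the multinomial model type with smoothing 1.0). It is an illustration only, not Spark's implementation. The function names `train_nb` and `predict`, the four-term vocabulary and the training rows are all made up.

```python
import math
from collections import defaultdict


def train_nb(rows, vocab_size, smoothing=1.0):
    """rows: list of (label, {feature_index: value}). Returns (log priors, log likelihoods)."""
    class_docs = defaultdict(int)
    feat_sums = defaultdict(lambda: defaultdict(float))
    for label, feats in rows:
        class_docs[label] += 1
        for j, v in feats.items():
            feat_sums[label][j] += v
    n = len(rows)
    log_prior = {c: math.log(k / n) for c, k in class_docs.items()}
    log_like = {}
    for c in class_docs:
        total = sum(feat_sums[c].values()) + smoothing * vocab_size
        log_like[c] = [
            math.log((feat_sums[c].get(j, 0.0) + smoothing) / total)
            for j in range(vocab_size)
        ]
    return log_prior, log_like


def predict(model, feats):
    """Pick the class with the highest log prior plus weighted log likelihood."""
    log_prior, log_like = model
    scores = {
        c: log_prior[c] + sum(v * log_like[c][j] for j, v in feats.items())
        for c in log_prior
    }
    return max(scores, key=scores.get)


# Made-up vocabulary: 0="win", 1="prize", 2="lunch", 3="meeting"; label 1.0 = spam.
train_rows = [
    (1.0, {0: 2.0, 1: 1.0}),
    (1.0, {0: 1.0}),
    (0.0, {2: 1.0, 3: 1.0}),
    (0.0, {3: 2.0}),
    (0.0, {2: 1.0}),
]
model = train_nb(train_rows, vocab_size=4)
print(predict(model, {0: 1.0, 1: 1.0}))  # message with "win" and "prize"
print(predict(model, {2: 1.0, 3: 1.0}))  # message with "lunch" and "meeting"
```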

In [431]:
train, test = clean_data.randomSplit([0.7, 0.3])
spam_detector = nb.fit(train)


In [433]:
test_results = spam_detector.transform(test)
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13465,[0,1,2,7,8...|[-795.15568812323...|[1.0,1.9184131729...|       0.0|
|  0.0|(13465,[0,1,2,13,...|[-604.84330271663...|[1.0,7.0434331863...|       0.0|
|  0.0|(13465,[0,1,2,41,...|[-1061.9787139080...|[1.0,9.3946796811...|       0.0|
|  0.0|(13465,[0,1,4,50,...|[-831.42583548862...|[1.0,8.8384745461...|       0.0|
|  0.0|(13465,[0,1,5,20,...|[-809.69236939559...|[1.0,4.0528242366...|       0.0|
|  0.0|(13465,[0,1,7,8,1...|[-1157.9825435245...|[1.0,4.4329686255...|       0.0|
|  0.0|(13465,[0,1,12,33...|[-454.26028647698...|[1.0,6.2134460627...|       0.0|
|  0.0|(13465,[0,1,14,78...|[-693.49068946716...|[1.0,1.4521604079...|       0.0|
|  0.0|(13465,[0,1,43,68...|[-616.39946566409...|[0.99171300709555...|       0.0|
|  0.0|(13465,[0

### Calculate the accuracy of your model

In [434]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# With no arguments, metricName defaults to 'f1'; pass metricName='accuracy' for plain accuracy.
evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(test_results)
print("Test Accuracy: ")
print(accuracy)

Test Accuracy: 
0.9293625486190675


The score is about 0.93, which is not bad.
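
One caveat on reading this number: MulticlassClassificationEvaluator uses 'f1' as its default metricName, so a bare `MulticlassClassificationEvaluator()` reports a weighted F1 score rather than plain accuracy. The plain-Python sketch below shows how the two can differ on the same predictions. It is an illustration only; the function names and the ten (label, prediction) pairs are made up.

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs that match."""
    return sum(1 for y, p in pairs if y == p) / len(pairs)


def weighted_f1(pairs):
    """Per-class F1, averaged with each class weighted by its share of true labels."""
    total = 0.0
    for c in sorted({y for y, _ in pairs}):
        tp = sum(1 for y, p in pairs if y == c and p == c)
        fp = sum(1 for y, p in pairs if y != c and p == c)
        fn = sum(1 for y, p in pairs if y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * (tp + fn) / len(pairs)
    return total


# Made-up results: 8 ham correct, 1 ham flagged as spam, 1 spam correct.
pairs = [(0.0, 0.0)] * 8 + [(0.0, 1.0)] + [(1.0, 1.0)]
acc = accuracy(pairs)
f1 = weighted_f1(pairs)
print(acc, f1)
```

On an imbalanced dataset like this one, the two metrics usually land close together but are not interchangeable, so it helps to name the metric explicitly when reporting it.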

In [376]:
# spark.stop()