In [None]:
!pip install pyspark
!pip install findspark

In [None]:
!unzip /content/smsspamcollection.zip

Archive:  /content/smsspamcollection.zip
replace SMSSpamCollection? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
  inflating: readme                  


### Create a spark session and import the required libraries

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-active session when there is one,
# otherwise it starts a new session.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [None]:
# Tab-separated file read without a header option, so Spark assigns the
# default column names (_c0, _c1).
df = (
    spark.read
    .option("delimiter", "\t")
    .csv('/content/SMSSpamCollection', inferSchema=True)
)

In [None]:
# Peek at the first five rows of the raw frame.
df.show(n=5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



### Print the schema

In [None]:
# Print the column names and types of the raw frame.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [None]:
# Give the two default-named columns meaningful names:
# _c0 -> class (label), _c1 -> text (message body).
data = (
    df
    .withColumnRenamed("_c0", "class")
    .withColumnRenamed("_c1", "text")
)

In [None]:
# Confirm that the columns are now named `class` and `text`.
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows from the dataframe
- Show once with truncate=True and once with truncate=False

In [None]:
# First ten rows, with long cell values shortened for display.
data.show(n=10, truncate=True)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [None]:
# Same ten rows, this time printing each message in full.
data.show(n=10, truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column containing the length of the text column

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as fn

# Character count of each message, stored in the integer column `len_text`.
# Spark's built-in length() is used instead of a Python UDF wrapping len():
# it is evaluated by Spark itself (no per-row round trip through Python) and it
# yields null for a null `text`, whereas len(None) raises TypeError in a UDF.
data = data.withColumn("len_text", fn.length("text"))

### Show the new dataframe

In [None]:
# Display the frame with the new `len_text` column (show() defaults to the first 20 rows).
data.show()

+-----+--------------------+--------+
|class|                text|len_text|
+-----+--------------------+--------+
|  ham|Go until jurong p...|     111|
|  ham|Ok lar... Joking ...|      29|
| spam|Free entry in 2 a...|     155|
|  ham|U dun say so earl...|      49|
|  ham|Nah I don't think...|      61|
| spam|FreeMsg Hey there...|     147|
|  ham|Even my brother i...|      77|
|  ham|As per your reque...|     160|
| spam|WINNER!! As a val...|     157|
| spam|Had your mobile 1...|     154|
|  ham|I'm gonna be home...|     109|
| spam|SIX chances to wi...|     136|
| spam|URGENT! You have ...|     155|
|  ham|I've been searchi...|     196|
|  ham|I HAVE A DATE ON ...|      35|
| spam|XXXMobileMovieClu...|     149|
|  ham|Oh k...i'm watchi...|      26|
|  ham|Eh u remember how...|      81|
|  ham|Fine if thats th...|      56|
| spam|England v Macedon...|     155|
+-----+--------------------+--------+
only showing top 20 rows



### Get the average text length for each class (give alias name to the average length column)

In [None]:
# Mean message length per class; the alias names the aggregated column
# (spelling of "Length" corrected in the displayed header).
data.groupby('class').agg(fn.avg('len_text').alias("Avg. Length")).show()

+-----+-----------------+
|class|      Avg. Lenght|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform your raw text into a TF-IDF model:
- For more information about TF-IDF check the following link: <b>(Not needed for the test)</b>
https://en.wikipedia.org/wiki/Tf%E2%80%93idf

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.

In [None]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, StopWordsRemover, Tokenizer

# Step 2 — split every message into a list of word tokens.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(data)

# Step 3 — remove stop words from the token lists.
stopwords = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="AfterRemovestopwords")
wordsData = stopwords.transform(wordsData)

# Step 4 — learn a vocabulary and turn each token list into a sparse term-count vector.
cv = CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol="vectors")
vectorizer = cv.fit(wordsData)
wordsData = vectorizer.transform(wordsData)

# Step 5 — rescale the term counts by inverse document frequency so that
# `tf-idf_features` really holds TF-IDF weights (a bare HashingTF only yields
# hashed term frequencies, with no IDF weighting at all).
idf = IDF(inputCol=cv.getOutputCol(), outputCol="tf-idf_features")
idfModel = idf.fit(wordsData)
wordsData = idfModel.transform(wordsData)

# NOTE(review): the fits in this cell use every row of `data`, including rows
# that later end up in the test split.
wordsData.show()

+-----+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|class|                text|len_text|               words|AfterRemovestopwords|             vectors|     tf-idf_features|
+-----+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|     111|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(262144,[38555,52...|
|  ham|Ok lar... Joking ...|      29|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(262144,[51783,15...|
| spam|Free entry in 2 a...|     155|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(262144,[9443,122...|
|  ham|U dun say so earl...|      49|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(262144,[2306,332...|
|  ham|Nah I don't think...|      61|[nah, i, don't, t...|[nah, think, goes...|(13423,[36,134,31...|(262144,[25964,64...|
| spam|FreeMsg Hey there

- Convert the <b>class column</b> to index using <b>StringIndexer</b>
- Create the feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.feature import VectorAssembler

# Map the string label in `class` to a numeric index in `encode_class`.
# handleInvalid='skip' drops rows whose label was not seen when the indexer was fit.
stringIndexer = StringIndexer(inputCol="class",
                              outputCol="encode_class",
                              handleInvalid='skip')

# Model input = the `tf-idf_features` column plus the message length, as the
# task statement above requires (not the raw term-count `vectors` column).
assemblerInputs = ["tf-idf_features", "len_text"]
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')


## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [None]:
from pyspark.ml.classification import NaiveBayes

# Column wiring for the classifier: assembled feature vector in, indexed label
# as the target, predicted label written to `prediction`.
nb_columns = {
    "featuresCol": 'features',
    "labelCol": 'encode_class',
    "predictionCol": 'prediction',
}
model = NaiveBayes(**nb_columns)

## Pipeline
### Create a pipeline model that contains all the steps, starting from the Tokenizer and ending with the NaiveBayes classifier.

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[stringIndexer,
                           vecAssembler,model])

### Split your data to trian and test data with ratios 0.7 and 0.3 respectively.

In [None]:
trainDF, testDF = wordsData.randomSplit([.7,.3],seed=42)

### Fit your Pipeline model to the training data

In [None]:
# Fit all pipeline stages on the training split; the result is a fitted PipelineModel.
pipelineModel = pipeline.fit(trainDF)

### Perform predictions on the test DataFrame

In [None]:
# Run the fitted pipeline over the test split to obtain predictions.
predDF = pipelineModel.transform(testDF)

### Print the schema of the prediction dataframe

In [None]:
# Inspect the columns of the prediction frame.
predDF.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- len_text: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- AfterRemovestopwords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)
 |-- tf-idf_features: vector (nullable = true)
 |-- encode_class: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Scores the `prediction` column against the indexed label using the f1 metric.
Evaluator = MulticlassClassificationEvaluator(
    metricName='f1',
    labelCol='encode_class',
    predictionCol='prediction',
)

In [None]:
# Compute the evaluator's configured metric (f1) on the test-set predictions.
Evaluator.evaluate(predDF)

0.9758738064105548

f1_score is: 0.9664707489549014
