# Big Data Management
Homework 07 - Detect Spam with PySpark and MLflow  
Garth Mortensen  
2020.04.03  

## Summary

1. Create a model to predict if a message is spam.
2. Track your model with MLflow.

Note: In Databricks, press Tab to show autocomplete options. For instance, type `df.` and press Tab.  
Note: In Databricks cells, you can use md, sql, python, sh, etc., as shown [here](https://docs.databricks.com/notebooks/notebooks-use.html).

## Background

> MLflow is an open source platform developed by Databricks to help manage the complete Machine Learning lifecycle with enterprise reliability, security, and scale.  

It enables:
1. EXPERIMENT TRACKING. Run experiments with any ML library, framework, or language, and automatically keep track of parameters, metrics, code, and models from each experiment.

2. MODEL MANAGEMENT. Use one place to share models, collaborate on moving them from experimentation to online testing and production, integrate with approval and governance workflows, and monitor ML deployments and their performance.

3. FLEXIBLE DEPLOYMENT. Deploy production models for batch inference on Apache Spark, or as REST APIs using built-in integration with Docker containers, Azure ML, or Amazon SageMaker.

A quickstart guide on MLflow can be found [here](https://docs.databricks.com/applications/mlflow/quick-start.html).

## Dataset

Dataset available [here](https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection).

* Context - The SMS Spam Collection is a set of SMS tagged messages that have been collected for SMS Spam research. It contains one set of SMS messages in English of 5,574 messages, tagged according to being ham (legitimate) or spam.

* Content - The files contain one message per line. Each line is composed of two columns: v1 contains the label (ham or spam) and v2 contains the raw text.

## Spark Execution

### Import libraries

In [3]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import *  # might need these for dataframes

### Read data

Literally the first cell input and I'm already disappointed in the seemingly unnecessary pyspark variation from pandas. 
* Pandas  encoding='latin-1'
* Pyspark encoding='latin1'

For real guys?

In [5]:
# define path and file
dir = "/FileStore/tables/"
file = "spam.csv"  # file was first uploaded into the cluster

# pandas read csv with encoding
# pd.read_csv(path + file, header=0, sep=',', encoding='latin-1')  # latin-1 to overcome UTF-8 issue

# pyspark method 1 for reading csv
# df = spark.read.csv(dir + file, header="true", inferSchema="true")

# pyspark method 2 for reading csv, daisy-chain style
df = spark.read.format('csv').options(header='true', inferSchema='true').option('encoding', 'latin1').load(dir + file)  # latin1 instead of latin-1!

# cache keeps the df around after the first action (in memory, spilling to disk if needed), so it isn't re-read from the csv each time
df.cache()

### Check data
Quickly examine that the data is as expected.

In [7]:
df.printSchema()
print("Our dataset has %d rows." % df.count())

In [8]:
# python-pandas techniques
# content = pd.read_csv(path + file, header=0, sep=',', encoding='latin-1')  # latin-1 to overcome UTF-8 issue
# there are extra columns due to separation on commas. I don't want these
# print(content.columns)
# content = content.drop(labels = ['Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4'], axis = 1)
display(df)

v1,v2,_c2,_c3,_c4
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...",,,
ham,Ok lar... Joking wif u oni...,,,
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's,,,
ham,U dun say so early hor... U c already then say...,,,
ham,"Nah I don't think he goes to usf, he lives around here though",,,
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, å£1.50 to rcv",,,
ham,Even my brother is not like to speak with me. They treat me like aids patent.,,,
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune,,,
spam,WINNER!! As a valued network customer you have been selected to receivea å£900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.,,,
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030,,,


Group by label and count occurrences.

In [10]:
df.groupBy('v1').count().show()

Check what [percentage](https://stackoverflow.com/a/18251214/5825523) is spam

In [12]:
print("%.0f%%" % (100 * 747 / (747 + 4825)))  # spam / (spam + ham)

There is roughly an 87-13 ham-to-spam imbalance.
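
A spam share is spam divided by all messages, not spam divided by ham. Here is a minimal plain-Python sketch using the two counts from the cell above. `class_shares` is a hypothetical helper name, not part of PySpark.

```python
def class_shares(counts):
    """Return each label's share of the total, as a percentage."""
    total = sum(counts.values())
    return {label: 100.0 * n / total for label, n in counts.items()}

# Counts taken from the cell above: 747 spam, 4825 ham
shares = class_shares({"spam": 747, "ham": 4825})
for label, pct in shares.items():
    print("%s: %.1f%%" % (label, pct))
```

Dividing by the total keeps the shares summing to 100, which makes the imbalance easier to read at a glance.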

### Preprocessing
First, we replace the existing header names with two more appropriate ones.

Then, we address the extra columns, which appeared because some messages contain commas that were not handled properly in the csv. Perhaps those cells should have been surrounded by quotation marks. Either way, the few fragments that spilled into the extra columns are of little value. They could be kept by using a regex solution, but meh.

Drop columns using one of [several](https://stackoverflow.com/questions/29600673/how-to-delete-columns-in-pyspark-dataframe) techniques.

In [14]:
# column renaming
# python
# df = df.rename(columns={'v1':'label', 'v2':'content'})

# pyspark
df = df.withColumnRenamed('v1','label')
df = df.withColumnRenamed('v2','sentence')

# one way to drop columns
columns_to_drop = ['_c2', '_c3', '_c4']
df = df.drop(*columns_to_drop)

# another way to drop columns
# df = df.drop('_c2')

Examine our handiwork.

In [16]:
display(df)

label,sentence
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat..."
ham,Ok lar... Joking wif u oni...
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham,U dun say so early hor... U c already then say...
ham,"Nah I don't think he goes to usf, he lives around here though"
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, å£1.50 to rcv"
ham,Even my brother is not like to speak with me. They treat me like aids patent.
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune
spam,WINNER!! As a valued network customer you have been selected to receivea å£900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030


Great, we're down to 2 properly named and encoded columns.

While we're data cleaning, let's [change](https://stackoverflow.com/a/44773899/5825523) ham and spam to 0 and 1, respectively. The ML algo will probably require this in the near future, since classifiers usually expect numeric labels.

In [18]:
from pyspark.sql.functions import when

# change ham to 0 and spam to 1
df = df.withColumn('label', \
                   when(df['label'] == 'ham', 0) \
                   .otherwise(1))

In [19]:
display(df)

label,sentence
0,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat..."
0,Ok lar... Joking wif u oni...
1,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
0,U dun say so early hor... U c already then say...
0,"Nah I don't think he goes to usf, he lives around here though"
1,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, å£1.50 to rcv"
0,Even my brother is not like to speak with me. They treat me like aids patent.
0,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune
1,WINNER!! As a valued network customer you have been selected to receivea å£900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.
1,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030


I was intending to str.lower(), remove punctuation, remove stop words and perhaps stem the words, but I'm spending way too much time trying to apply lambda functions to pyspark dataframe columns. I could use toPandas(), but that would be kind of a lame cheat.

What I discovered is that the Tokenizer below takes care of some of this text cleaning for you: it lowercases the text before splitting it on whitespace. Punctuation and stop words are left in place.

https://www.kaggle.com/redaabdou/sms-spam-solution-data-cleaning-ml
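
To make the cleaning steps concrete, here is a plain-Python sketch of lowercasing, whitespace splitting and punctuation stripping on one message from the dataset. It only approximates what Spark's Tokenizer does, and both function names are made up for illustration.

```python
import string

def tokenize_like_spark(text):
    """Lowercase and split on whitespace, roughly what Tokenizer does."""
    return text.lower().split()

def strip_punctuation(tokens):
    """Remove ASCII punctuation from each token and drop tokens left empty."""
    table = str.maketrans("", "", string.punctuation)
    cleaned = [tok.translate(table) for tok in tokens]
    return [tok for tok in cleaned if tok]

sample = "Ok lar... Joking wif u oni..."
tokens = tokenize_like_spark(sample)
print(tokens)                     # punctuation is still attached
print(strip_punctuation(tokens))  # punctuation removed
```

Inside Spark, the built-in column functions `lower` and `regexp_replace` from `pyspark.sql.functions` would likely do the same job without lambdas or toPandas(), though that route was not tried here.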

## Split into train-test

Using [this](https://stackoverflow.com/questions/51772908/split-time-series-pyspark-data-frame-into-test-train-without-using-random-spli) method.

In [22]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

# ordering by sentence for now, so the split is alphabetical rather than random.
# Could create a random column and order by that instead.
df = df.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("sentence")))

train_df = df.where("rank <= 0.7").drop("rank")
train_df.show()

In [23]:
test_df = df.where("rank > 0.7").drop("rank")
test_df.show()
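
For intuition about the rank-based split, the sketch below reproduces the percent rank formula, (rank - 1) / (n - 1), in plain Python and applies the same 0.7 cutoff. The ten messages are made up, and this `percent_rank` function is an illustration rather than the Spark one.

```python
def percent_rank(values):
    """Map each value to (rank - 1) / (n - 1); tied values share the lowest rank."""
    ordered = sorted(values)
    n = len(ordered)
    ranks = {}
    for i, v in enumerate(ordered):
        ranks.setdefault(v, i / (n - 1) if n > 1 else 0.0)
    return ranks

# Ten made-up messages standing in for the sentence column
sentences = ["msg %02d" % i for i in range(10)]
ranks = percent_rank(sentences)

train = [s for s in sentences if ranks[s] <= 0.7]
test = [s for s in sentences if ranks[s] > 0.7]
print(len(train), len(test))
print(test)
```

Because the ranks follow sort order, the test set is always the alphabetically last slice of the messages. That makes the split deterministic, but not a random sample.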

The data is prepped. Our next steps are to:  
1. Create a model to predict if a given text message is spam or not.  
2. Use MLflow to keep track of your model.  

### [Tokenization](https://spark.apache.org/docs/latest/ml-features)

We want to decompose the sentences into words, from which the algorithm can extract more meaningful information.

In [25]:
#from pyspark.ml.feature import Tokenizer, RegexTokenizer
#from pyspark.sql.functions import col, udf
#from pyspark.sql.types import IntegerType

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [27]:
# take from column sentence and produce column words
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
df_tokenized = tokenizer.transform(df)  # needed by the row count below and by StopWordsRemover later

# our new df with additional column
df_tokenized.show(n=2, truncate=False, vertical=True)

print("Our dataset has %d rows." % df_tokenized.count())

In [28]:
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(train_df)
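
The HashingTF stage turns each list of words into a fixed-width count vector by hashing every word to a column index. Here is a toy plain-Python sketch of that idea. It uses `zlib.crc32` and 16 buckets as stand-ins, whereas Spark uses its own hash function and far more buckets by default, so the indices will not match Spark's.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=16):
    """Count tokens per hashed bucket; returns a sparse {index: count} dict."""
    counts = Counter()
    for tok in tokens:
        # Stand-in hash: any stable hash works for the illustration
        index = zlib.crc32(tok.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

vec = hashing_tf(["free", "entry", "free", "win"])
print(vec)
```

No vocabulary has to be built or stored, at the cost that two different words can collide in the same bucket.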

In [29]:

# Prepare test documents, which are unlabeled (id, sentence) tuples.
# The column must be named "sentence", since that is the tokenizer's inputCol.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "sentence"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "sentence", "probability", "prediction")
for row in selected.collect():
    rid, sentence, prob, pred = row  # pred avoids shadowing the prediction dataframe
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, sentence, str(prob), pred))


Nice. Each sentence has been transformed into an array of strings.

### Remove Stopwords

If the dictionary of words is large, the resulting words-to-columns matrix becomes wider and sparser. For stats, we (kind of) want taller and thinner datasets, not shorter and wider. We can thin our dataset/matrix out by removing stopwords, which are the most common, low-value words in a language.

In [34]:
from pyspark.ml.feature import StopWordsRemover

# take from column words and produce column filtered
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_removed = remover.transform(df_tokenized)

# show df
# display(tokenized)  # I'm not sure why this doesn't work
df_removed.show(n=2, truncate=False, vertical=True)

print("Our dataset has %d rows." % df_removed.count())
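
As a small illustration of what stop word removal does to a token list, here is a plain-Python sketch. The stop list is a tiny made-up sample and `remove_stop_words` is a hypothetical helper. StopWordsRemover ships with a much longer default English list.

```python
# Tiny illustrative stop list; Spark's default English list is much longer
STOP_WORDS = {"a", "the", "to", "in", "is", "you", "i"}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only tokens that are not in the stop list (case-insensitive)."""
    return [tok for tok in tokens if tok.lower() not in stop_words]

words = ["free", "entry", "in", "a", "wkly", "comp", "to", "win"]
filtered = remove_stop_words(words)
print(filtered)
```

Three of the eight tokens disappear here, which is the thinning effect described above: fewer distinct words means fewer columns to learn from.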

We have a few columns we don't need anymore, which should be dropped before the next step.

In [36]:
# one way to drop columns
columns_to_drop = ['sentence', 'words']
df_prepped = df_removed.drop(*columns_to_drop)

## Train-Test Split

With the data fully prepped, we can now split the data into our training and testing datasets.

In [38]:
# Split the dataset randomly into 70% for training and 30% for testing.
# train, test = tokenized.randomSplit([0.7, 0.3])
(trainingData, testData) = df_prepped.randomSplit([0.7, 0.3], seed=42)  # set random seed for reproducibility

# print("Training Dataset Count: " + str(trainingData.count()))  # this no longer works, since the dataframe contains a string array
# print("Test Dataset Count: " + str(testData.count()))

### Blocker
Now that my dataframe contains an array-of-strings field (`filtered`), I can't get any train-test split working. Nor can I find a path forward on the internet.  
I was initially held up when I tried to use pandas dataframe operations to clean the text data (remove punctuation, stop words, str.lower()), but none of that was working. Also, Tokenizer handles the lowercasing for me.

I can split the original dataframe of ['label', 'sentence'], but that's not what I need.

If I were to advance past this step, I would then continue to fit a model to this training data, apply it to the test data, score its performance, and track the performance using MLflow.
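
For the scoring step, classification metrics such as accuracy, precision and recall fit a spam detector better than a regression metric. Here is a minimal plain-Python sketch, assuming 1 means spam and 0 means ham. The labels and predictions are made up and `binary_metrics` is a hypothetical helper.

```python
def binary_metrics(labels, predictions):
    """Return accuracy, precision and recall for 0/1 labels (1 = spam)."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }

# Made-up example: 8 ham, 2 spam
labels = [0, 0, 0, 0, 0, 0, 1, 1, 0, 0]
model_preds = [0, 0, 0, 0, 0, 0, 1, 0, 1, 0]
always_ham = [0] * len(labels)

print(binary_metrics(labels, model_preds))
print(binary_metrics(labels, always_ham))
```

Both prediction lists reach the same accuracy, but the always-ham baseline has zero recall. With imbalanced classes like these, accuracy alone says little.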

In [40]:
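from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# hash the stop-word-filtered tokens; a dataframe has no getOutputCol(), so ask the remover stage
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, lr])

# the pipeline tokenizes and filters inside fit(), so it takes the raw ['label', 'sentence'] split
model = pipeline.fit(train_df)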

In [41]:
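import mlflow
import mlflow.spark
from pyspark.ml.evaluation import BinaryClassificationEvaluator

mlflow_experiment_id = "3576682701754284"  # this should be updated to my experiment number; MLflow documents the id as a string

# rmse was never computed and is a regression metric; score the fitted pipeline by area under ROC instead
auc = BinaryClassificationEvaluator(labelCol="label").evaluate(model.transform(test_df))

# Log MLflow
with mlflow.start_run(experiment_id=mlflow_experiment_id) as run:
  # Log parameters
  mlflow.log_param("model", "LogisticRegression")
  # Log metrics
  mlflow.log_metric("auc", auc)
  # Log model
  mlflow.spark.log_model(model, "model")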