## Introduction to Model Development with Spark

This is the first of three parts of a bootcamp on Model Development with [MLlib](https://spark.apache.org/docs/latest/ml-guide.html), Spark’s machine learning (ML) library. You will gain hands-on experience with the essential steps of model development using MLlib, which aims to make machine learning scalable and easy.

At a high level, MLlib provides tools such as:
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and loading algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.

In this lab, we will cover:
- Splitting of data for training and testing
- Applying Transformers to data frames
- Fitting Estimators to our data
- Creating and executing an ML Pipeline
- Model evaluation

We will solve a common task in Natural Language Processing (NLP): Sentiment Analysis. Our dataset from the Internet Movie Database (IMDB) contains roughly 50,000 movie reviews. Each entry consists of a movie review written in English and the author's rating of the movie on a scale from 1 to 10. Based on the text of the review, we want to predict whether the rating is "positive" or "negative".

Other applications of Sentiment Analysis:
- Detecting negative affect in customers who are calling an automated customer hotline
- Aggregating reviews of retail products into an overall rating for each product

## Initialize Environment

As usual, we start by setting up our environment and mounting our data.

```
%run "../includes/mnt_blob"
```

```
%run "../includes/setup_env"
```

```python
reviewsDF = spark.read.parquet("/mnt/data/imdb/imdb_ratings_50k.parquet")
# Register a temporary view so we can also query the data with SQL
reviewsDF.createOrReplaceTempView("reviews")
display(reviewsDF)
```

What does the distribution of scores look like?

HINT: Use `count()`

```sql
%sql
--TODO: Replace <FILL_IN> with appropriate code

SELECT <FILL_IN>
```

The authors of this dataset have removed the "neutral" ratings, which they defined as a rating of 5 or 6.

## Train-Test Split

We'll split our data into training and test sets, using 80% for training and the remaining 20% for testing. We set a seed so that the split is reproducible: if you re-run this notebook, you'll get the same split each time.

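```python
(trainDF, testDF) = reviewsDF.randomSplit([0.8, 0.2], seed=42)
# cache() is lazy: each DataFrame is materialized in memory the first time an action runs on it
trainDF.cache()
testDF.cache()
```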
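Note that `randomSplit` assigns each row to a split independently at random, so the 80/20 proportions are approximate rather than exact. The plain-Python sketch below illustrates the idea with a seeded generator; `random_split` is a hypothetical helper, not Spark's implementation. In Spark, as far as we know, the same seed reproduces the same split only as long as the input data and its partitioning stay the same.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, in proportion to `weights`."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 0.2] -> [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # seeded, so the assignment is reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits

train, test = random_split(range(50_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 40,000 and 10,000, but generally not exact
```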

Let's determine our baseline accuracy.

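```python
# Neutral ratings (5 and 6) were removed, so "rating >= 5" selects only ratings of 7 and above
positiveRatings = trainDF.filter("rating >= 5").count()
totalRatings = trainDF.count()

print("Baseline accuracy: {0:.2f}%".format(positiveRatings/totalRatings*100))
```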
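The cell above reports the share of positive reviews in the training set, which is the accuracy of a model that always predicts "positive". If negative reviews were more common, always predicting "negative" would be the stronger baseline, so in general the baseline is the share of the majority class. A plain-Python sketch, where `baseline_accuracy` is a hypothetical helper operating on a list of ratings:

```python
def baseline_accuracy(ratings, threshold=5):
    """Accuracy of always predicting whichever class is more common."""
    # Ratings above the threshold count as positive (neutral 5s and 6s are absent here)
    positive = sum(1 for r in ratings if r > threshold)
    positive_share = positive / len(ratings)
    return max(positive_share, 1 - positive_share)

print(baseline_accuracy([1, 2, 7, 8, 9]))  # 0.6: always predict "positive"
print(baseline_accuracy([1, 2, 3, 10]))    # 0.75: always predict "negative"
```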

## Transformers

A Transformer takes in a DataFrame and returns a new DataFrame with one or more columns appended to it. Transformers implement a `.transform()` method.

Let's get started by using [RegexTokenizer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.RegexTokenizer) to convert our review string into a list of tokens.

```python
from pyspark.ml.feature import RegexTokenizer

tokenizer = (RegexTokenizer()
            .setInputCol("review")
            .setOutputCol("tokens")
            .setPattern("\\W+"))

tokenizedDF = tokenizer.transform(reviewsDF)
display(tokenizedDF.limit(5)) # Look at a few tokenized reviews
```
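To make the tokenizer's behavior concrete, here is a plain-Python approximation (not Spark's code; `regex_tokenize` is a hypothetical helper). With its default `gaps=True`, `RegexTokenizer` treats the pattern as a separator, lowercases the text by default (`toLowercase=True`), and drops tokens shorter than `minTokenLength` (default 1), which removes the empty strings that splitting leaves at the edges:

```python
import re

def regex_tokenize(text, pattern=r"\W+", min_token_length=1):
    """Approximate RegexTokenizer with gaps=True: split on the pattern."""
    # Lowercase first, mirroring the default toLowercase=True
    tokens = re.split(pattern, text.lower())
    # Drop empty or too-short tokens, mirroring minTokenLength
    return [t for t in tokens if len(t) >= min_token_length]

print(regex_tokenize("This movie was GREAT!<br /><br />Loved it."))
# ['this', 'movie', 'was', 'great', 'br', 'br', 'loved', 'it']
```

Note how HTML line breaks in the text turn into `br` tokens, and how `\W+` also splits contractions (`didn't` becomes `didn` and `t`).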

There are a lot of words that do not contain much information about the sentiment of the review (e.g. `the`, `a`, etc.). Let's remove these uninformative words using [StopWordsRemover](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StopWordsRemover).

```python
from pyspark.ml.feature import StopWordsRemover

remover = (StopWordsRemover()
          .setInputCol("tokens")
          .setOutputCol("stopWordFree"))

removedStopWordsDF = remover.transform(tokenizedDF)
display(removedStopWordsDF.limit(5)) # Look at a few tokenized reviews without stop words
```
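Conceptually, `StopWordsRemover` filters each token list against a set of stop words, ignoring case by default (`caseSensitive=False`). A plain-Python sketch of that filter for a single row; `remove_stop_words` and the tiny stop-word list are hypothetical, and Spark's default English list is much longer:

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Approximate StopWordsRemover on one row: drop tokens found in stop_words."""
    if case_sensitive:
        stop = set(stop_words)
        return [t for t in tokens if t not in stop]
    # Case-insensitive comparison, mirroring the default caseSensitive=False
    stop = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in stop]

example_stop_words = ["the", "a", "was", "it"]  # hypothetical, for illustration only
print(remove_stop_words(["the", "movie", "was", "great", "br"], example_stop_words))
# ['movie', 'great', 'br']
```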

Where do the stop words actually come from? Spark includes a small English list as a default, which we're implicitly using here.

```python
stopWords = remover.getStopWords()
stopWords
```

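The reviews contain HTML line breaks (`<br />`), which the tokenizer turns into the token `br`. Let's add `br` to the stop words so it gets removed from our reviews.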

```python
# Setters modify `remover` in place, so the Pipeline below also uses this extended list
remover.setStopWords(["br"] + stopWords)
removedStopWordsDF = remover.transform(tokenizedDF)
```

## Estimators

Estimators take in a DataFrame and return a model (which is itself a Transformer). They implement a `.fit()` method.

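Let's apply a [CountVectorizer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer) to learn a vocabulary from our tokens and convert each review into a vector of token counts. With `setVocabSize(1000)`, the vocabulary keeps the 1,000 most frequent terms across the corpus.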

```python
from pyspark.ml.feature import CountVectorizer

counts = (CountVectorizer()
          .setInputCol("stopWordFree")
          .setOutputCol("features")
          .setVocabSize(1000))

countModel = counts.fit(removedStopWordsDF) # fit() returns a CountVectorizerModel, which is a Transformer
```
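The two halves of `CountVectorizer` can be sketched in plain Python: `fit` learns a vocabulary of the most frequent terms, and the resulting model's `transform` turns each document into sparse term counts indexed by that vocabulary. Both helpers below are hypothetical illustrations, not Spark's implementation; in particular, the alphabetical tie-breaking is our own choice to keep the sketch deterministic.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size):
    """Estimator step: keep the vocab_size terms with the highest total count."""
    totals = Counter(term for doc in docs for term in doc)
    # Sort by descending count; ties broken alphabetically (our choice, for determinism)
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return [term for term, _ in ranked[:vocab_size]]

def vectorize(doc, vocabulary):
    """Model step: sparse count vector as {index: count}; unknown terms are dropped."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))

docs = [["good", "movie"], ["bad", "movie"], ["good", "good", "plot"]]
vocab = fit_vocabulary(docs, vocab_size=2)
print(vocab)                                                # ['good', 'movie']
print(vectorize(["good", "movie", "bad", "good"], vocab))   # {0: 2, 1: 1}
```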

__Now let's adjust the label (target) values__

We want to group the reviews into "positive" or "negative" sentiment, so the ratings from 1 to 10 need to be collapsed into two groups. To do this, we will use [Binarizer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Binarizer), which maps values greater than the threshold to 1.0 and all other values to 0.0.

```python
from pyspark.ml.feature import Binarizer

binarizer = (Binarizer()
            .setInputCol("rating")
            .setOutputCol("label")
            .setThreshold(5.0))
```
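The comparison is strictly greater than the threshold, so a rating of exactly 5 would be labeled 0.0; this doesn't matter here because the neutral ratings 5 and 6 were removed. A one-line plain-Python sketch of the rule (`binarize` is a hypothetical helper):

```python
def binarize(values, threshold):
    """Binarizer rule: 1.0 if value > threshold (strictly), else 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]

print(binarize([1, 4, 5, 7, 10], threshold=5.0))  # [0.0, 0.0, 0.0, 1.0, 1.0]
```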

Now we'll fit a Decision Tree classifier to our data.

```python
from pyspark.ml.classification import DecisionTreeClassifier

dtc = DecisionTreeClassifier()
```
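Because we didn't set any parameters, `DecisionTreeClassifier` uses its defaults: it reads the `features` and `label` columns produced by the earlier stages, grows the tree to at most `maxDepth=5`, and chooses splits by Gini impurity (`impurity="gini"`). With count features, a split looks like "count of word j is at most t". As a rough plain-Python illustration (not Spark's implementation; `gini` and `split_gain` are hypothetical helpers), a candidate split is scored by how much it reduces the weighted impurity of the labels:

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())

def split_gain(parent, left, right):
    """Impurity decrease from splitting parent into left and right."""
    n = len(parent)
    weighted = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - weighted

print(gini([0, 0, 1, 1]))                        # 0.5
print(split_gain([0, 0, 1, 1], [0, 0], [1, 1]))  # 0.5: separates the classes perfectly
print(split_gain([0, 0, 1, 1], [0, 1], [0, 1]))  # 0.0: no information gained
```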

## Pipeline

Let's put all of these stages into a [Pipeline](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline). This way, you don't have to remember every step you applied to the training set and then reapply the same steps to the test set. The pipeline takes care of that for you!

```python
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages([tokenizer, remover, counts, binarizer, dtc])
# fit() runs the stages in order on trainDF and returns a fitted PipelineModel
pipelineModel = pipeline.fit(trainDF)
```
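The following is a simplified plain-Python sketch of what `Pipeline.fit()` does, using our own toy classes rather than PySpark's API: each Estimator is fit on the output of the stages before it and replaced by the model it returns, while Transformers are kept as-is. The fitted stages can then be replayed on new data. Spark's actual implementation also skips transforming the data after the last Estimator, which this sketch doesn't bother with.

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    def fit(self, rows):
        raise NotImplementedError

def fit_pipeline(stages, rows):
    """Simplified Pipeline.fit: returns the fitted stages (a toy 'PipelineModel')."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if isinstance(stage, Estimator) else stage
        rows = model.transform(rows)  # this stage's output feeds the next stage
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, rows):
    """Simplified PipelineModel.transform: apply every fitted stage in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows

# Hypothetical toy stages; rows are plain dicts
class Lowercase(Transformer):
    def transform(self, rows):
        return [dict(r, text=r["text"].lower()) for r in rows]

class KnownWords(Transformer):
    def __init__(self, vocab):
        self.vocab = vocab
    def transform(self, rows):
        return [dict(r, known=[w for w in r["text"].split() if w in self.vocab]) for r in rows]

class LearnVocab(Estimator):
    def fit(self, rows):
        # The vocabulary is learned from the training rows only
        return KnownWords({w for r in rows for w in r["text"].split()})

model = fit_pipeline([Lowercase(), LearnVocab()], [{"text": "Great Movie"}])
print(transform_pipeline(model, [{"text": "GREAT plot"}]))
# [{'text': 'great plot', 'known': ['great']}]
```

The vocabulary comes from the training rows alone, which is exactly why fitting the whole pipeline on `trainDF` and then transforming `testDF` avoids leaking test data into the model.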

We can extract individual stages from the fitted PipelineModel, such as the Decision Tree model.

```python
decisionTree = pipelineModel.stages[-1]  # the last stage is the fitted decision tree model
print(decisionTree.toDebugString)
```
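The debug string refers to features by index (e.g. `feature 123`); index `i` corresponds to entry `i` of the CountVectorizer's learned vocabulary. A small plain-Python helper (hypothetical, for illustration) that pairs importances with vocabulary terms and returns the most important words:

```python
def top_words(importances, vocabulary, k=10):
    """Pair each feature importance with its vocabulary term; return the k largest."""
    pairs = [(imp, term) for imp, term in zip(importances, vocabulary) if imp > 0]
    return [term for imp, term in sorted(pairs, key=lambda p: -p[0])[:k]]

print(top_words([0.0, 0.6, 0.1, 0.3], ["movie", "bad", "plot", "great"], k=2))
# ['bad', 'great']
```

With the fitted pipeline, the inputs would be `decisionTree.featureImportances.toArray()` and `pipelineModel.stages[2].vocabulary` (the fitted `CountVectorizerModel` is the third stage).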

Let's save the pipeline model.

```python
# `userhome` is assumed to be defined by the setup_env include above
fileName = userhome + "/tmp/DT_Pipeline"
pipelineModel.write().overwrite().save(fileName)
```

Now let's load the `PipelineModel` back in.

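**Note**: You need to know the type of the object you're loading. We saved a fitted `PipelineModel`, so we load it with `PipelineModel.load()`, not `Pipeline.load()`.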

```python
from pyspark.ml import PipelineModel
# Load saved model
savedPipelineModel = PipelineModel.load(fileName)
```

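```python
# Run every fitted stage on the test set; this adds, among others, `label` and `prediction` columns
resultDF = savedPipelineModel.transform(testDF)
```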

## Evaluate

We are going to use [MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator) to evaluate our predictions. We use the multiclass evaluator because BinaryClassificationEvaluator does not support accuracy as a metric (it only supports `areaUnderROC` and `areaUnderPR`).

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy: %(result)s" % {"result": evaluator.evaluate(resultDF)})
```

#### Confusion Matrix

Let's check whether we had more false positives or false negatives.

```python
display(resultDF.groupBy("label", "prediction").count())
```
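Each row of the grouped output is one cell of the confusion matrix. The plain-Python sketch below (hypothetical helpers `confusion_counts` and `accuracy`, with 1.0 meaning positive) shows how the cells are named and how accuracy follows from them:

```python
from collections import Counter

def confusion_counts(pairs):
    """Tally (label, prediction) pairs, where 1.0 = positive and 0.0 = negative."""
    c = Counter(pairs)
    return {
        "TP": c[(1.0, 1.0)],  # positive review, predicted positive
        "TN": c[(0.0, 0.0)],  # negative review, predicted negative
        "FP": c[(0.0, 1.0)],  # negative review, predicted positive
        "FN": c[(1.0, 0.0)],  # positive review, predicted negative
    }

def accuracy(cm):
    """Fraction of all predictions that are correct."""
    return (cm["TP"] + cm["TN"]) / sum(cm.values())

cm = confusion_counts([(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 1.0)])
print(cm)            # {'TP': 1, 'TN': 1, 'FP': 2, 'FN': 1}
print(accuracy(cm))  # 0.4
```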

Later, we will see how to apply this pipeline to streaming data!

&copy; 2018 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>