# Sparkling Water Pipeline Productionalization

## Background

Sparkling Water provides access to H2O algorithms and publishes an API to integrate them as part of regular Spark pipelines. This feature allows for seamless training and deployment of H2O algorithms in the Spark environment. Furthermore, thanks to MOJO (Java binary) representations of trained H2O models, production pipelines do not require access to the H2O runtime. This enables a wide variety of deployment scenarios. Similarly, Sparkling Water can be used for deploying MOJOs from Driverless AI models.

Moreover, by supporting Python and Scala environments, we enable a simple transfer of modeling results between data scientists ("Python land") and production ("JVM land").


## Goal

The goals of this hands-on are two-fold:
  - Show integration of H2O models into Spark pipelines using PySpark and PySparkling,
  - Demonstrate deployment of the trained pipeline in the context of JVM and Spark streaming.
  
Our modeling goal is to predict sentiment of Amazon food reviews. For this purpose, we use a pre-processed dataset from [SNAP repository](https://snap.stanford.edu/data/web-FineFoods.html). The dataset contains multiple columns but for simplicity, we will use only `date`, `summary` and overall `score`. The score helps us to approximate sentiment.

![Scenario](./img/scenario.png)

## Environment preparation

First, let's verify that `SparkSession` is available in the notebook environment. We do not need to explicitly create a `SparkSession` as it is automatically created for us
during startup of the Jupyter notebook. This works because Jupyter is configured with a Spark kernel.


In [None]:
spark

### Prepare `H2OContext`

We will start `H2OContext` in the so-called _internal backend_ mode. This means H2O shares the JVM with Spark, as shown below.

![InternalBackend](./img/internal_backend.png)

The following call initializes H2O on each Spark executor in the Spark cluster.

In [None]:
from pysparkling import *
hc = H2OContext.getOrCreate(spark)

>Note: the reported IP is the private IP of the Docker container where the notebook is running. To open H2O Flow in your own browser, copy your browser URL and replace the port with 54321. In Aquarium you would instead replace `/jupyter` with `/h2o`, since Aquarium uses reverse port mapping to reach the standard ports.
>
>For example, my Jupyter notebook's URL is `http://52.202.98.125/jupyter`. After opening a new browser tab or window, paste the address and replace `/jupyter` with `/h2o`:
>`http://52.202.98.125/h2o`.

## Spark Steps

We are going to use Spark for all the initial steps, from data ingestion to data cleaning and feature engineering.

>Alternatively, one could load the data into H2O
>
>```
>import h2o
>reviews_h2o = h2o.upload_file("AmazonReviews_Train.csv", "reviews.hex")
>```
>
>use H2O Flow to investigate the data, and then convert the H2O frame to a Spark frame
>
>```
>reviews_spark = hc.as_spark_frame(reviews_h2o)
>```

In [None]:
reviews_spark = spark.read.load("/home/h2o/data/amazon_reviews/AmazonReviews_Train.csv",
                                format="csv", sep=",", inferSchema="true", header="true")

#### Trick #1: Save the original Spark schema

At this point, we will save the input data schema to be used later in the deployed Spark streaming application.

In [None]:
reviews_spark.printSchema()

with open('schema.json','w') as f:
    f.write(str(reviews_spark.schema.json()))
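
The saved file is plain JSON, so it can be inspected or validated without a running Spark session. The sketch below builds a small dictionary in the layout that `StructType.json()` emits and round-trips it through the `json` module. The column types shown are assumptions for illustration; the real file contains whatever Spark inferred from the CSV.

```python
import json

# Hypothetical three-column schema in the JSON layout produced by StructType.json().
# The types are assumptions; the real schema.json reflects what Spark inferred.
schema_dict = {
    "type": "struct",
    "fields": [
        {"name": "Score", "type": "integer", "nullable": True, "metadata": {}},
        {"name": "Time", "type": "integer", "nullable": True, "metadata": {}},
        {"name": "Summary", "type": "string", "nullable": True, "metadata": {}},
    ],
}

# Serialize and parse again, as writing and later re-reading schema.json would do.
schema_text = json.dumps(schema_dict)
restored = json.loads(schema_text)

# List the column names recorded in the schema.
column_names = [field["name"] for field in restored["fields"]]
print(column_names)
```

Because the schema is stored as text, the streaming application can later rebuild the same `StructType` without re-reading the training data.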

## Now let's define all the stages for the pipeline

The Spark pipelines are composed of various transformers. In our example, we combine a few Spark transformers to clean up textual data and transform it into numerical format. The pipeline is finalized by training an H2O XGBoost binomial model.

> Note: The pipeline stages are not executed right away. They are executed during each `fit` and `transform` call.

### Define transformer to drop unnecessary columns
The Spark `SQLTransformer` allows for using SQL to munge data.

As part of this transformer, we convert the Unix timestamp to a human-readable date string.

For this example, we select just the `Score`, `Time` and `Summary` columns. The goal of this analysis is to predict sentiment, i.e., whether the review is positive or negative. The review can be influenced by several aspects. The `Summary` is of course the most important information, but `Time` can influence the model as well. For example, people may tend to give higher reviews on Friday evenings because there's a weekend in front of them. :)

In [None]:
from pyspark.ml.feature import SQLTransformer
colSelect = SQLTransformer(
    statement="SELECT Score, from_unixtime(Time) as Time, Summary FROM __THIS__")

#### Trick #2: Explore intermediate results
To explore intermediate results, we can invoke the defined transformer directly. Note that this will cause Spark to execute the transformer as well as all unevaluated upstream code. 

In [None]:
selected = colSelect.transform(reviews_spark)
selected.show()
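
To see what `from_unixtime` does to a single value, here is a plain-Python sketch of the same conversion. It assumes UTC, whereas Spark uses the session time zone, so the output of the cell above may differ by a few hours. The timestamp is a made-up example rather than a row from the dataset.

```python
from datetime import datetime, timezone

def from_unixtime_utc(seconds):
    """Format epoch seconds as 'yyyy-MM-dd HH:mm:ss', assuming UTC."""
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

# Hypothetical review timestamp, in seconds since 1970-01-01.
example = from_unixtime_utc(1303862400)
print(example)
```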

### Define transformer to create multiple time features based on the `Time` column

The `Time` column is stored internally as a timestamp. To be useful in modeling, we need to extract the time information in a format that the predictive algorithms can work with. We can use Spark SQL date functions such as `month`, `dayofmonth`, etc. to engineer multiple new features from the timestamp.

In [None]:
refineTime = SQLTransformer(
    statement="""
    SELECT  Score,
            Summary, 
            dayofmonth(Time) as Day, 
            month(Time) as Month, 
            year(Time) as Year, 
            weekofyear(Time) as WeekNum, 
            date_format(Time, 'EEE') as Weekday, 
            hour(Time) as HourOfDay, 
            IF(date_format(Time, 'EEE')='Sat' OR date_format(Time, 'EEE')='Sun', 1, 0) as Weekend, 
            CASE 
                WHEN month(Time)=12 OR month(Time)<=2 THEN 'Winter' 
                WHEN month(Time)>=3 AND month(Time)<=5 THEN 'Spring' 
                WHEN month(Time)>=6 AND month(Time)<=9 THEN 'Summer' 
                ELSE 'Fall' END as Season 
    FROM __THIS__""")

Now inspect the updated data

In [None]:
refined = refineTime.transform(selected)
refined.show()
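
A quick way to sanity-check the engineered features is to reproduce them for one timestamp in plain Python. The sketch below follows the boundaries the `CASE` expression is meant to encode (December to February is winter, March to May is spring, June to September is summer, everything else is fall) and uses ISO week numbers, which is how `weekofyear` is defined. The function names are hypothetical helpers, not part of Spark.

```python
from datetime import datetime

WEEKDAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

def season_of(month):
    """Map a month number (1-12) to the season label used in the tutorial."""
    if month == 12 or month <= 2:
        return "Winter"
    if 3 <= month <= 5:
        return "Spring"
    if 6 <= month <= 9:
        return "Summer"
    return "Fall"

def time_features(ts):
    """Derive the same columns as the refineTime transformer from one datetime."""
    weekday = WEEKDAY_NAMES[ts.weekday()]
    return {
        "Day": ts.day,
        "Month": ts.month,
        "Year": ts.year,
        "WeekNum": ts.isocalendar()[1],  # ISO 8601 week number
        "Weekday": weekday,
        "HourOfDay": ts.hour,
        "Weekend": 1 if weekday in ("Sat", "Sun") else 0,
        "Season": season_of(ts.month),
    }

# Hypothetical review time: Wednesday, 27 April 2011, midnight.
features = time_features(datetime(2011, 4, 27, 0, 0, 0))
print(features)
```

Checking a handful of months with `season_of` makes it easy to spot a branch that accidentally captures more months than intended.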

### Remove neutral reviews and classify the Scores

We are not interested in the neutral reviews (reviews with `Score = 3`) as they would not add much information to the model. This is a fairly standard approach in NPS (net promoter score) type analyses and is particularly common in sentiment analysis.

In [None]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, IDF, CountVectorizer

filterScore = SQLTransformer(
    statement="""
    SELECT  IF(Score<3,'NEGATIVE', 'POSITIVE') as Sentiment, Summary, Day, Month, Year,
            WeekNum, Weekday, HourOfDay, Weekend, Season 
    FROM __THIS__ WHERE Score !=3 """)

Inspect the data

In [None]:
filtered = filterScore.transform(refined)
filtered.show()
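
The labeling rule in the SQL statement can be restated in a few lines of plain Python. This is only an illustration of the rule (drop score 3, map lower scores to `NEGATIVE` and higher scores to `POSITIVE`); the helper name is hypothetical.

```python
def label_review(score):
    """Return the sentiment label for a score, or None for neutral reviews."""
    if score == 3:
        return None  # neutral reviews are filtered out
    return "NEGATIVE" if score < 3 else "POSITIVE"

scores = [1, 2, 3, 4, 5]
# Keep only the non-neutral scores and attach their labels.
labels = [(s, label_review(s)) for s in scores if label_review(s) is not None]
print(labels)
```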

### Tokenize the message

Here we use Spark's [RegexTokenizer](https://spark.apache.org/docs/2.1.0/ml-features.html#tokenizer) to tokenize the review summaries.

In [None]:
regexTokenizer = RegexTokenizer(inputCol="Summary",
                                outputCol="tokenized_summary",
                                pattern="[, ]",
                                toLowercase=True)

Inspect the data

In [None]:
tokenized = regexTokenizer.transform(filtered)
tokenized.show()
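
The tokenizer configuration above splits on commas and spaces and lowercases the text. A rough plain-Python equivalent is sketched below, assuming the default behavior of treating the pattern as a separator and dropping empty tokens. The sample summary is invented.

```python
import re

def regex_tokenize(text, pattern="[, ]", to_lowercase=True, min_token_length=1):
    """Split text on the separator pattern and drop tokens that are too short."""
    if to_lowercase:
        text = text.lower()
    return [t for t in re.split(pattern, text) if len(t) >= min_token_length]

# A comma followed by a space produces an empty token, which is dropped.
tokens = regex_tokenize("Good Quality, Dog Food")
print(tokens)
```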

### Remove unnecessary words

Some words carry little information for the resulting model. We use Spark's [StopWordsRemover](https://spark.apache.org/docs/2.1.0/ml-features.html#stopwordsremover) to remove them.

In [None]:
stopWordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),
                                    outputCol="CleanedSummary",
                                    caseSensitive=False)

Inspect the data

In [None]:
stopWordsRemoved = stopWordsRemover.transform(tokenized)
stopWordsRemoved.select(["Sentiment", "Summary", "CleanedSummary"]).show()
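
Conceptually, stop-word removal is a filter over each token list. The sketch below uses a tiny hypothetical stop-word set; Spark ships a much longer default English list, so real output will differ.

```python
# Hypothetical, deliberately tiny stop-word list for illustration only.
STOP_WORDS = {"the", "is", "a", "for", "my", "this"}

def remove_stop_words(tokens, stop_words=STOP_WORDS, case_sensitive=False):
    """Return the tokens that are not stop words."""
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]

cleaned = remove_stop_words(["this", "is", "the", "best", "coffee"])
print(cleaned)
```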

### Hash the words

NLP (natural language processing) for predictive modeling is based on the idea that text can be represented as numeric values. These values are then fed into any algorithm the user chooses. One choice of numeric representation uses [CountVectorizer](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).

`CountVectorizer` is very similar to the [HashingTF](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) function, except that it preserves the mapping from the index back to the word using an internal vocabulary.

For example, if the word `Dog` is stored in the vocabulary at index `100`, we can get the word back from the fitted model as `countVecModel.vocabulary[100]`.

#### Trick #3: Set minDF parameter to limit number of words

The `minDF` parameter ensures that only words that appear in at least `minDF` documents (100 in our case) are included in the vocabulary. This both speeds up modeling and ensures that outliers (infrequent words) do not affect our model that much.

In [None]:
countVectorizer = CountVectorizer(inputCol=stopWordsRemover.getOutputCol(),
                                  outputCol="frequencies",
                                  minDF=100)

#### Trick #4: Manually train the count vectorizer so we can see how it behaves before we execute the pipeline


In [None]:
countVecModel = countVectorizer.fit(stopWordsRemoved)

See the vocabulary:

In [None]:
print("Vocabulary size is " + str(len(countVecModel.vocabulary)))
print(countVecModel.vocabulary[:10])

Inspect the data

In [None]:
vectorized = countVecModel.transform(stopWordsRemoved)
vectorized.select(["Sentiment", "CleanedSummary", "frequencies"]).show()
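
To make the role of `minDF` and the vocabulary concrete, here is a toy count vectorizer in plain Python. It is a simplified stand-in rather than Spark's implementation: it keeps terms that occur in at least `min_df` documents, orders the vocabulary by total frequency, and breaks ties alphabetically, which is an assumption made here for determinism. The documents are invented.

```python
from collections import Counter

def build_vocabulary(docs, min_df):
    """Keep terms found in at least min_df documents, most frequent first."""
    doc_freq = Counter()   # number of documents containing each term
    term_freq = Counter()  # total occurrences of each term
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    return sorted(kept, key=lambda t: (-term_freq[t], t))

def count_vector(tokens, vocabulary):
    """Return a sparse {index: count} mapping for one document."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in tokens if t in index)
    return dict(sorted(counts.items()))

docs = [
    ["great", "dog", "food"],
    ["great", "taste"],
    ["dog", "loves", "great", "treats"],
]
vocabulary = build_vocabulary(docs, min_df=2)
vector = count_vector(docs[0], vocabulary)
print(vocabulary, vector)
```

Words that appear in only one document ("food", "taste", "loves", "treats") never enter the vocabulary, so they contribute nothing to the vectors.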

### Create an Inverse Document Frequency (IDF) model

Here we use Spark's [tf-idf](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) method to weight each term by how important it is to a document relative to the whole dataset. Please see the [Spark documentation](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) for more information on TF-IDF.

In [None]:
idf = IDF(inputCol=countVectorizer.getOutputCol(),
          outputCol="tf_idf_frequencies",
          minDocFreq=1)

Manually train the IDF model to see the results before we execute the pipeline:

In [None]:
idfModel = idf.fit(vectorized)

Inspect the data

In [None]:
afterIdf = idfModel.transform(vectorized)
afterIdf.select(["Sentiment", "CleanedSummary", "frequencies", "tf_idf_frequencies"]).show()
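
The weighting itself is simple arithmetic. Spark's documentation defines the inverse document frequency of a term as `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term. The sketch below applies that formula to made-up document frequencies and scales one sparse count vector; the function names are hypothetical.

```python
import math

def idf_weights(doc_freqs, num_docs):
    """Compute log((m + 1) / (df + 1)) for each term's document frequency."""
    return [math.log((num_docs + 1) / (df + 1)) for df in doc_freqs]

def tf_idf(term_counts, weights):
    """Scale a sparse {index: count} vector by the per-term IDF weights."""
    return {i: count * weights[i] for i, count in term_counts.items()}

# Hypothetical corpus of 3 documents: term 0 occurs in all 3, term 1 in 2.
weights = idf_weights([3, 2], num_docs=3)
scaled = tf_idf({0: 1, 1: 1}, weights)
print(weights, scaled)
```

A term that appears in every document gets weight zero, which is why very common words end up contributing little to the model.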

### Remove Summary Column

Recall from above that predictive algorithms do not understand string values very well. This is why we transformed the text data of the `Summary` column using TF-IDF. We will keep the numeric representations of `Summary` and drop the original text so that we do not confuse the model.

In [None]:
removeSummary = SQLTransformer(
    statement="""
    SELECT Sentiment, Day, Month, Year, WeekNum, Weekday, HourOfDay, Weekend, Season, tf_idf_frequencies
    FROM __THIS__ """)

Inspect the data

In [None]:
removedSummary = removeSummary.transform(afterIdf)
removedSummary.show()

------------
# Now use H2O

### Create an XGBoost model using H2O

Up to this point, all of our data wrangling and feature engineering efforts have used Spark methods exclusively. Now we turn to H2O to train an H2O XGBoost model that predicts the `Sentiment` column (using default settings). Note that there are many more steps involved with tuning an XGBoost model which we omit here. The full documentation for XGBoost is available at [H2O Documentation](http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/xgboost.html).

In [None]:
import h2o
from pysparkling.ml import ColumnPruner, H2OXGBoost

xgboost = H2OXGBoost(splitRatio=0.8,
             featuresCols=[idf.getOutputCol()],
             labelCol="Sentiment")

### Create the pipeline by defining all the stages

Now we have all the pieces ready and can define the final pipeline.

In [None]:
pipeline = Pipeline(stages=[colSelect,
                            refineTime,
                            filterScore,
                            regexTokenizer,
                            stopWordsRemover,
                            countVectorizer,
                            idf,
                            removeSummary,
                            xgboost])

## Train the pipeline model

The `fit` call runs each transformer and estimator in the pipeline in order and creates a so-called `PipelineModel`. The final model is trained on the cleaned data produced by the preceding transformers, and the resulting `PipelineModel` is ready to accept raw data and make predictions.

In [None]:
model = pipeline.fit(reviews_spark)
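
The difference between fitting and transforming can be illustrated with a toy pipeline in plain Python. This is a conceptual sketch with hypothetical classes, not Spark's implementation: stages that have a `fit` method are treated as estimators and replaced by the model they return, and every stage's output feeds the next stage.

```python
class Lowercase:
    """Transformer: needs no training."""
    def transform(self, rows):
        return [r.lower() for r in rows]

class VocabularyModel:
    """Fitted model: maps known words to their vocabulary index."""
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary
    def transform(self, rows):
        return [[self.vocabulary.index(w) for w in r.split() if w in self.vocabulary]
                for r in rows]

class VocabularyEstimator:
    """Estimator: learns a vocabulary from the data it is fitted on."""
    def fit(self, rows):
        return VocabularyModel(sorted({w for r in rows for w in r.split()}))

def fit_pipeline(stages, rows):
    """Fit estimators in order, passing each stage's output to the next."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)  # estimator becomes a fitted model
        rows = stage.transform(rows)
        fitted.append(stage)
    return fitted

def transform_pipeline(fitted, rows):
    """Apply the fitted stages to new raw data."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows

fitted = fit_pipeline([Lowercase(), VocabularyEstimator()], ["Good Food", "Bad food"])
encoded = transform_pipeline(fitted, ["GOOD good unknown"])
print(fitted[1].vocabulary, encoded)
```

The fitted list plays the role of the `PipelineModel`: it remembers what was learned during training and can be applied to raw input without refitting.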

### Try predictions

First, let's load the data that we can use for prediction:

In [None]:
reviews_spark_pred = spark.read.load("/home/h2o/data/amazon_reviews/AmazonReviews_Predictions.csv",
                                     format="csv", sep=",", inferSchema="true", header="true")

Now run the predictions:

In [None]:
model.transform(reviews_spark_pred).show()

## Save the pipeline model

Later we can use the pipeline model in Scala to demonstrate the deployment of the pipeline in the JVM world.

In [None]:
model.write().overwrite().save("reviews_pipeline.model")

#### Trick #5: Check variable importances

We can inspect the model in H2O Flow and see the variable importances. However, we do not have information about the words, just the indices. We can ask the `CountVectorizer` what word is on the specific index to see what words affect our model the most.

In [None]:
model.stages[5].vocabulary[0]
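
Once the important feature indices are known, mapping them back to words is a lookup into the vocabulary. The sketch below uses an invented vocabulary and invented importance scores to show the idea; in the notebook the vocabulary would come from `model.stages[5].vocabulary`, and the indices would be read off the variable importances shown in H2O Flow.

```python
# Hypothetical vocabulary and importances, for illustration only.
vocabulary = ["great", "love", "disappointed", "best"]
importances = {2: 0.41, 0: 0.27, 3: 0.05}  # vocabulary index -> relative importance

# Sort by importance, highest first, and translate each index to its word.
ranked = sorted(importances.items(), key=lambda item: item[1], reverse=True)
top_words = [(vocabulary[index], score) for index, score in ranked]
print(top_words)
```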

## Let's Deploy the Application

Up to this point, we have defined the PySpark pipeline. We will now demonstrate its deployment in a PySpark Streaming application written in Python, where the pipeline defined above receives raw streaming data and runs predictions on it in real time.

The steps will be:

 1. Load the schema from the schema file.
 1. Load the pipeline from the pipeline file.
 1. Create an input data stream and pass it the schema. The input data stream will point to a directory where new CSV files arrive from different streaming sources.
 1. Create the output data stream. For the purposes of this tutorial, we store the data in memory and expose it as a Spark SQL table.
 1. We can inspect the predictions in "real time" by regularly displaying the content of the desired table.

### 1. Check again that we have spark available

In [None]:
spark

### 2. Load the exported schema of our input data

In [None]:
from pyspark.sql.types import StructType
import json

# Read the schema saved earlier; the with block closes the file afterwards
with open("schema.json", "r") as f:
    schema = StructType.fromJson(json.load(f))
print(schema)

### 3. Load the exported pipeline model

In [None]:
from pyspark.ml import PipelineModel
pipeline_model = PipelineModel.load("reviews_pipeline.model/")

### 4. Start Streaming

In [None]:
from subprocess import Popen
Popen(["./start_streaming.sh"])

In [None]:
!ls output

### 5. Prepare the input data stream

In [None]:
input_data_stream = spark.readStream.schema(schema).csv("output")

### 6. Prepare the output data stream

In [None]:
output_data_stream = pipeline_model.transform(input_data_stream)

### 7. Start processing the input data

In [None]:
output_data_stream.writeStream.format("memory").queryName("predictions").start()

### 8. List the output

In [None]:
import time
# limit to 10 for hands-on
# could replace with 
# while(True):

for x in range(10):
    spark.sql("select * from predictions").show()
    time.sleep(3)

### 9. Shut down the cluster

In [None]:
h2o.cluster().shutdown()
spark.stop()