# Sparkling Water Pipeline Productionalization

## Background

Sparkling Water provides access to H2O algorithms and publishes API to integrate them as part of regular Spark pipelines. This feature allows for seamless training and deployment of H2O algorithms in the Spark environment. Furthermore, trained pipelines do not require H2O runtime anymore (thanks to MOJO representation of trained H2O models) which enables variety of deployment scenarios.

Moreover, by supporting Python and Scala environment, we enable a simple transfer of modeling results between data scientists (Python land) and production (JVM land).


## Goal

The goal of this hands-on is to:
  - show integration of H2O models into Spark pipelines using PySpark and PySparkling
  - demonstrate deployment of the trained pipeline in the context of JVM and Spark streaming
  
Our modeling goal is to predict sentiment of Amazon food reviews. For this purpose, we use a pre-processed dataset from [SNAP repository](https://snap.stanford.edu/data/web-FineFoods.html). The dataset contains multiple columns but for simplicity, we will use only date, summary and overall score. The score helps us to approximate sentiment.

![Scenario](./img/scenario.png)

## Environment preparation

First, let's verify that `SparkSession` is available in the notebook environment. We do not need to explicitly create `SparkSession` as it is created for us
automatically during start of the Jupyter notebook. This works because of the Jupyter is set up with a Spark kernel.


In [1]:
spark

### Prepare `H2OContext`

We will start `H2OContext` in the so called _internal backend_ mode. The means H2O is sharing JVM with Spark (see details in [Sparkling Water documentation](https://github.com/h2oai/sparkling-water/blob/rel-2.2/doc/tutorials/backends.rst)).

The following call initializes H2O on each Spark executor in the Spark cluster.

In [2]:
from pysparkling import *
hc = H2OContext.getOrCreate(spark)

Connecting to H2O server at http://10.1.10.237:54321... successful.


0,1
H2O cluster uptime:,07 secs
H2O cluster version:,3.16.0.2
H2O cluster version age:,3 days
H2O cluster name:,sparkling-water-kuba_local-1512344413711
H2O cluster total nodes:,1
H2O cluster free memory:,3.556 Gb
H2O cluster total cores:,8
H2O cluster allowed cores:,8
H2O cluster status:,"accepting new members, healthy"
H2O connection url:,http://10.1.10.237:54321



Sparkling Water Context:
 * H2O name: sparkling-water-kuba_local-1512344413711
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,10.1.10.237,54321)
  ------------------------

  Open H2O Flow in browser: http://10.1.10.237:54321 (CMD + click in Mac OSX)

    


> Note: the reported IP is a private IP of docker container where the demo is running.


## Data preparation

We are going to use H2O to load data using H2O since it does pretty good job to guess all nuances of input format.

In [3]:
import h2o
reviews_h2o = h2o.upload_file("../data/kuba/AmazonReviews_Train.csv", "reviews.hex")

Parse progress: |█████████████████████████████████████████████████████████| 100%


### Explore data table in H2O flow

At this point, we can explore data directly in this notebook, or we can access H2O Flow and explore data and its properties directly there.


### Convert H2O frame to Spark frame se we can pass it as the input to the pipeline

After data exploration, we can start with data munging. We are going to use Spark, hence we need to publish H2O frame as Spark DataFrame.

In [4]:
reviews_spark = hc.as_spark_frame(reviews_h2o)

#### Trick #1: Save the original Spark schema

At this point we will save the schema of input data and we will reuse it later to configure deployed Spark streaming application.

In [5]:
reviews_spark.printSchema()

with open('schema.json','w') as f:
    f.write(str(reviews_spark.schema.json()))

root
 |-- Id: integer (nullable = false)
 |-- ProductId: string (nullable = false)
 |-- UserId: string (nullable = false)
 |-- ProfileName: string (nullable = false)
 |-- HelpfulnessNumerator: short (nullable = false)
 |-- HelpfulnessDenominator: short (nullable = false)
 |-- Score: byte (nullable = false)
 |-- Time: integer (nullable = false)
 |-- Summary: string (nullable = false)
 |-- Text: string (nullable = false)



## Now let's define all the stages for the pipeline

The Spark pipelines are composed of various transformers. In our example, we combine a few Spark transformers to clean up textual data and transform it into numerical format. The pipeline is finalized by training an H2O GBM binomial model.

> Note: The pipeline stages are not executed right away, they are executed during each fit and transform call.

### Define transformer to drop unnecessary columns
The Spark `SQLTransformer` allows for using SQL to munge data.

As part of this transformer, we convert timestamp to the human readable date string:

We are selecting just `Score`, `Time` and `Summary` columns. The goal of this demo is to predict sentiment, ie, whether the review is positive or negative. The review can be influenced by several aspects. The `Summary` is of course the mostly important information, but `Time` can influence the model as well. For example, people may tend to give higher reviews on Friday evenings because there's a weekend in from of them :)

In [6]:
from pyspark.ml.feature import SQLTransformer
colSelect = SQLTransformer(
    statement="SELECT Score, from_unixtime(Time) as Time, Summary FROM __THIS__")

#### Trick #2: Explore intermediate results
To explore intermediate results, we can also invoke defined transformer directly

In [7]:
selected = colSelect.transform(reviews_spark)
selected.show()

+-----+-------------------+--------------------+
|Score|               Time|             Summary|
+-----+-------------------+--------------------+
|    5|2011-04-26 17:00:00|Good Quality Dog ...|
|    1|2012-09-06 17:00:00|   Not as Advertised|
|    4|2008-08-17 17:00:00|""Delight"" says ...|
|    2|2011-06-12 17:00:00|      Cough Medicine|
|    5|2012-10-20 17:00:00|         Great taffy|
|    4|2012-07-11 17:00:00|          Nice Taffy|
|    5|2012-06-19 17:00:00|Great!  Just as g...|
|    5|2012-05-02 17:00:00|Wonderful, tasty ...|
|    5|2011-11-22 16:00:00|          Yay Barley|
|    5|2012-10-25 17:00:00|    Healthy Dog Food|
|    5|2005-02-07 16:00:00|The Best Hot Sauc...|
|    5|2010-08-26 17:00:00|My cats LOVE this...|
|    1|2012-06-12 17:00:00|My Cats Are Not F...|
|    4|2010-11-04 17:00:00|   fresh and greasy!|
|    5|2010-03-11 16:00:00|Strawberry Twizzl...|
|    5|2009-12-28 16:00:00|Lots of twizzlers...|
|    2|2012-09-19 17:00:00|          poor taste|
|    5|2012-08-15 17

### Create transformer which creates several time columns based on the `Time` colum

The time is stored as a timestamp, however, we would like to get a more human readable information from it. We can use the SparkSQL data methods such as `month`, `dayofmonth` and so on to achieve this.

In [8]:
refineTime = SQLTransformer(
    statement="""
    SELECT  Score,
            Summary, 
            dayofmonth(Time) as Day, 
            month(Time) as Month, year(Time) as Year, 
            weekofyear(Time) as WeekNum, 
            date_format(Time, 'EEE') as WeekDay, 
            hour(Time) as HourOfDay, 
            IF(date_format(Time, 'EEE')='Sat' OR date_format(Time, 'EEE')='Sun', 1, 0) as Weekend, 
            CASE 
                WHEN month(TIME)=12 OR month(Time)<=2 THEN 'Winter' 
                WHEN month(TIME)>=3 OR month(Time)<=5 THEN 'Spring' 
                WHEN month(TIME)>=6 AND month(Time)<=9 THEN 'Summer' 
                ELSE 'Autumn' END as Seasson 
    FROM __THIS__""")

Inspect the data after 

In [9]:
refined = refineTime.transform(selected)
refined.show()

+-----+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|Score|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|
+-----+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|    5|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|
|    1|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|
|    4|""Delight"" says ...| 17|    8|2008|     33|    Sun|       17|      1| Spring|
|    2|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|
|    5|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|
|    4|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|
|    5|Great!  Just as g...| 19|    6|2012|     25|    Tue|       17|      0| Spring|
|    5|Wonderful, tasty ...|  2|    5|2012|     18|    Wed|       17|      0| Spring|
|    5|          Yay Barley| 22|   11|2011|     47|   

### Remove neutral reviews and classify the Scores

We are not interested in the neutral reviews (reviews with the `Score=3`) as they would not add much information to the model

In [10]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, IDF, CountVectorizer

filterScore = SQLTransformer(
    statement="""
    SELECT  IF(Score<3,'NEGATIVE', 'POSITIVE') as Sentiment, Summary, Day, Month, Year,
            WeekNum, WeekDay, HourOfDay, Weekend, Seasson 
    FROM __THIS__ WHERE Score !=3 """)



 Inspect the data

In [11]:
filtered = filterScore.transform(refined)
filtered.show()

+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|Sentiment|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|
+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+
| POSITIVE|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|
| NEGATIVE|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|
| POSITIVE|""Delight"" says ...| 17|    8|2008|     33|    Sun|       17|      1| Spring|
| NEGATIVE|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|
| POSITIVE|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|
| POSITIVE|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|
| POSITIVE|Great!  Just as g...| 19|    6|2012|     25|    Tue|       17|      0| Spring|
| POSITIVE|Wonderful, tasty ...|  2|    5|2012|     18|    Wed|       17|      0| Spring|
| POSITIVE

### Tokenize the message

Here we use the [RegexTokenizer](https://spark.apache.org/docs/2.1.0/ml-features.html#tokenizer) to tokenize the messages.

In [12]:
regexTokenizer = RegexTokenizer(inputCol="Summary",
                                outputCol="tokenized_summary",
                                pattern="[, ]",
                                toLowercase=True)

Inspect the data

In [13]:
tokenized = regexTokenizer.transform(filtered)
tokenized.show()

+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|   tokenized_summary|
+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|[good, quality, d...|
| NEGATIVE|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|[not, as, adverti...|
| POSITIVE|""Delight"" says ...| 17|    8|2008|     33|    Sun|       17|      1| Spring|[""delight"", say...|
| NEGATIVE|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|   [cough, medicine]|
| POSITIVE|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|      [great, taffy]|
| POSITIVE|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|       [nice, taffy]|
|

### Remove unnecessary words

Some words do not bring much information for the resulting model. For this, we use the [StopWordsRemover](https://spark.apache.org/docs/2.1.0/ml-features.html#stopwordsremover) to clean the data.

In [14]:
stopWordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),
                                    outputCol="CleanedSummary",
                                    caseSensitive=False)

Inspect the data

In [15]:
stopWordsRemoved = stopWordsRemover.transform(tokenized)
stopWordsRemoved.select(["Sentiment", "Summary", "CleanedSummary"]).show()

+---------+--------------------+--------------------+
|Sentiment|             Summary|      CleanedSummary|
+---------+--------------------+--------------------+
| POSITIVE|Good Quality Dog ...|[good, quality, d...|
| NEGATIVE|   Not as Advertised|        [advertised]|
| POSITIVE|""Delight"" says ...| [""delight"", says]|
| NEGATIVE|      Cough Medicine|   [cough, medicine]|
| POSITIVE|         Great taffy|      [great, taffy]|
| POSITIVE|          Nice Taffy|       [nice, taffy]|
| POSITIVE|Great!  Just as g...|[great!, good, ex...|
| POSITIVE|Wonderful, tasty ...|[wonderful, tasty...|
| POSITIVE|          Yay Barley|       [yay, barley]|
| POSITIVE|    Healthy Dog Food|[healthy, dog, food]|
| POSITIVE|The Best Hot Sauc...|[best, hot, sauce...|
| POSITIVE|My cats LOVE this...|[cats, love, ""di...|
| NEGATIVE|My Cats Are Not F...|[cats, fans, new,...|
| POSITIVE|   fresh and greasy!|    [fresh, greasy!]|
| POSITIVE|Strawberry Twizzl...|[strawberry, twiz...|
| POSITIVE|Lots of twizzlers

### Hash the words

The algorithms can efficiently work with the numeric values hence we create a numeric representation of them using [CountVectorizer](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).

CountVectorizer is very similar to [HashingTF](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) function, except that it preserves the mapping from the index back to the word using internal vocabulary.

For example, if word `Dog` is stored in the hash at the index `100`, we can get the word back as `countVectorizerModel.vocabulary[100]`

#### Trick #3: Set minDF parameter to limit number of words

The minDF parameter ensures that only words which occur more the 100 times in our case are included. This also speeds the process of modelling and ensures that outliers does not affect our model that much.


In [16]:
countVectorizer = CountVectorizer(inputCol=stopWordsRemover.getOutputCol(),
                                  outputCol="frequencies",
                                  minDF=100)

#### Trick #4: Manually train the count vectorizer so we can see how it behaves before we execute the pipeline


In [17]:
countVecModel = countVectorizer.fit(stopWordsRemoved)

See the vocabulary

In [18]:
print("Vocabulary size is " + str(len(countVecModel.vocabulary)))

print(countVecModel.vocabulary[:10])

Vocabulary size is 1523
[u'great', u'good', u'best', u'love', u'coffee', u'tea', u'product', u'taste', u'delicious', u'excellent']


Inspect the data

In [19]:
vectorized = countVecModel.transform(stopWordsRemoved)
vectorized.select(["Sentiment", "CleanedSummary", "frequencies"]).show()

+---------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|
+---------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1523,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1523,[621],[1.0])|
| POSITIVE| [""delight"", says]|  (1523,[401],[1.0])|
| NEGATIVE|   [cough, medicine]|        (1523,[],[])|
| POSITIVE|      [great, taffy]|(1523,[0,1435],[1...|
| POSITIVE|       [nice, taffy]|(1523,[30,1435],[...|
| POSITIVE|[great!, good, ex...|(1523,[1,59,126],...|
| POSITIVE|[wonderful, tasty...|(1523,[15,37,1435...|
| POSITIVE|       [yay, barley]|        (1523,[],[])|
| POSITIVE|[healthy, dog, food]|(1523,[10,12,21],...|
| POSITIVE|[best, hot, sauce...|(1523,[2,44,86,45...|
| POSITIVE|[cats, love, ""di...|(1523,[3,12,23,42...|
| NEGATIVE|[cats, fans, new,...|(1523,[12,42,79],...|
| POSITIVE|    [fresh, greasy!]|   (1523,[83],[1.0])|
| POSITIVE|[strawberry, twiz...|(1523,[13,19,667]...|
| POSITIVE|[lots, twizzlers,

### Create inverse document frequencies model

Here we use [tf-idf](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) method to reflect the importance of a term to a document in the given set of data. Please see the Spark documentation for more information at [tf-idf](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf).

In [20]:
idf = IDF(inputCol=countVectorizer.getOutputCol(),
          outputCol="tf_idf_frequencies",
          minDocFreq=1)

Manually train the IDF model to see the results before we execute the pipeline

In [21]:
idfModel = idf.fit(vectorized)

Inspect the data

In [22]:
afterIdf = idfModel.transform(vectorized)
afterIdf.select(["Sentiment", "CleanedSummary", "frequencies", "tf_idf_frequencies"]).show()

+---------+--------------------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|  tf_idf_frequencies|
+---------+--------------------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1523,[1,10,12,35...|(1523,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1523,[621],[1.0])|(1523,[621],[7.32...|
| POSITIVE| [""delight"", says]|  (1523,[401],[1.0])|(1523,[401],[6.86...|
| NEGATIVE|   [cough, medicine]|        (1523,[],[])|        (1523,[],[])|
| POSITIVE|      [great, taffy]|(1523,[0,1435],[1...|(1523,[0,1435],[2...|
| POSITIVE|       [nice, taffy]|(1523,[30,1435],[...|(1523,[30,1435],[...|
| POSITIVE|[great!, good, ex...|(1523,[1,59,126],...|(1523,[1,59,126],...|
| POSITIVE|[wonderful, tasty...|(1523,[15,37,1435...|(1523,[15,37,1435...|
| POSITIVE|       [yay, barley]|        (1523,[],[])|        (1523,[],[])|
| POSITIVE|[healthy, dog, food]|(1523,[10,12,21],...|(1523,[10,12,21],...|
| POSITIVE|[best, hot, sa

### Remove Summary Column

The algoritms do not understand the string values very well. This is also the reason why we transformed the data using TF-IDF and created a numeric values out of the `Summary` column. Now we can drop the original string information so we do not confuse the model.


In [23]:
removeSummary = SQLTransformer(
    statement="""
    SELECT Sentiment, Day, Month, Year, WeekNum, WeekDay, HourOfDay, Weekend, Seasson, tf_idf_frequencies
    FROM __THIS__ """)

Inspect the data

In [24]:
removedSummary = removeSummary.transform(afterIdf)
removedSummary.show()

+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|  tf_idf_frequencies|
+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE| 26|    4|2011|     17|    Tue|       17|      0| Spring|(1523,[1,10,12,35...|
| NEGATIVE|  6|    9|2012|     36|    Thu|       17|      0| Spring|(1523,[621],[7.32...|
| POSITIVE| 17|    8|2008|     33|    Sun|       17|      1| Spring|(1523,[401],[6.86...|
| NEGATIVE| 12|    6|2011|     23|    Sun|       17|      1| Spring|        (1523,[],[])|
| POSITIVE| 20|   10|2012|     42|    Sat|       17|      1| Spring|(1523,[0,1435],[2...|
| POSITIVE| 11|    7|2012|     28|    Wed|       17|      0| Spring|(1523,[30,1435],[...|
| POSITIVE| 19|    6|2012|     25|    Tue|       17|      0| Spring|(1523,[1,59,126],...|
| POSITIVE|  2|    5|2012|     18|    Wed|       17|      0| Spring|(1523,[15,37,1435...|
| POSITIVE

### Create GBM model

Here we are using H2O's estimator to train a H2O GBM model on `Sentiment` column with 50 trees (default). The full documentation for GBM is available at [H2O Documentation](http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/gbm.html)

In [25]:
from pysparkling.ml import ColumnPruner, H2OGBM

gbm = H2OGBM(ratio=0.8,
             featuresCols=[idf.getOutputCol()],
             predictionCol="Sentiment")

###  Create the pipeline by defining all the stages

Now we have all the pieces ready and can define the final pipeline.

In [None]:
pipeline = Pipeline(stages=[colSelect,
                            refineTime,
                            filterScore,
                            regexTokenizer,
                            stopWordsRemover,
                            countVectorizer,
                            idf,
                            removeSummary,
                            gbm])

## Train the pipeline model

The `fit` call calls each trasformer and estimator in the pipeline and creates so called the `PipelineModel`. The model is trained from the cleaned data from previous transformers and the final model is ready to accept the raw data to make predictions

In [None]:
model = pipeline.fit(reviews_spark)

### Try predictions

First, lets load data we use for the predictions

In [None]:
reviews_h2o_pred = h2o.upload_file("../data/kuba/AmazonReviews_Predictions.csv", "reviews_preds.hex")

And convert them to Spark so we can run the Spark pipeline on it

In [None]:
reviews_spark_pred = hc.as_spark_frame(reviews_h2o_pred)

And run the predictions

In [None]:

model.transform(reviews_spark_pred).show()

## Save the pipeline model

Later we use the pipeline model in the Scala to demonstrate the deployment of the pipeline in the JVM world

In [None]:
model.write().overwrite().save("reviews_pipeline.model")

#### Trick #5: Check variable inportances

We can inspect the model in Flow and see the variable importances. However we do not have information about the words, just the indices. We can ask the `CountVectorizer` what word is on the specific index to see what words affect our model the most.

In [None]:
model.stages[5].vocabulary[0]

### Let's deploy the application

Right now, we defined the PySPark pipeline. We will now demonstrate its deployment using Spark Streaming application in Scala where the pipeline defined above will receive raw streaming data and run preditions on them right away.

The steps will be:

 - Load the schema from the schema file
 - Create input data stream and pass it the schema. The input data stream will point to a directory where a new csv files will be comming from different streaming sources
 - Load the pipeline from the pipeline file
 - Create output data stream. For our purposes, we store the data into memory and also to a SparkSQL table
 - We can inspect the predictions  by regularly displaying the content of the desired table

The steps above are written using Scala as:
```
 val spark = SparkSession.builder().master("local").getOrCreate()

      //
      // Load exported pipeline
      //
      import org.apache.spark.sql.types.DataType
      val pipelineModel = PipelineModel.read.load("py/examples/pipelines/reviews_pipeline.model/")

      //
      // Load exported schema of input data
      //
      val schema = StructType(DataType.fromJson(scala.io.Source.fromFile("py/examples/pipeline/schema.json").mkString).asInstanceOf[StructType].map {
        case StructField(name, dtype, nullable, metadata) => StructField(name, dtype, true, metadata)
        case rec => rec
      })
      println(schema)

      //
      // Define input stream
      //
      val inputDataStream = spark.readStream.schema(schema).csv("py/examples/pipeline/data/kuba/input/*.csv")

      //
      // Apply loaded model
      //
      val outputDataStream = pipelineModel.transform(inputDataStream)

      //
      // Forward output stream into memory-sink
      //
      outputDataStream.writeStream.format("memory").queryName("predictions").start()

      //
      // Query results
      //
      while(true){
        spark.sql("select * from predictions").show()
        Thread.sleep(5000)
      }
```

### Let's see it in practice!