# Sparkling Water Pipeline Productionalization

## Background

Sparkling Water provides access to H2O algorithms and publishes API to integrate them as part of regular Spark pipelines. This feature allow for seamless training and deployment of H2O algorithms in the Spark environment. Futhermore, trained pipelines do not require H2O runtime anymore (thanks to MOJO representation of trained H2O models) which enables variety of deployment scenarios.

Moreover, by supporting Python and Scala environment we enable a simple transfer of modeling results between data scientists (Python land) and production (JVM land).


## Goal

The goal of this hands-on is to:
  - show integration of H2O models into Spark pipelines using PySpark and PySparkling
  - demonstrate deployment of the trained pipeline in the context of JVM and Spark streaming
  
Our modeling goal is to predict sentiment of Amazon food reviews. For this purpose, we use a pre-processed dataset from [SNAP repository](https://snap.stanford.edu/data/web-FineFoods.html). The dataset contains multiple column but for simplicity, we will use only date, summary and overall score. The score helps us to aproximate sentiment.

![Scenario](./img/scenario.png)

## Environment preparation

First, let's verify that `SparkSession` is available in the notebook environment. We do not need to explicitly create SparkSession as it is created for us
automatically during start of the Jupyter notebook. This works because of the Jupyter is set up with Spark kernel.


In [8]:
spark

### Prepare `H2OContext`

We will start `H2OContext` in the so called _internal backend_ mode. The means H2O is sharing JVM with Spark (see details in [Sparkling Water documentation](https://github.com/h2oai/sparkling-water/blob/rel-2.2/doc/tutorials/backends.rst)).

The following call initializes H2O on each Spark executors in the Spark cluster.

In [9]:
from pysparkling import *
hc = H2OContext.getOrCreate(spark)

Connecting to H2O server at http://172.16.2.43:54323... successful.


0,1
H2O cluster uptime:,6 mins 48 secs
H2O cluster version:,3.16.0.2
H2O cluster version age:,1 day
H2O cluster name:,sparkling-water-kuba_local-1512176192407
H2O cluster total nodes:,1
H2O cluster free memory:,6.630 Gb
H2O cluster total cores:,8
H2O cluster allowed cores:,8
H2O cluster status:,"locked, healthy"
H2O connection url:,http://172.16.2.43:54323



Sparkling Water Context:
 * H2O name: sparkling-water-kuba_local-1512176192407
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,172.16.2.43,54323)
  ------------------------

  Open H2O Flow in browser: http://172.16.2.43:54323 (CMD + click in Mac OSX)

    


> Note: the reported IP is private IP of docker container where the demo is running.


## Data preparation

We are going to use H2O to load data using H2O since it does pretty good job to guess all nuances of input format.

In [10]:
import h2o
reviews_h2o = h2o.upload_file("../data/kuba/AmazonReviews.csv", "reviews.hex")

Parse progress: |█████████████████████████████████████████████████████████| 100%


Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score,Time,Summary,Text
1,B001E4KFG0,A3SGXH7AUHU8GW,delmartian,1,1,5,1303860000.0,Good Quality Dog Food,I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.
2,B00813GRG4,A1D87F6ZCVE5NK,dll pa,0,0,1,1346980000.0,Not as Advertised,"Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as """"Jumbo"""
3,B000LQOCH0,ABXLMWJIXXAIN,"Natalia Corres """"Natalia Corres",1,1,4,1219020000.0,"""""Delight"""" says it a","This is a confection that has been around a few centuries. It is a light, pillowy citrus gelatin with nuts - in this case Filberts. And it is cut into tiny squares and then liberally coated with powdered sugar. And it is a tiny mouthful of heaven. Not too chewy, and very flavorful. I highly recommend this yummy treat. If you are familiar with the story of C.S. Lewis' """"The Lion, The Witch, and The Wardrobe"""" - this is the treat that seduces Edmund into selling out his Brother and Sisters to the Witc"
4,B000UA0QIQ,A395BORC6FGVXV,Karl,3,3,2,1307920000.0,Cough Medicine,If you are looking for the secret ingredient in Robitussin I believe I have found it. I got this in addition to the Root Beer Extract I ordered (which was good) and made some cherry soda. The flavor is very medicinal.
5,B006K2ZZ7K,A1UQRSCLF8GW1T,"Michael D. Bigham """"M. Wassir",0,0,5,1350780000.0,Great taffy,"Great taffy at a great price. There was a wide assortment of yummy taffy. Delivery was very quick. If your a taffy lover, this is a deal."
6,B006K2ZZ7K,ADT0SRK1MGOEU,Twoapennything,0,0,4,1342050000.0,Nice Taffy,"I got a wild hair for taffy and ordered this five pound bag. The taffy was all very enjoyable with many flavors: watermelon, root beer, melon, peppermint, grape, etc. My only complaint is there was a bit too much red/black licorice-flavored pieces (just not my particular favorites). Between me, my kids, and my husband, this lasted only two weeks! I would recommend this brand of taffy -- it was a delightful treat."
7,B006K2ZZ7K,A1SP2KVKFXXRU1,David C. Sullivan,0,0,5,1340150000.0,Great! Just as good as the expensive brands!,"This saltwater taffy had great flavors and was very soft and chewy. Each candy was individually wrapped well. None of the candies were stuck together, which did happen in the expensive version, Fralinger's. Would highly recommend this candy! I served it at a beach-themed party and everyone loved it!"
8,B006K2ZZ7K,A3JRGQVEQN31IQ,Pamela G. Williams,0,0,5,1336000000.0,"Wonderful, tasty taffy",This taffy is so good. It is very soft and chewy. The flavors are amazing. I would definitely recommend you buying it. Very satisfying!!
9,B000E7L2R4,A1MZYO9TZK0BBI,R. James,1,1,5,1322010000.0,Yay Barley,Right now I'm mostly just sprouting this so my cats can eat the grass. They love it. I rotate it around with Wheatgrass and Rye too
10,B00171APVA,A21BT40VZCCYT4,Carol A. Reed,0,0,5,1351210000.0,Healthy Dog Food,This is a very healthy dog food. Good for their digestion. Also good for small puppies. My dog eats her required amount at every feeding.




### Explore data table in H2O flow

At this point, we can explore data directly in this notebook, or we can access H2O Flow and explore data and its properties directly there.


### Convert to Spark frame for the input to the pipeline

After data exploration, we can start with data munging. We are going to use Spark, hence we need to publish H2O frame as Spark DataFrame.

In [15]:
reviews_spark = hc.as_spark_frame(reviews_h2o)

#### Trick #1: Save the original Spark schema

At this point we will save the schema of input data and we will reuse it later to configure deployed Spark streaming application.

In [4]:
reviews_spark.printSchema()

with open('schema.json','w') as f:
    f.write(str(reviews_spark.schema.json()))

AttributeError: 'NoneType' object has no attribute 'schema'

## Now let's define all the stages for the pipeline

The Spark pipelines are composed of various transformers. In our example we combine a few Spark transformers to clean up textual data and transform it into numerical format. The pipeline is finalized by training an H2O GBM binomial model.

> Note: The pipeline stages are not executed right away, the are executed during each fit and transform call.

### Define transformer to drop unnecessary columns
The Spark `SQLTransformer` allows for using SQL to munge data.

As part of this transformer, we convert timestamp to the human readable date string:

**TODO you need to explain why are you selecting only the subset of columns**

In [18]:
from pyspark.ml.feature import SQLTransformer
colSelect = SQLTransformer(
    statement="SELECT Score, from_unixtime(Time) as Time, Summary FROM __THIS__")

#### Trick #2: Explore intermediate results
To explore intermidiate results,  we can also invoke defined transformer directly

In [19]:
selected = colSelect.transform(reviews_spark)
selected.show()

+-----+-------------------+--------------------+
|Score|               Time|             Summary|
+-----+-------------------+--------------------+
|    5|2011-04-26 17:00:00|Good Quality Dog ...|
|    1|2012-09-06 17:00:00|   Not as Advertised|
|    4|2008-08-17 17:00:00|""Delight"" says ...|
|    2|2011-06-12 17:00:00|      Cough Medicine|
|    5|2012-10-20 17:00:00|         Great taffy|
|    4|2012-07-11 17:00:00|          Nice Taffy|
|    5|2012-06-19 17:00:00|Great!  Just as g...|
|    5|2012-05-02 17:00:00|Wonderful, tasty ...|
|    5|2011-11-22 16:00:00|          Yay Barley|
|    5|2012-10-25 17:00:00|    Healthy Dog Food|
|    5|2005-02-07 16:00:00|The Best Hot Sauc...|
|    5|2010-08-26 17:00:00|My cats LOVE this...|
|    1|2012-06-12 17:00:00|My Cats Are Not F...|
|    4|2010-11-04 17:00:00|   fresh and greasy!|
|    5|2010-03-11 16:00:00|Strawberry Twizzl...|
|    5|2009-12-28 16:00:00|Lots of twizzlers...|
|    2|2012-09-19 17:00:00|          poor taste|
|    5|2012-08-15 17

### Create transformer which creates several time columns based on the Time colums

In [21]:
refineTime = SQLTransformer(
    statement="""
    SELECT  Score,
            Summary, 
            dayofmonth(Time) as Day, 
            month(Time) as Month, year(Time) as Year, 
            weekofyear(Time) as WeekNum, 
            date_format(Time, 'EEE') as WeekDay, 
            hour(Time) as HourOfDay, 
            IF(date_format(Time, 'EEE')='Sat' OR date_format(Time, 'EEE')='Sun', 1, 0) as Weekend, 
            CASE 
                WHEN month(TIME)=12 OR month(Time)<=2 THEN 'Winter' 
                WHEN month(TIME)>=3 OR month(Time)<=5 THEN 'Spring' 
                WHEN month(TIME)>=6 AND month(Time)<=9 THEN 'Summer' 
                ELSE 'Autumn' END as Seasson 
    FROM __THIS__""")

Inspect the data after 

In [6]:
refined = refineTime.transform(selected)
refined.show()

NameError: name 'refineTime' is not defined

### Remove neutral reviews and classify the Scores



In [23]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, IDF, CountVectorizer

filterScore = SQLTransformer(
    statement="""
    SELECT  IF(Score<3,'NEGATIVE', 'POSITIVE') as Sentiment, Summary, Day, Month, Year,
            WeekNum, WeekDay, HourOfDay, Weekend, Seasson 
    FROM __THIS__ WHERE Score !=3 """)



 Inspect the data

In [8]:
filtered = filterScore.transform(refined)
filtered.show()

NameError: name 'filterScore' is not defined

### Tokenize the message

In [25]:
regexTokenizer = RegexTokenizer(inputCol="Summary",
                                outputCol="tokenized_summary",
                                pattern="[, ]",
                                toLowercase=True)

Inspect the data

In [26]:
tokenized = regexTokenizer.transform(filtered)
tokenized.show()

+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|   tokenized_summary|
+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|[good, quality, d...|
| NEGATIVE|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|[not, as, adverti...|
| POSITIVE|""Delight"" says ...| 17|    8|2008|     33|    Sun|       17|      1| Spring|[""delight"", say...|
| NEGATIVE|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|   [cough, medicine]|
| POSITIVE|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|      [great, taffy]|
| POSITIVE|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|       [nice, taffy]|
|

### Remove unnecessary words

In [28]:
stopWordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),
                                    outputCol="CleanedSummary",
                                    caseSensitive=False)

Inspect the data

In [33]:
stopWordsRemoved = stopWordsRemover.transform(tokenized)
stopWordsRemoved.select(["Sentiment", "Summary", "CleanedSummary"]).show()

+---------+--------------------+--------------------+
|Sentiment|             Summary|      CleanedSummary|
+---------+--------------------+--------------------+
| POSITIVE|Good Quality Dog ...|[good, quality, d...|
| NEGATIVE|   Not as Advertised|        [advertised]|
| POSITIVE|""Delight"" says ...| [""delight"", says]|
| NEGATIVE|      Cough Medicine|   [cough, medicine]|
| POSITIVE|         Great taffy|      [great, taffy]|
| POSITIVE|          Nice Taffy|       [nice, taffy]|
| POSITIVE|Great!  Just as g...|[great!, good, ex...|
| POSITIVE|Wonderful, tasty ...|[wonderful, tasty...|
| POSITIVE|          Yay Barley|       [yay, barley]|
| POSITIVE|    Healthy Dog Food|[healthy, dog, food]|
| POSITIVE|The Best Hot Sauc...|[best, hot, sauce...|
| POSITIVE|My cats LOVE this...|[cats, love, ""di...|
| NEGATIVE|My Cats Are Not F...|[cats, fans, new,...|
| POSITIVE|   fresh and greasy!|    [fresh, greasy!]|
| POSITIVE|Strawberry Twizzl...|[strawberry, twiz...|
| POSITIVE|Lots of twizzlers

### Hash the words

In [54]:
countVectorizer = CountVectorizer(inputCol=stopWordsRemover.getOutputCol(),
                                  outputCol="frequencies",
                                  minDF=100)

Manually train the count vectorizer just for demonstration purposes

In [55]:
countVecModel = countVectorizer.fit(stopWordsRemoved)

See the vocabulary

In [56]:
print("Vocabulary size is " + str(len(countVecModel.vocabulary)))

print(countVecModel.vocabulary[:10])

Vocabulary size is 1767
[u'great', u'good', u'best', u'love', u'coffee', u'tea', u'product', u'taste', u'delicious', u'excellent']


Inspect the data

In [57]:
vectorized = countVecModel.transform(stopWordsRemoved)
vectorized.select(["Sentiment", "CleanedSummary", "frequencies"]).show()

+---------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|
+---------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1767,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1767,[605],[1.0])|
| POSITIVE| [""delight"", says]|  (1767,[422],[1.0])|
| NEGATIVE|   [cough, medicine]| (1767,[1700],[1.0])|
| POSITIVE|      [great, taffy]|(1767,[0,1461],[1...|
| POSITIVE|       [nice, taffy]|(1767,[29,1461],[...|
| POSITIVE|[great!, good, ex...|(1767,[1,62,130],...|
| POSITIVE|[wonderful, tasty...|(1767,[15,37,1461...|
| POSITIVE|       [yay, barley]|        (1767,[],[])|
| POSITIVE|[healthy, dog, food]|(1767,[10,12,21],...|
| POSITIVE|[best, hot, sauce...|(1767,[2,43,87,42...|
| POSITIVE|[cats, love, ""di...|(1767,[3,12,23,44...|
| NEGATIVE|[cats, fans, new,...|(1767,[12,44,80,1...|
| POSITIVE|    [fresh, greasy!]|   (1767,[85],[1.0])|
| POSITIVE|[strawberry, twiz...|(1767,[13,19,692]...|
| POSITIVE|[lots, twizzlers,

### Create inverse document frequencies model

In [58]:
idf = IDF(inputCol=countVectorizer.getOutputCol(),
          outputCol="tf_idf_frequencies",
          minDocFreq=1)

Manually train the IDF model just for demonstration purposes

In [59]:
idfModel = idf.fit(vectorized)

Inspect the data

In [60]:
afterIdf = idfModel.transform(vectorized)
afterIdf.select(["Sentiment", "CleanedSummary", "frequencies", "tf_idf_frequencies"]).show()

+---------+--------------------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|  tf_idf_frequencies|
+---------+--------------------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1767,[1,10,12,35...|(1767,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1767,[605],[1.0])|(1767,[605],[7.30...|
| POSITIVE| [""delight"", says]|  (1767,[422],[1.0])|(1767,[422],[6.91...|
| NEGATIVE|   [cough, medicine]| (1767,[1700],[1.0])|(1767,[1700],[8.5...|
| POSITIVE|      [great, taffy]|(1767,[0,1461],[1...|(1767,[0,1461],[2...|
| POSITIVE|       [nice, taffy]|(1767,[29,1461],[...|(1767,[29,1461],[...|
| POSITIVE|[great!, good, ex...|(1767,[1,62,130],...|(1767,[1,62,130],...|
| POSITIVE|[wonderful, tasty...|(1767,[15,37,1461...|(1767,[15,37,1461...|
| POSITIVE|       [yay, barley]|        (1767,[],[])|        (1767,[],[])|
| POSITIVE|[healthy, dog, food]|(1767,[10,12,21],...|(1767,[10,12,21],...|
| POSITIVE|[best, hot, sa

### Remove Summary Column

The algoritm does not understand the string value. That's why we transformed the data using TF-IDF and should drop the original string information


In [63]:
removeSummary = SQLTransformer(
    statement="""
    SELECT Sentiment, Day, Month, Year, WeekNum, WeekDay, HourOfDay, Weekend, Seasson, tf_idf_frequencies
    FROM __THIS__ """)

Inspect the data

In [64]:
removedSummary = removeSummary.transform(afterIdf)
removedSummary.show()

+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|  tf_idf_frequencies|
+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE| 26|    4|2011|     17|    Tue|       17|      0| Spring|(1767,[1,10,12,35...|
| NEGATIVE|  6|    9|2012|     36|    Thu|       17|      0| Spring|(1767,[605],[7.30...|
| POSITIVE| 17|    8|2008|     33|    Sun|       17|      1| Spring|(1767,[422],[6.91...|
| NEGATIVE| 12|    6|2011|     23|    Sun|       17|      1| Spring|(1767,[1700],[8.5...|
| POSITIVE| 20|   10|2012|     42|    Sat|       17|      1| Spring|(1767,[0,1461],[2...|
| POSITIVE| 11|    7|2012|     28|    Wed|       17|      0| Spring|(1767,[29,1461],[...|
| POSITIVE| 19|    6|2012|     25|    Tue|       17|      0| Spring|(1767,[1,62,130],...|
| POSITIVE|  2|    5|2012|     18|    Wed|       17|      0| Spring|(1767,[15,37,1461...|
| POSITIVE

### Create GBM model

In [65]:
from pysparkling.ml import ColumnPruner, H2OGBM

gbm = H2OGBM(ratio=0.8,
             featuresCols=[idf.getOutputCol()],
             predictionCol="Sentiment")

###  Create the pipeline by defining all the stages

In [68]:
pipeline = Pipeline(stages=[colSelect,
                            refineTime,
                            filterScore,
                            regexTokenizer,
                            stopWordsRemover,
                            countVectorizer,
                            idf,
                            removeSummary,
                            gbm])

## Train the pipeline model

In [69]:
model = pipeline.fit(reviews_spark)

Try predicting on the same input data

In [70]:
model.transform(reviews_spark).show()

+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+--------------------+-------------------+
|Sentiment|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|  tf_idf_frequencies|            NEGATIVE|           POSITIVE|
+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+--------------------+-------------------+
| POSITIVE| 26|    4|2011|     17|    Tue|       17|      0| Spring|(1767,[1,10,12,35...| 0.11963043647646754| 0.8803695635235325|
| NEGATIVE|  6|    9|2012|     36|    Thu|       17|      0| Spring|(1767,[605],[7.30...|  0.1822741616671273| 0.8177258383328727|
| POSITIVE| 17|    8|2008|     33|    Sun|       17|      1| Spring|(1767,[422],[6.91...|  0.1822741616671273| 0.8177258383328727|
| NEGATIVE| 12|    6|2011|     23|    Sun|       17|      1| Spring|(1767,[1700],[8.5...|  0.1822741616671273| 0.8177258383328727|
| POSITIVE| 20|   10|2012|     42|    Sat|       17|      1| Spring|(1767,[0,1461],

In [71]:
## Save the pipeline model

In [72]:
model.write().overwrite().save("reviews_pipeline.model")

Inspect the vocabulary

In [73]:
model.stages[5].vocabulary[0]

u'great'