# Sparkling Water Pipeline Productionalization

## Background

Sparkling Water provides access to H2O algorithms and publishes API to integrate them as part of regular Spark pipelines. This feature allows for seamless training and deployment of H2O algorithms in the Spark environment. Furthermore, trained pipelines do not require H2O runtime anymore (thanks to MOJO representation of trained H2O models) which enables variety of deployment scenarios. Sparkling Water can be also used for deplying Driverless AI models.

Moreover, by supporting Python and Scala environment, we enable a simple transfer of modeling results between data scientists (Python land) and production (JVM land).


## Goal

The goal of this hands-on is to:
  - show integration of H2O models into Spark pipelines using PySpark and PySparkling
  - demonstrate deployment of the trained pipeline in the context of JVM and Spark streaming
  
Our modeling goal is to predict sentiment of Amazon food reviews. For this purpose, we use a pre-processed dataset from [SNAP repository](https://snap.stanford.edu/data/web-FineFoods.html). The dataset contains multiple columns but for simplicity, we will use only date, summary and overall score. The score helps us to approximate sentiment.

![Scenario](./img/scenario.png)

## Environment preparation

First, let's verify that `SparkSession` is available in the notebook environment. We do not need to explicitly create `SparkSession` as it is created for us
automatically during start of the Jupyter notebook. This works because of the Jupyter is set up with a Spark kernel.


In [12]:
spark

### Prepare `H2OContext`

We will start `H2OContext` in the so called _internal backend_ mode. The means H2O is sharing JVM with Spark (see details in [Sparkling Water documentation](https://github.com/h2oai/sparkling-water/blob/rel-2.2/doc/tutorials/backends.rst)).

The following call initializes H2O on each Spark executor in the Spark cluster.

In [13]:
from pysparkling import *
hc = H2OContext.getOrCreate(spark)


Sparkling Water Context:
 * H2O name: sparkling-water-kuba_local-1549059517922
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,jakubs-mbp.0xdata.loc,54321)
  ------------------------

  Open H2O Flow in browser: http://172.16.2.58:54321 (CMD + click in Mac OSX)

    


> Note: the reported IP is a private IP of docker container where the demo is running.


## Data preparation

We are going to use H2O to load data using H2O since it does pretty good job to guess all nuances of input format.

In [14]:
import h2o
reviews_h2o = h2o.upload_file("/Users/kuba/Downloads/AmazonReviews_Train.csv", "reviews.hex")

Parse progress: |█████████████████████████████████████████████████████████| 100%


### Explore data table in H2O flow

At this point, we can explore data directly in this notebook, or we can access H2O Flow and explore data and its properties directly there.


### Convert H2O frame to Spark frame se we can pass it as the input to the pipeline

After data exploration, we can start with data munging. We are going to use Spark, hence we need to publish H2O frame as Spark DataFrame.

In [15]:
reviews_spark = hc.as_spark_frame(reviews_h2o)

#### Trick #1: Save the original Spark schema

At this point we will save the schema of input data and we will reuse it later to configure deployed Spark streaming application.

In [16]:
reviews_spark.printSchema()

with open('schema.json','w') as f:
    f.write(str(reviews_spark.schema.json()))

root
 |-- Id: integer (nullable = false)
 |-- ProductId: string (nullable = false)
 |-- UserId: string (nullable = false)
 |-- ProfileName: string (nullable = false)
 |-- HelpfulnessNumerator: short (nullable = false)
 |-- HelpfulnessDenominator: short (nullable = false)
 |-- Score: byte (nullable = false)
 |-- Time: integer (nullable = false)
 |-- Summary: string (nullable = false)
 |-- Text: string (nullable = false)



## Now let's define all the stages for the pipeline

The Spark pipelines are composed of various transformers. In our example, we combine a few Spark transformers to clean up textual data and transform it into numerical format. The pipeline is finalized by training an H2O XGBoost binomial model.

> Note: The pipeline stages are not executed right away, they are executed during each fit and transform call.

### Define transformer to drop unnecessary columns
The Spark `SQLTransformer` allows for using SQL to munge data.

As part of this transformer, we convert timestamp to the human readable date string:

We are selecting just `Score`, `Time` and `Summary` columns. The goal of this demo is to predict sentiment, ie, whether the review is positive or negative. The review can be influenced by several aspects. The `Summary` is of course the mostly important information, but `Time` can influence the model as well. For example, people may tend to give higher reviews on Friday evenings because there's a weekend in from of them :)

In [17]:
from pyspark.ml.feature import SQLTransformer
colSelect = SQLTransformer(
    statement="SELECT Score, from_unixtime(Time) as Time, Summary FROM __THIS__")

#### Trick #2: Explore intermediate results
To explore intermediate results, we can also invoke defined transformer directly

In [18]:
selected = colSelect.transform(reviews_spark)
selected.show()

+-----+-------------------+--------------------+
|Score|               Time|             Summary|
+-----+-------------------+--------------------+
|    5|2011-04-26 17:00:00|Good Quality Dog ...|
|    1|2012-09-06 17:00:00|   Not as Advertised|
|    4|2008-08-17 17:00:00|"Delight" says it...|
|    2|2011-06-12 17:00:00|      Cough Medicine|
|    5|2012-10-20 17:00:00|         Great taffy|
|    4|2012-07-11 17:00:00|          Nice Taffy|
|    5|2012-06-19 17:00:00|Great!  Just as g...|
|    5|2012-05-02 17:00:00|Wonderful, tasty ...|
|    5|2011-11-22 16:00:00|          Yay Barley|
|    5|2012-10-25 17:00:00|    Healthy Dog Food|
|    5|2005-02-07 16:00:00|The Best Hot Sauc...|
|    5|2010-08-26 17:00:00|My cats LOVE this...|
|    1|2012-06-12 17:00:00|My Cats Are Not F...|
|    4|2010-11-04 17:00:00|   fresh and greasy!|
|    5|2010-03-11 16:00:00|Strawberry Twizzl...|
|    5|2009-12-28 16:00:00|Lots of twizzlers...|
|    2|2012-09-19 17:00:00|          poor taste|
|    5|2012-08-15 17

### Create transformer which creates several time columns based on the `Time` colum

The time is stored as a timestamp, however, we would like to get a more human readable information from it. We can use the SparkSQL data methods such as `month`, `dayofmonth` and so on to achieve this.

In [19]:
refineTime = SQLTransformer(
    statement="""
    SELECT  Score,
            Summary, 
            dayofmonth(Time) as Day, 
            month(Time) as Month, year(Time) as Year, 
            weekofyear(Time) as WeekNum, 
            date_format(Time, 'EEE') as WeekDay, 
            hour(Time) as HourOfDay, 
            IF(date_format(Time, 'EEE')='Sat' OR date_format(Time, 'EEE')='Sun', 1, 0) as Weekend, 
            CASE 
                WHEN month(TIME)=12 OR month(Time)<=2 THEN 'Winter' 
                WHEN month(TIME)>=3 OR month(Time)<=5 THEN 'Spring' 
                WHEN month(TIME)>=6 AND month(Time)<=9 THEN 'Summer' 
                ELSE 'Autumn' END as Seasson 
    FROM __THIS__""")

Inspect the data after 

In [20]:
refined = refineTime.transform(selected)
refined.show()

+-----+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|Score|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|
+-----+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|    5|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|
|    1|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|
|    4|"Delight" says it...| 17|    8|2008|     33|    Sun|       17|      1| Spring|
|    2|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|
|    5|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|
|    4|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|
|    5|Great!  Just as g...| 19|    6|2012|     25|    Tue|       17|      0| Spring|
|    5|Wonderful, tasty ...|  2|    5|2012|     18|    Wed|       17|      0| Spring|
|    5|          Yay Barley| 22|   11|2011|     47|   

### Remove neutral reviews and classify the Scores

We are not interested in the neutral reviews (reviews with the `Score=3`) as they would not add much information to the model

In [21]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, IDF, CountVectorizer

filterScore = SQLTransformer(
    statement="""
    SELECT  IF(Score<3,'NEGATIVE', 'POSITIVE') as Sentiment, Summary, Day, Month, Year,
            WeekNum, WeekDay, HourOfDay, Weekend, Seasson 
    FROM __THIS__ WHERE Score !=3 """)



 Inspect the data

In [22]:
filtered = filterScore.transform(refined)
filtered.show()

+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+
|Sentiment|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|
+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+
| POSITIVE|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|
| NEGATIVE|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|
| POSITIVE|"Delight" says it...| 17|    8|2008|     33|    Sun|       17|      1| Spring|
| NEGATIVE|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|
| POSITIVE|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|
| POSITIVE|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|
| POSITIVE|Great!  Just as g...| 19|    6|2012|     25|    Tue|       17|      0| Spring|
| POSITIVE|Wonderful, tasty ...|  2|    5|2012|     18|    Wed|       17|      0| Spring|
| POSITIVE

### Tokenize the message

Here we use the [RegexTokenizer](https://spark.apache.org/docs/2.1.0/ml-features.html#tokenizer) to tokenize the messages.

In [23]:
regexTokenizer = RegexTokenizer(inputCol="Summary",
                                outputCol="tokenized_summary",
                                pattern="[, ]",
                                toLowercase=True)

Inspect the data

In [24]:
tokenized = regexTokenizer.transform(filtered)
tokenized.show()

+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|             Summary|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|   tokenized_summary|
+---------+--------------------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE|Good Quality Dog ...| 26|    4|2011|     17|    Tue|       17|      0| Spring|[good, quality, d...|
| NEGATIVE|   Not as Advertised|  6|    9|2012|     36|    Thu|       17|      0| Spring|[not, as, adverti...|
| POSITIVE|"Delight" says it...| 17|    8|2008|     33|    Sun|       17|      1| Spring|["delight", says,...|
| NEGATIVE|      Cough Medicine| 12|    6|2011|     23|    Sun|       17|      1| Spring|   [cough, medicine]|
| POSITIVE|         Great taffy| 20|   10|2012|     42|    Sat|       17|      1| Spring|      [great, taffy]|
| POSITIVE|          Nice Taffy| 11|    7|2012|     28|    Wed|       17|      0| Spring|       [nice, taffy]|
|

### Remove unnecessary words

Some words do not bring much information for the resulting model. For this, we use the [StopWordsRemover](https://spark.apache.org/docs/2.1.0/ml-features.html#stopwordsremover) to clean the data.

In [25]:
stopWordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),
                                    outputCol="CleanedSummary",
                                    caseSensitive=False)

Inspect the data

In [26]:
stopWordsRemoved = stopWordsRemover.transform(tokenized)
stopWordsRemoved.select(["Sentiment", "Summary", "CleanedSummary"]).show()

+---------+--------------------+--------------------+
|Sentiment|             Summary|      CleanedSummary|
+---------+--------------------+--------------------+
| POSITIVE|Good Quality Dog ...|[good, quality, d...|
| NEGATIVE|   Not as Advertised|        [advertised]|
| POSITIVE|"Delight" says it...|   ["delight", says]|
| NEGATIVE|      Cough Medicine|   [cough, medicine]|
| POSITIVE|         Great taffy|      [great, taffy]|
| POSITIVE|          Nice Taffy|       [nice, taffy]|
| POSITIVE|Great!  Just as g...|[great!, good, ex...|
| POSITIVE|Wonderful, tasty ...|[wonderful, tasty...|
| POSITIVE|          Yay Barley|       [yay, barley]|
| POSITIVE|    Healthy Dog Food|[healthy, dog, food]|
| POSITIVE|The Best Hot Sauc...|[best, hot, sauce...|
| POSITIVE|My cats LOVE this...|[cats, love, "die...|
| NEGATIVE|My Cats Are Not F...|[cats, fans, new,...|
| POSITIVE|   fresh and greasy!|    [fresh, greasy!]|
| POSITIVE|Strawberry Twizzl...|[strawberry, twiz...|
| POSITIVE|Lots of twizzlers

### Hash the words

The algorithms can efficiently work with the numeric values hence we create a numeric representation of them using [CountVectorizer](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).

CountVectorizer is very similar to [HashingTF](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) function, except that it preserves the mapping from the index back to the word using internal vocabulary.

For example, if word `Dog` is stored in the hash at the index `100`, we can get the word back as `countVectorizerModel.vocabulary[100]`

#### Trick #3: Set minDF parameter to limit number of words

The minDF parameter ensures that only words which occur more the 100 times in our case are included. This also speeds the process of modelling and ensures that outliers does not affect our model that much.


In [27]:
countVectorizer = CountVectorizer(inputCol=stopWordsRemover.getOutputCol(),
                                  outputCol="frequencies",
                                  minDF=100)

#### Trick #4: Manually train the count vectorizer so we can see how it behaves before we execute the pipeline


In [28]:
countVecModel = countVectorizer.fit(stopWordsRemoved)

See the vocabulary

In [29]:
print("Vocabulary size is " + str(len(countVecModel.vocabulary)))

print(countVecModel.vocabulary[:10])

Vocabulary size is 1528
['great', 'good', 'best', 'love', 'coffee', 'tea', 'product', 'taste', 'delicious', 'excellent']


Inspect the data

In [30]:
vectorized = countVecModel.transform(stopWordsRemoved)
vectorized.select(["Sentiment", "CleanedSummary", "frequencies"]).show()

+---------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|
+---------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1528,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1528,[620],[1.0])|
| POSITIVE|   ["delight", says]|  (1528,[402],[1.0])|
| NEGATIVE|   [cough, medicine]|        (1528,[],[])|
| POSITIVE|      [great, taffy]|(1528,[0,1430],[1...|
| POSITIVE|       [nice, taffy]|(1528,[29,1430],[...|
| POSITIVE|[great!, good, ex...|(1528,[1,59,126],...|
| POSITIVE|[wonderful, tasty...|(1528,[15,37,1430...|
| POSITIVE|       [yay, barley]|        (1528,[],[])|
| POSITIVE|[healthy, dog, food]|(1528,[10,12,21],...|
| POSITIVE|[best, hot, sauce...|(1528,[2,44,86,45...|
| POSITIVE|[cats, love, "die...|(1528,[3,12,23,41...|
| NEGATIVE|[cats, fans, new,...|(1528,[12,41,79],...|
| POSITIVE|    [fresh, greasy!]|   (1528,[83],[1.0])|
| POSITIVE|[strawberry, twiz...|(1528,[13,19,665]...|
| POSITIVE|[lots, twizzlers,

### Create inverse document frequencies model

Here we use [tf-idf](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf) method to reflect the importance of a term to a document in the given set of data. Please see the Spark documentation for more information at [tf-idf](https://spark.apache.org/docs/2.1.0/ml-features.html#tf-idf).

In [31]:
idf = IDF(inputCol=countVectorizer.getOutputCol(),
          outputCol="tf_idf_frequencies",
          minDocFreq=1)

Manually train the IDF model to see the results before we execute the pipeline

In [32]:
idfModel = idf.fit(vectorized)

Inspect the data

In [33]:
afterIdf = idfModel.transform(vectorized)
afterIdf.select(["Sentiment", "CleanedSummary", "frequencies", "tf_idf_frequencies"]).show()

+---------+--------------------+--------------------+--------------------+
|Sentiment|      CleanedSummary|         frequencies|  tf_idf_frequencies|
+---------+--------------------+--------------------+--------------------+
| POSITIVE|[good, quality, d...|(1528,[1,10,12,35...|(1528,[1,10,12,35...|
| NEGATIVE|        [advertised]|  (1528,[620],[1.0])|(1528,[620],[7.32...|
| POSITIVE|   ["delight", says]|  (1528,[402],[1.0])|(1528,[402],[6.86...|
| NEGATIVE|   [cough, medicine]|        (1528,[],[])|        (1528,[],[])|
| POSITIVE|      [great, taffy]|(1528,[0,1430],[1...|(1528,[0,1430],[2...|
| POSITIVE|       [nice, taffy]|(1528,[29,1430],[...|(1528,[29,1430],[...|
| POSITIVE|[great!, good, ex...|(1528,[1,59,126],...|(1528,[1,59,126],...|
| POSITIVE|[wonderful, tasty...|(1528,[15,37,1430...|(1528,[15,37,1430...|
| POSITIVE|       [yay, barley]|        (1528,[],[])|        (1528,[],[])|
| POSITIVE|[healthy, dog, food]|(1528,[10,12,21],...|(1528,[10,12,21],...|
| POSITIVE|[best, hot, sa

### Remove Summary Column

The algoritms do not understand the string values very well. This is also the reason why we transformed the data using TF-IDF and created a numeric values out of the `Summary` column. Now we can drop the original string information so we do not confuse the model.


In [34]:
removeSummary = SQLTransformer(
    statement="""
    SELECT Sentiment, Day, Month, Year, WeekNum, WeekDay, HourOfDay, Weekend, Seasson, tf_idf_frequencies
    FROM __THIS__ """)

Inspect the data

In [35]:
removedSummary = removeSummary.transform(afterIdf)
removedSummary.show()

+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
|Sentiment|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|  tf_idf_frequencies|
+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+
| POSITIVE| 26|    4|2011|     17|    Tue|       17|      0| Spring|(1528,[1,10,12,35...|
| NEGATIVE|  6|    9|2012|     36|    Thu|       17|      0| Spring|(1528,[620],[7.32...|
| POSITIVE| 17|    8|2008|     33|    Sun|       17|      1| Spring|(1528,[402],[6.86...|
| NEGATIVE| 12|    6|2011|     23|    Sun|       17|      1| Spring|        (1528,[],[])|
| POSITIVE| 20|   10|2012|     42|    Sat|       17|      1| Spring|(1528,[0,1430],[2...|
| POSITIVE| 11|    7|2012|     28|    Wed|       17|      0| Spring|(1528,[29,1430],[...|
| POSITIVE| 19|    6|2012|     25|    Tue|       17|      0| Spring|(1528,[1,59,126],...|
| POSITIVE|  2|    5|2012|     18|    Wed|       17|      0| Spring|(1528,[15,37,1430...|
| POSITIVE

### Create XGBoost model

Here we are using H2O's estimator to train a H2O XGBoost model on `Sentiment` column with 50 trees (default). The full documentation for XGBoost is available at [H2O Documentation](http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/xgboost.html)

In [36]:
from pysparkling.ml import ColumnPruner, H2OXGBoost

xgboost = H2OXGBoost(ratio=0.8,
             featuresCols=[idf.getOutputCol()],
             predictionCol="Sentiment")

###  Create the pipeline by defining all the stages

Now we have all the pieces ready and can define the final pipeline.

In [37]:
pipeline = Pipeline(stages=[colSelect,
                            refineTime,
                            filterScore,
                            regexTokenizer,
                            stopWordsRemover,
                            countVectorizer,
                            idf,
                            removeSummary,
                            xgboost])

## Train the pipeline model

The `fit` call calls each trasformer and estimator in the pipeline and creates so called the `PipelineModel`. The model is trained from the cleaned data from previous transformers and the final model is ready to accept the raw data to make predictions

In [38]:
model = pipeline.fit(reviews_spark)

### Try predictions

First, lets load data we use for the predictions

In [39]:
reviews_h2o_pred = h2o.upload_file("/Users/kuba/Downloads/AmazonReviews_Predictions.csv", "reviews_preds.hex")

Parse progress: |█████████████████████████████████████████████████████████| 100%


And convert them to Spark so we can run the Spark pipeline on it

In [40]:
reviews_spark_pred = hc.as_spark_frame(reviews_h2o_pred)

And run the predictions

In [41]:

model.transform(reviews_spark_pred).show()

+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+--------------------+
|Sentiment|Day|Month|Year|WeekNum|WeekDay|HourOfDay|Weekend|Seasson|  tf_idf_frequencies|   prediction_output|
+---------+---+-----+----+-------+-------+---------+-------+-------+--------------------+--------------------+
| POSITIVE|  7|    6|2012|     23|    Thu|       17|      0| Spring|(1528,[48,647,130...|[0.21149086952209...|
| POSITIVE| 14|   12|2011|     50|    Wed|       16|      0| Winter|        (1528,[],[])|[0.21149086952209...|
| POSITIVE| 13|    9|2011|     37|    Tue|       17|      0| Spring|(1528,[264,306],[...|[0.21149086952209...|
| POSITIVE| 19|   10|2011|     42|    Wed|       17|      0| Spring|(1528,[26,452],[4...|[0.21149086952209...|
| POSITIVE|  8|    9|2012|     36|    Sat|       17|      1| Spring|(1528,[36,1407],[...|[0.04857343435287...|
| POSITIVE|  7|    2|2012|      6|    Tue|       16|      0| Winter|        (1528,[],[])|[0.21149086952209...|
|

## Save the pipeline model

Later we use the pipeline model in the Scala to demonstrate the deployment of the pipeline in the JVM world

In [42]:
model.write().overwrite().save("reviews_pipeline.model")

#### Trick #5: Check variable inportances

We can inspect the model in Flow and see the variable importances. However we do not have information about the words, just the indices. We can ask the `CountVectorizer` what word is on the specific index to see what words affect our model the most.

In [43]:
model.stages[5].vocabulary[0]

'great'

## Let's Deply the Application

Right now, we defined the PySPark pipeline. We will now demonstrate its deployment using Spark Streaming application in Scala where the pipeline defined above will receive raw streaming data and run preditions on them right away.

The steps will be:

 - Load the schema from the schema file
 - Create input data stream and pass it the schema. The input data stream will point to a directory where a new csv files will be comming from different streaming sources
 - Load the pipeline from the pipeline file
 - Create output data stream. For our purposes, we store the data into memory and also to a SparkSQL table
 - We can inspect the predictions  by regularly displaying the content of the desired table

In [44]:
# Check again we have spark available
spark

In [42]:
# Load the exported pipeline model
from pyspark.ml import PipelineModel
pipeline_model = PipelineModel.load("reviews_pipeline.model/")

Py4JJavaError: An error occurred while calling o58.load.
: java.lang.ClassNotFoundException: py_sparkling.ml.models.H2OMOJOModel
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:651)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:274)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:272)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:272)
	at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:348)
	at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:342)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [37]:
# Load exported schema of input data
from pyspark.sql.types import StructType
import json

schema = StructType.fromJson(json.load(open("schema.json", 'r')))
print(schema)

StructType(List(StructField(Id,IntegerType,false),StructField(ProductId,StringType,false),StructField(UserId,StringType,false),StructField(ProfileName,StringType,false),StructField(HelpfulnessNumerator,ShortType,false),StructField(HelpfulnessDenominator,ShortType,false),StructField(Score,ByteType,false),StructField(Time,IntegerType,false),StructField(Summary,StringType,false),StructField(Text,StringType,false)))


In [38]:
from subprocess import Popen
Popen(["./start_streaming.sh"])

<subprocess.Popen at 0x116342f28>

In [39]:
!ls output

0.csv  15.csv 21.csv 28.csv 34.csv 40.csv 47.csv 53.csv 6.csv  66.csv
1.csv  16.csv 22.csv 29.csv 35.csv 41.csv 48.csv 54.csv 60.csv 67.csv
10.csv 17.csv 23.csv 3.csv  36.csv 42.csv 49.csv 55.csv 61.csv 7.csv
11.csv 18.csv 24.csv 30.csv 37.csv 43.csv 5.csv  56.csv 62.csv 8.csv
12.csv 19.csv 25.csv 31.csv 38.csv 44.csv 50.csv 57.csv 63.csv 9.csv
13.csv 2.csv  26.csv 32.csv 39.csv 45.csv 51.csv 58.csv 64.csv
14.csv 20.csv 27.csv 33.csv 4.csv  46.csv 52.csv 59.csv 65.csv


In [44]:
input_data_stream = spark.readStream.schema(schema).csv("output")

In [43]:
output_data_stream = pipeline_model.transform(input_data_stream)

NameError: name 'pipeline_model' is not defined

The steps above are written using Scala as:
```
 val spark = SparkSession.builder().master("local").getOrCreate()

      //
      // Load exported pipeline
      //
      import org.apache.spark.sql.types.DataType
      val pipelineModel = PipelineModel.read.load("py/examples/pipelines/reviews_pipeline.model/")

      //
      // Load exported schema of input data
      //
      val schema = StructType(DataType.fromJson(scala.io.Source.fromFile("py/examples/pipelines/schema.json").mkString).asInstanceOf[StructType].map {
        case StructField(name, dtype, nullable, metadata) => StructField(name, dtype, true, metadata)
        case rec => rec
      })
      println(schema)

      //
      // Define input stream
      //
      val inputDataStream = spark.readStream.schema(schema).csv("py/examples/data/kuba/input/*.csv")

      //
      // Apply loaded model
      //
      val outputDataStream = pipelineModel.transform(inputDataStream)

      //
      // Forward output stream into memory-sink
      //
      outputDataStream.writeStream.format("memory").queryName("predictions").start()

      //
      // Query results
      //
      while(true){
        spark.sql("select * from predictions").show()
        Thread.sleep(5000)
      }
```

### Let's see it in practice!