<h1 style="display: inline;" >8. Spark and Java Objects (C2)</h1>


In practice, H2O usually has to be integrated with other products in the big data ecosystem. Here we demonstrate this in two ways: (1) integrating H2O with Spark via 'Sparkling Water', and (2) using a trained H2O model exported as a 'POJO' (Plain Old Java Object). The second exercise also uses Spark, although not through the Sparkling Water interface.

**Training a model with Sparkling Water:**

Spark is a distributed cluster-computing framework for general-purpose computation, while H2O is a specialized computing framework for machine learning. It therefore often makes sense to combine the two: perform some steps of the process (e.g. data wrangling and transformations) in Spark and train the models in H2O. 'Sparkling Water' is the interface for doing so.

Here we retrain the K-means model on the Iris dataset, using the following `IrisCluster` class:

~~~~java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.h2o.JavaH2OContext;
import water.fvec.H2OFrame;
import hex.kmeans.KMeans;
import hex.kmeans.KMeansModel;
import hex.kmeans.KMeansModel.KMeansParameters;
import hex.Model.JavaModelStreamWriter;

import java.io.File;
import java.io.FileOutputStream;

public class IrisCluster {
  public static void main(String[] args) throws Exception {
    String inputFile = args[0];

    // Start (or reuse) a Spark session and attach H2O to it via Sparkling Water
    SparkSession spark = SparkSession.builder().appName("IrisCluster").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaH2OContext jhc = JavaH2OContext.getOrCreate(jsc);

    // Load the CSV with Spark, then convert the DataFrame into an H2O frame
    Dataset<Row> irisDF = spark.read().format("csv").option("header", "true").option("inferSchema", true).load(inputFile);
    irisDF.show();

    H2OFrame iris_hex = jhc.asH2OFrame(irisDF, "iris_hex");
    System.out.println(iris_hex.toString());

    // Configure K-means with 3 clusters, trained on the H2O frame
    KMeansParameters kmParams = new KMeansParameters();
    kmParams._k = 3;
    kmParams._train = iris_hex._key;

    // trainModel() launches an H2O job; get() blocks until the model is ready
    KMeansModel kmModel = new KMeans(kmParams).trainModel().get();

    // Export the model as POJO source code, in a file named after the model key
    File destFile = new File(kmModel._key.toString() + ".java");
    try (FileOutputStream fos = new FileOutputStream(destFile)) {
      JavaModelStreamWriter writer = kmModel.new JavaModelStreamWriter(false);
      writer.writeTo(fos);
    }
  }
}
~~~~
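For a mental model of what `trainModel()` computes, the sketch below runs a textbook Lloyd's iteration in plain Java: assign each point to its nearest centre, move each centre to the mean of its points, and repeat until no centre moves. H2O's K-means builds on the same assign/update idea, but this is not H2O's implementation. It leaves out H2O's initialisation strategies, optional standardization and handling of categorical columns. The class name `LloydKMeans` is hypothetical, and the six sample rows are taken from the iris data.

~~~~java
import java.util.Arrays;

/** Textbook Lloyd's k-means on dense numeric rows (illustrative sketch only). */
final class LloydKMeans {

    /** Squared Euclidean distance between two points of equal length. */
    static double sqDist(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return s;
    }

    /** Index of the centre closest to point p. */
    static int nearest(double[] p, double[][] centres) {
        int best = 0;
        for (int c = 1; c < centres.length; c++) {
            if (sqDist(p, centres[c]) < sqDist(p, centres[best])) best = c;
        }
        return best;
    }

    /** Runs up to maxIter assign/update rounds from copies of the initial centres. */
    static double[][] fit(double[][] data, double[][] initCentres, int maxIter) {
        int k = initCentres.length, dim = initCentres[0].length;
        double[][] centres = new double[k][];
        for (int c = 0; c < k; c++) centres[c] = initCentres[c].clone();

        for (int iter = 0; iter < maxIter; iter++) {
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            // Assignment step: each point joins its nearest centre
            for (double[] p : data) {
                int c = nearest(p, centres);
                counts[c]++;
                for (int j = 0; j < dim; j++) sums[c][j] += p[j];
            }
            // Update step: move each non-empty centre to the mean of its points
            boolean moved = false;
            for (int c = 0; c < k; c++) {
                if (counts[c] == 0) continue; // an empty cluster keeps its old centre
                for (int j = 0; j < dim; j++) {
                    double m = sums[c][j] / counts[c];
                    if (m != centres[c][j]) moved = true;
                    centres[c][j] = m;
                }
            }
            if (!moved) break; // converged: no centre changed
        }
        return centres;
    }

    public static void main(String[] args) {
        double[][] data = {
            {5.1, 3.5, 1.4, 0.2}, {4.9, 3.0, 1.4, 0.2},
            {7.0, 3.2, 4.7, 1.4}, {6.4, 3.2, 4.5, 1.5},
            {6.3, 3.3, 6.0, 2.5}, {7.1, 3.0, 5.9, 2.1}
        };
        double[][] centres = fit(data, new double[][]{data[0], data[2], data[4]}, 100);
        for (double[] p : data) {
            System.out.println(Arrays.toString(p) + " -> " + nearest(p, centres));
        }
    }
}
~~~~

With these starting centres, each consecutive pair of rows ends up in its own cluster. The cluster numbers are arbitrary labels and need not match the ones H2O assigns.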

The code was built with Maven into a shaded JAR, using the following dependencies:
    
~~~~xml
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ai.h2o</groupId>
      <artifactId>sparkling-water-core_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>
  </dependencies>
~~~~

The program was then submitted to Spark with:

`$ spark-submit --class IrisCluster target/sparkh2oexample.jar iris.csv`

Running it writes the trained model out as a POJO: a Java source file named after the model key, `KMeans_model_<modelnumber>_1.java`, in the driver's working directory.
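Because a public Java class must live in a file of the same name, the generated class takes the model key as its name (for example `KMeans_model_1508336904807_1`). Once the source has been compiled against the `h2o-genmodel` library, a consumer can instantiate the class by name at runtime with `Class.forName`. The helper below is a hypothetical, type-checked version of that pattern. To stay runnable without the model on the classpath, it is demonstrated with a JDK class. For the model you would pass the POJO's class name together with `hex.genmodel.GenModel.class`.

~~~~java
import java.util.List;

/** Hypothetical helper: instantiate a class by name and check its type up front. */
final class PojoLoader {

    static <T> T instantiate(String className, Class<T> type) throws ReflectiveOperationException {
        // asSubclass throws ClassCastException immediately if the class has the wrong type,
        // rather than failing later at an unchecked cast
        Class<? extends T> cls = Class.forName(className).asSubclass(type);
        // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance()
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in demo with a JDK class
        List<?> list = instantiate("java.util.ArrayList", List.class);
        System.out.println(list.getClass().getName()); // prints "java.util.ArrayList"
    }
}
~~~~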

**Productionizing the POJO:** The generated source depends only on the `h2o-genmodel` library, so once compiled it can be used from any other Java or Scala program. Here we create a simple Spark program that uses the model to cluster the iris data:

~~~~java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import hex.genmodel.GenModel;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.RowData;
import hex.genmodel.easy.prediction.ClusteringModelPrediction;

public class IrisPredict {
  // Class name of the compiled POJO (the model key reported by IrisCluster)
  private static final String modelClassName = "KMeans_model_1508336904807_1";

  public static void main(String[] args) throws Exception {

    // Set up H2O model: load the POJO class by name and wrap it for easy scoring
    GenModel rawModel = (GenModel) Class.forName(modelClassName).getDeclaredConstructor().newInstance();
    EasyPredictModelWrapper model = new EasyPredictModelWrapper(rawModel);

    // Get data into Spark (input path from the command line, defaulting to iris.csv)
    String inputFile = args.length > 0 ? args[0] : "iris.csv";
    SparkSession spark = SparkSession.builder().appName("H2O-example").getOrCreate();
    Dataset<Row> irisDF = spark.read().format("csv").option("header", "true").option("inferSchema", true).load(inputFile);

    List<String> irisFields = Arrays.asList("sepal_length", "sepal_width", "petal_length", "petal_width");

    // Apply clustering model row by row; the cast selects Spark's Java foreach overload
    irisDF.foreach((ForeachFunction<Row>) row -> {
      List<String> listData = new ArrayList<>();
      RowData rowData = new RowData();
      for (String field : irisFields) {
        Double value = row.getAs(field);
        listData.add(Double.toString(value));
        rowData.put(field, value);
      }
      ClusteringModelPrediction p = model.predictClustering(rowData);
      listData.add(Integer.toString(p.cluster));
      System.out.println(listData.toString());
    });

    spark.stop();
  }
}
~~~~
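`predictClustering` hides the actual scoring logic. As a rough mental model, a K-means prediction standardizes the input columns (if the model was trained with standardization) and then returns the index of the nearest cluster centre. The sketch below shows that idea for purely numeric input. It is not the code H2O generates, which also deals with details such as categorical columns and missing values. The class `ClusterScoringSketch` is hypothetical, and the means, standard deviations and centres in it are made-up numbers, not values from the model trained above.

~~~~java
/** Simplified nearest-centre scoring for one numeric row (illustrative sketch only). */
final class ClusterScoringSketch {

    static int predictCluster(double[] row, double[] means, double[] sds, double[][] centres) {
        // Standardize each column; the centres are assumed to be in the same standardized space
        double[] z = new double[row.length];
        for (int j = 0; j < row.length; j++) {
            z[j] = (row[j] - means[j]) / sds[j];
        }
        // Pick the centre with the smallest squared Euclidean distance
        int best = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centres.length; c++) {
            double d = 0;
            for (int j = 0; j < z.length; j++) {
                double diff = z[j] - centres[c][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] means = {5.8, 3.0, 3.8, 1.2};   // hypothetical column means
        double[] sds = {0.8, 0.4, 1.8, 0.8};     // hypothetical column standard deviations
        double[][] centres = {                   // hypothetical standardized centres
            {-1.0, 0.9, -1.3, -1.3},
            {-0.1, -0.9, 0.3, 0.2},
            {1.1, 0.1, 1.0, 1.1}
        };
        System.out.println(predictCluster(new double[]{5.1, 3.5, 1.4, 0.2}, means, sds, centres)); // prints 0
    }
}
~~~~

Comparing squared distances preserves the ordering of the true Euclidean distances while avoiding a square root per centre.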

The program can be run from the attached JAR as follows (the compiled POJO class must also be on the classpath, e.g. packaged into the same JAR):

`$ spark-submit --class IrisPredict target/sparkh2oexample.jar iris.csv`

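The output lists each iris data point followed by the cluster (0, 1 or 2) it was assigned to. Because the rows are printed inside `foreach`, they appear on the driver console only when Spark runs in local mode. On a cluster they go to the executors' stdout logs, in no guaranteed order: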
    
```
[5.1, 3.5, 1.4, 0.2, 0]
[4.9, 3.0, 1.4, 0.2, 0]
[4.7, 3.2, 1.3, 0.2, 0]
[4.6, 3.1, 1.5, 0.2, 0]
[5.0, 3.6, 1.4, 0.2, 0]
[5.4, 3.9, 1.7, 0.4, 0]
[4.6, 3.4, 1.4, 0.3, 0]
[5.0, 3.4, 1.5, 0.2, 0]
[4.4, 2.9, 1.4, 0.2, 0]
[4.9, 3.1, 1.5, 0.1, 0]
[5.4, 3.7, 1.5, 0.2, 0]
[4.8, 3.4, 1.6, 0.2, 0]
[4.8, 3.0, 1.4, 0.1, 0]
[4.3, 3.0, 1.1, 0.1, 0]
[5.8, 4.0, 1.2, 0.2, 0]
[5.7, 4.4, 1.5, 0.4, 0]
[5.4, 3.9, 1.3, 0.4, 0]
[5.1, 3.5, 1.4, 0.3, 0]
[5.7, 3.8, 1.7, 0.3, 0]
[5.1, 3.8, 1.5, 0.3, 0]
[5.4, 3.4, 1.7, 0.2, 0]
[5.1, 3.7, 1.5, 0.4, 0]
[4.6, 3.6, 1.0, 0.2, 0]
[5.1, 3.3, 1.7, 0.5, 0]
[4.8, 3.4, 1.9, 0.2, 0]
[5.0, 3.0, 1.6, 0.2, 0]
[5.0, 3.4, 1.6, 0.4, 0]
[5.2, 3.5, 1.5, 0.2, 0]
[5.2, 3.4, 1.4, 0.2, 0]
[4.7, 3.2, 1.6, 0.2, 0]
[4.8, 3.1, 1.6, 0.2, 0]
[5.4, 3.4, 1.5, 0.4, 0]
[5.2, 4.1, 1.5, 0.1, 0]
[5.5, 4.2, 1.4, 0.2, 0]
[4.9, 3.1, 1.5, 0.1, 0]
[5.0, 3.2, 1.2, 0.2, 0]
[5.5, 3.5, 1.3, 0.2, 0]
[4.9, 3.1, 1.5, 0.1, 0]
[4.4, 3.0, 1.3, 0.2, 0]
[5.1, 3.4, 1.5, 0.2, 0]
[5.0, 3.5, 1.3, 0.3, 0]
[4.5, 2.3, 1.3, 0.3, 0]
[4.4, 3.2, 1.3, 0.2, 0]
[5.0, 3.5, 1.6, 0.6, 0]
[5.1, 3.8, 1.9, 0.4, 0]
[4.8, 3.0, 1.4, 0.3, 0]
[5.1, 3.8, 1.6, 0.2, 0]
[4.6, 3.2, 1.4, 0.2, 0]
[5.3, 3.7, 1.5, 0.2, 0]
[5.0, 3.3, 1.4, 0.2, 0]
[7.0, 3.2, 4.7, 1.4, 2]
[6.4, 3.2, 4.5, 1.5, 2]
[6.9, 3.1, 4.9, 1.5, 2]
[5.5, 2.3, 4.0, 1.3, 1]
[6.5, 2.8, 4.6, 1.5, 1]
[5.7, 2.8, 4.5, 1.3, 1]
[6.3, 3.3, 4.7, 1.6, 2]
[4.9, 2.4, 3.3, 1.0, 1]
[6.6, 2.9, 4.6, 1.3, 1]
[5.2, 2.7, 3.9, 1.4, 1]
[5.0, 2.0, 3.5, 1.0, 1]
[5.9, 3.0, 4.2, 1.5, 1]
[6.0, 2.2, 4.0, 1.0, 1]
[6.1, 2.9, 4.7, 1.4, 1]
[5.6, 2.9, 3.6, 1.3, 1]
[6.7, 3.1, 4.4, 1.4, 2]
[5.6, 3.0, 4.5, 1.5, 1]
[5.8, 2.7, 4.1, 1.0, 1]
[6.2, 2.2, 4.5, 1.5, 1]
[5.6, 2.5, 3.9, 1.1, 1]
[5.9, 3.2, 4.8, 1.8, 2]
[6.1, 2.8, 4.0, 1.3, 1]
[6.3, 2.5, 4.9, 1.5, 1]
[6.1, 2.8, 4.7, 1.2, 1]
[6.4, 2.9, 4.3, 1.3, 1]
[6.6, 3.0, 4.4, 1.4, 2]
[6.8, 2.8, 4.8, 1.4, 2]
[6.7, 3.0, 5.0, 1.7, 2]
[6.0, 2.9, 4.5, 1.5, 1]
[5.7, 2.6, 3.5, 1.0, 1]
[5.5, 2.4, 3.8, 1.1, 1]
[5.5, 2.4, 3.7, 1.0, 1]
[5.8, 2.7, 3.9, 1.2, 1]
[6.0, 2.7, 5.1, 1.6, 1]
[5.4, 3.0, 4.5, 1.5, 1]
[6.0, 3.4, 4.5, 1.6, 2]
[6.7, 3.1, 4.7, 1.5, 2]
[6.3, 2.3, 4.4, 1.3, 1]
[5.6, 3.0, 4.1, 1.3, 1]
[5.5, 2.5, 4.0, 1.3, 1]
[5.5, 2.6, 4.4, 1.2, 1]
[6.1, 3.0, 4.6, 1.4, 1]
[5.8, 2.6, 4.0, 1.2, 1]
[5.0, 2.3, 3.3, 1.0, 1]
[5.6, 2.7, 4.2, 1.3, 1]
[5.7, 3.0, 4.2, 1.2, 1]
[5.7, 2.9, 4.2, 1.3, 1]
[6.2, 2.9, 4.3, 1.3, 1]
[5.1, 2.5, 3.0, 1.1, 1]
[5.7, 2.8, 4.1, 1.3, 1]
[6.3, 3.3, 6.0, 2.5, 2]
[5.8, 2.7, 5.1, 1.9, 1]
[7.1, 3.0, 5.9, 2.1, 2]
[6.3, 2.9, 5.6, 1.8, 2]
[6.5, 3.0, 5.8, 2.2, 2]
[7.6, 3.0, 6.6, 2.1, 2]
[4.9, 2.5, 4.5, 1.7, 1]
[7.3, 2.9, 6.3, 1.8, 2]
[6.7, 2.5, 5.8, 1.8, 2]
[7.2, 3.6, 6.1, 2.5, 2]
[6.5, 3.2, 5.1, 2.0, 2]
[6.4, 2.7, 5.3, 1.9, 2]
[6.8, 3.0, 5.5, 2.1, 2]
[5.7, 2.5, 5.0, 2.0, 1]
[5.8, 2.8, 5.1, 2.4, 1]
[6.4, 3.2, 5.3, 2.3, 2]
[6.5, 3.0, 5.5, 1.8, 2]
[7.7, 3.8, 6.7, 2.2, 2]
[7.7, 2.6, 6.9, 2.3, 2]
[6.0, 2.2, 5.0, 1.5, 1]
[6.9, 3.2, 5.7, 2.3, 2]
[5.6, 2.8, 4.9, 2.0, 1]
[7.7, 2.8, 6.7, 2.0, 2]
[6.3, 2.7, 4.9, 1.8, 1]
[6.7, 3.3, 5.7, 2.1, 2]
[7.2, 3.2, 6.0, 1.8, 2]
[6.2, 2.8, 4.8, 1.8, 1]
[6.1, 3.0, 4.9, 1.8, 2]
[6.4, 2.8, 5.6, 2.1, 2]
[7.2, 3.0, 5.8, 1.6, 2]
[7.4, 2.8, 6.1, 1.9, 2]
[7.9, 3.8, 6.4, 2.0, 2]
[6.4, 2.8, 5.6, 2.2, 2]
[6.3, 2.8, 5.1, 1.5, 1]
[6.1, 2.6, 5.6, 1.4, 1]
[7.7, 3.0, 6.1, 2.3, 2]
[6.3, 3.4, 5.6, 2.4, 2]
[6.4, 3.1, 5.5, 1.8, 2]
[6.0, 3.0, 4.8, 1.8, 1]
[6.9, 3.1, 5.4, 2.1, 2]
[6.7, 3.1, 5.6, 2.4, 2]
[6.9, 3.1, 5.1, 2.3, 2]
[5.8, 2.7, 5.1, 1.9, 1]
[6.8, 3.2, 5.9, 2.3, 2]
[6.7, 3.3, 5.7, 2.5, 2]
[6.7, 3.0, 5.2, 2.3, 2]
[6.3, 2.5, 5.0, 1.9, 1]
[6.5, 3.0, 5.2, 2.0, 2]
[6.2, 3.4, 5.4, 2.3, 2]
[5.9, 3.0, 5.1, 1.8, 1]
```
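To summarise a run like the one above, one could count how many points landed in each cluster. The sketch below is minimal and assumes the printed format shown here, with the cluster id as the last comma-separated field inside the brackets. Any other console lines, such as Spark log messages, are simply skipped. `ClusterTally` is a hypothetical helper name.

~~~~java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: cluster sizes from lines like "[5.1, 3.5, 1.4, 0.2, 0]". */
final class ClusterTally {

    static Map<Integer, Integer> tally(List<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>(); // sorted by cluster id
        for (String line : lines) {
            String body = line.trim();
            if (!body.startsWith("[") || !body.endsWith("]")) continue; // skip non-result lines
            String[] parts = body.substring(1, body.length() - 1).split(",");
            int cluster = Integer.parseInt(parts[parts.length - 1].trim()); // last field is the cluster
            counts.merge(cluster, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "[5.1, 3.5, 1.4, 0.2, 0]",
            "[7.0, 3.2, 4.7, 1.4, 2]",
            "[5.5, 2.3, 4.0, 1.3, 1]",
            "[6.5, 2.8, 4.6, 1.5, 1]");
        System.out.println(tally(sample)); // prints {0=1, 1=2, 2=1}
    }
}
~~~~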