[SW-806] Fix Importing/Exporting of pipelines, pipelineModels and MOJOS from and to HDFS #676

Merged: 10 commits, Apr 19, 2018
3 changes: 2 additions & 1 deletion doc/src/site/sphinx/deployment/deployment.rst
@@ -4,4 +4,5 @@ Deployment
.. toctree::
:maxdepth: 2

backends.rst
backends.rst
load_mojo.rst
90 changes: 90 additions & 0 deletions doc/src/site/sphinx/deployment/load_mojo.rst
@@ -0,0 +1,90 @@
Importing H2O Mojo
------------------

An H2O Mojo can be imported into Sparkling Water from any data source Spark supports, such as a local file, S3, or HDFS, and the
semantics of the import are the same as in the Spark API.


If HDFS is not available to Spark, then the following call, in Scala:

.. code:: scala

import org.apache.spark.ml.h2o.models._
val model = H2OMOJOModel.createFromMojo("prostate.mojo")

or in Python:

.. code:: python

from pysparkling.ml import *
model = H2OMOJOModel.create_from_mojo("prostate.mojo")

attempts to load the mojo file with the specified name from the current working directory.
You can also specify the full path, for example, in Scala:

.. code:: scala

import org.apache.spark.ml.h2o.models._
val model = H2OMOJOModel.createFromMojo("/Users/peter/prostate.mojo")

or in Python:

.. code:: python

from pysparkling.ml import *
model = H2OMOJOModel.create_from_mojo("/Users/peter/prostate.mojo")


If Spark is running on Hadoop and HDFS is available, then the following call, in Scala:

.. code:: scala

import org.apache.spark.ml.h2o.models._
val model = H2OMOJOModel.createFromMojo("prostate.mojo")

or in Python:

.. code:: python

from pysparkling.ml import *
model = H2OMOJOModel.create_from_mojo("prostate.mojo")


attempts to load the mojo from the HDFS home directory of the current user.
You can also specify an absolute path, for example, in Scala:

.. code:: scala

import org.apache.spark.ml.h2o.models._
val model = H2OMOJOModel.createFromMojo("/user/peter/prostate.mojo")

or in Python:

.. code:: python

from pysparkling.ml import *
model = H2OMOJOModel.create_from_mojo("/user/peter/prostate.mojo")


Both calls load the mojo file from ``hdfs://{server}:{port}/user/peter/prostate.mojo``, where ``{server}`` and ``{port}`` are filled in automatically by Spark.
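
Under the hood, path resolution follows the standard Hadoop ``FileSystem`` conventions: a relative name is qualified against the default file system and the current user's working directory. The snippet below is only an illustrative sketch of that resolution, not part of the Sparkling Water API; the configuration and example paths are assumptions.

.. code:: scala

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Picks up fs.defaultFS from the cluster configuration (core-site.xml)
val conf = new Configuration()
// A relative name without a scheme, as passed to createFromMojo
val path = new Path("prostate.mojo")
val fs = path.getFileSystem(conf)
// Qualify against the file system URI and working directory, e.g.
// hdfs://{server}:{port}/user/peter/prostate.mojo on Hadoop, or a
// file:/ URI under the current directory when no HDFS is configured.
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
println(qualified)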


You can also specify the type of data source manually; in that case, you need to provide the URI scheme, in Scala:

.. code:: scala

import org.apache.spark.ml.h2o.models._
// HDFS
val modelHDFS = H2OMOJOModel.createFromMojo("hdfs:///user/peter/prostate.mojo")
// Local file
val modelLocal = H2OMOJOModel.createFromMojo("file:///Users/peter/prostate.mojo")

or in Python:

.. code:: python

from pysparkling.ml import *
# HDFS
model_hdfs = H2OMOJOModel.create_from_mojo("hdfs:///user/peter/prostate.mojo")
# Local file
model_local = H2OMOJOModel.create_from_mojo("file:///Users/peter/prostate.mojo")
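
Because the path is interpreted through Spark's Hadoop configuration, other schemes Spark supports (for example S3, as mentioned above) can be used the same way. The example below is a sketch only: it assumes an S3 connector such as ``s3a`` and the corresponding credentials are already configured, and the bucket name is illustrative.

.. code:: scala

import org.apache.spark.ml.h2o.models._
// S3 (assumes the s3a connector and credentials are configured; bucket name is illustrative)
val modelS3 = H2OMOJOModel.createFromMojo("s3a://my-bucket/models/prostate.mojo")
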
2 changes: 1 addition & 1 deletion doc/src/site/sphinx/tutorials/grid_gbm_pipeline.rst
@@ -103,7 +103,7 @@ We can also set regular arguments using the ``setParameters`` call. In this case

val grid = new H2OGridSearch().
setPredictionsCol("label").
setHyperParameters(hyperParams.asJava).
setHyperParameters(hyperParams).
setParameters(new H2OGBM().setMaxDepth(30))

Remove Temporary Columns
18 changes: 12 additions & 6 deletions examples/pipelines/hamOrSpamMultiAlgo.script.scala
@@ -90,7 +90,7 @@ val algoStage = algo match {
setL2(0.0).
setSeed(1).
setHidden(Array[Int](200, 200)).
setFeaturesCols(idf.getOutputCol).
setFeaturesCols("tf_idf").
setPredictionsCol("label")
case "automl" =>
// Create H2OAutoML model
@@ -101,13 +101,12 @@ val algoStage = algo match {
setConvertUnknownCategoricalLevelsToNa(true)
case "grid_gbm" =>
// Create Grid GBM Model
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
val hyperParams: HashMap[String, Array[AnyRef]] = HashMap()
hyperParams += ("_ntrees" -> Array(1, 30).map(_.asInstanceOf[AnyRef]))
new H2OGridSearch().
setPredictionsCol("label").
setHyperParameters(hyperParams.asJava).
setHyperParameters(hyperParams).
setParameters(new H2OGBM().setMaxDepth(30).setSeed(1))
}

@@ -120,10 +119,17 @@ val colPruner = new ColumnPruner().
val pipeline = new Pipeline().
setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, algoStage, colPruner))

// Test exporting and importing the pipeline. On systems where HDFS & Hadoop are not available, this call stores the pipeline
// as a local file in the current directory. If HDFS & Hadoop are available, this call stores the pipeline in the HDFS home
// directory of the current user. Absolute paths can be used as well. The same holds for the model import/export below.
pipeline.write.overwrite.save("examples/build/pipeline")
val loadedPipeline = Pipeline.load("examples/build/pipeline")
// Train the pipeline model
val data = load("smsData.txt")
val model = pipeline.fit(data)
val model = loadedPipeline.fit(data)

model.write.overwrite.save("examples/build/model")
val loadedModel = PipelineModel.load("examples/build/model")

/*
* Make predictions on unlabeled data
@@ -139,6 +145,6 @@ def isSpam(smsText: String,
prediction.select("prediction_output.p1").first.getDouble(0) > hamThreshold
}

println(isSpam("Michal, h2oworld party tonight in MV?", model))
println(isSpam("Michal, h2oworld party tonight in MV?", loadedModel))

println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", model))
println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel))
@@ -206,7 +206,7 @@ class ScriptStrataAirlines extends ScriptsTestHelper {
@RunWith(classOf[JUnitRunner])
class ScriptPipelineHamOrSpamGBM extends ScriptsTestHelper {
override protected def beforeAll(): Unit = {
sparkConf = defaultConf.setMaster("local-cluster[3,2,4096]")
sparkConf = defaultConf.setMaster("local")
.set("spark.driver.memory", "4G")
.set("spark.executor.memory", "4G")
super.beforeAll()
@@ -249,9 +249,9 @@ object HamOrSpamTester {

def test(scriptsTestHelper: ScriptsTestHelper, fileName: String, algo: String) {
val inspections = new ScriptInspections()
inspections.addSnippet("val answer1 = isSpam(\"Michal, h2oworld party tonight in MV?\", model)")
inspections.addSnippet("val answer1 = isSpam(\"Michal, h2oworld party tonight in MV?\", loadedModel)")
inspections.addTermToCheck("answer1")
inspections.addSnippet("val answer2 = isSpam(\"We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?\", model)")
inspections.addSnippet("val answer2 = isSpam(\"We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?\", loadedModel)")
inspections.addTermToCheck("answer2")

val result = scriptsTestHelper.launchScript(fileName, inspections, "pipelines", "val algo=\"" + algo + "\"")
@@ -18,8 +18,8 @@ package org.apache.spark.ml.h2o.algos

import java.io._

import hex.Model
import hex.genmodel.utils.DistributionFamily
import hex.{FrameSplitter, Model}
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.h2o._
@@ -30,7 +30,6 @@ import org.apache.spark.ml.{Estimator, Model => SparkModel}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, SQLContext}
import water.Key
import water.fvec.{Frame, H2OFrame}
import water.support.H2OFrameSupport

import scala.reflect.ClassTag
@@ -81,7 +80,7 @@ abstract class H2OAlgorithm[P <: Model.Parameters : ClassTag, M <: SparkModel[M]
|| getParams._distribution == DistributionFamily.multinomial)
&& !trainFrame.vec(getPredictionsCol()).isCategorical) {
trainFrame.replace(trainFrame.find(getPredictionsCol()),
trainFrame.vec(getPredictionsCol()).toCategoricalVec).remove()
trainFrame.vec(getPredictionsCol()).toCategoricalVec).remove()
}
water.DKV.put(trainFrame)

@@ -100,17 +99,17 @@ abstract class H2OAlgorithm[P <: Model.Parameters : ClassTag, M <: SparkModel[M]
@DeveloperApi
override def transformSchema(schema: StructType): StructType = {
require(schema.fields.exists(f => f.name.compareToIgnoreCase(getPredictionsCol()) == 0),
s"Specified prediction columns '${getPredictionsCol()} was not found in input dataset!")
s"Specified prediction columns '${getPredictionsCol()} was not found in input dataset!")
require(!getFeaturesCols().exists(n => n.compareToIgnoreCase(getPredictionsCol()) == 0),
s"Specified input features cannot contain prediction column!")
s"Specified input features cannot contain prediction column!")
schema
}

override def copy(extra: ParamMap): this.type = defaultCopy(extra)

@Since("1.6.0")
override def write: MLWriter = new H2OAlgorithmWriter(this)

def defaultFileName: String
}

@@ -120,16 +119,14 @@ private[algos] class H2OAlgorithmWriter[T <: H2OAlgorithm[_, _]](instance: T) ex
@Since("1.6.0") override protected def saveImpl(path: String): Unit = {
val hadoopConf = sc.hadoopConfiguration
DefaultParamsWriter.saveMetadata(instance, path, sc)
val outputPath = if (path.startsWith("file://")) {
new Path(path, instance.defaultFileName)
} else {
new Path("file://" + path, instance.defaultFileName)
}
val outputPath = new Path(path, instance.defaultFileName)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.create(qualifiedOutputPath)
val oos = new ObjectOutputStream(new FileOutputStream(new File(qualifiedOutputPath.toUri), false))
val out = fs.create(qualifiedOutputPath)
val oos = new ObjectOutputStream(out)
oos.writeObject(instance.getParams)
out.close()
logInfo(s"Saved to: $qualifiedOutputPath")
}
}

@@ -140,8 +137,12 @@ private[algos] class H2OAlgorithmReader[A <: H2OAlgorithm[P, _] : ClassTag, P <:

override def load(path: String): A = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val file = new File(path, defaultFileName)
val ois = new ObjectInputStream(new FileInputStream(file))

val inputPath = new Path(path, defaultFileName)
val fs = inputPath.getFileSystem(sc.hadoopConfiguration)
val qualifiedInputPath = inputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val ois = new ObjectInputStream(fs.open(qualifiedInputPath))

val parameters = ois.readObject().asInstanceOf[P]
implicit val h2oContext = H2OContext.ensure("H2OContext has to be started in order to use H2O pipelines elements.")
val h2oAlgo = make[A, P](parameters, metadata.uid, h2oContext, sqlContext)
43 changes: 24 additions & 19 deletions ml/src/main/scala/org/apache/spark/ml/h2o/algos/H2OAutoML.scala
@@ -20,21 +20,21 @@ import java.io._
import java.util.Date

import ai.h2o.automl.{AutoML, AutoMLBuildSpec}
import hex.{FrameSplitter, ScoreKeeper}
import hex.ScoreKeeper
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.h2o._
import org.apache.spark.ml.Estimator
import org.apache.spark.ml.h2o.models.H2OMOJOModel
import org.apache.spark.ml.h2o.param.NullableStringParam
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.ml.{Estimator, Model => SparkModel}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, SQLContext}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.{JNull, JString, JValue}
import water.Key
import water.fvec.{Frame, H2OFrame}
import water.support.{H2OFrameSupport, ModelSerializationSupport}

import scala.collection.JavaConverters._
@@ -130,16 +130,14 @@ private[algos] class H2OAutoMLWriter(instance: H2OAutoML) extends MLWriter {
@Since("1.6.0") override protected def saveImpl(path: String): Unit = {
val hadoopConf = sc.hadoopConfiguration
DefaultParamsWriter.saveMetadata(instance, path, sc)
val outputPath = if (path.startsWith("file://")) {
new Path(path, instance.defaultFileName)
} else {
new Path("file://" + path, instance.defaultFileName)
}
val outputPath = new Path(path, instance.defaultFileName)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.create(qualifiedOutputPath)
val oos = new ObjectOutputStream(new FileOutputStream(new File(qualifiedOutputPath.toUri), false))
oos.writeObject(instance.automlBuildSpec.get)
val out = fs.create(qualifiedOutputPath)
val oos = new ObjectOutputStream(out)
oos.writeObject(instance.automlBuildSpec.orNull)
out.close()
logInfo(s"Saved to: $qualifiedOutputPath")
}
}

@@ -149,11 +147,17 @@ private[algos] class H2OAutoMLReader(val defaultFileName: String) extends MLRead

override def load(path: String): H2OAutoML = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val file = new File(path, defaultFileName)
val ois = new ObjectInputStream(new FileInputStream(file))

val inputPath = new Path(path, defaultFileName)
val fs = inputPath.getFileSystem(sc.hadoopConfiguration)
val qualifiedInputPath = inputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val ois = new ObjectInputStream(fs.open(qualifiedInputPath))

val buildSpec = ois.readObject().asInstanceOf[AutoMLBuildSpec]
implicit val h2oContext: H2OContext = H2OContext.ensure("H2OContext has to be started in order to use H2O pipelines elements.")
new H2OAutoML(Some(buildSpec), metadata.uid)(h2oContext, sqlContext)
val algo = new H2OAutoML(Option(buildSpec), metadata.uid)(h2oContext, sqlContext)
DefaultParamsReader.getAndSetParams(algo, metadata)
algo
}
}

@@ -162,17 +166,17 @@ trait H2OAutoMLParams extends Params {
//
// Param definitions
//
private final val predictionCol = new Param[String](this, "predictionCol", "Prediction column name")
private final val predictionCol = new NullableStringParam(this, "predictionCol", "Prediction column name")
private final val allStringColumnsToCategorical = new BooleanParam(this, "allStringColumnsToCategorical", "Transform all strings columns to categorical")
private final val ratio = new DoubleParam(this, "ratio", "Determines in which ratios split the dataset")
private final val foldColumn = new Param[String](this, "foldColumn", "Fold column name")
private final val weightsColumn = new Param[String](this, "weightsColumn", "Weights column name")
private final val foldColumn = new NullableStringParam(this, "foldColumn", "Fold column name")
private final val weightsColumn = new NullableStringParam(this, "weightsColumn", "Weights column name")
private final val ignoredColumns = new StringArrayParam(this, "ignoredColumns", "Ignored columns names")
private final val tryMutations = new BooleanParam(this, "tryMutations", "Whether to use mutations as part of the feature engineering")
private final val excludeAlgos = new H2OAutoMLAlgosParam(this, "excludeAlgos", "Algorithms to exclude when using automl")
private final val projectName = new Param[String](this, "projectName", "Identifier for models that should be grouped together in the leaderboard" +
private final val projectName = new NullableStringParam(this, "projectName", "Identifier for models that should be grouped together in the leaderboard" +
" (e.g., airlines and iris)")
private final val loss = new Param[String](this, "loss", "loss")
private final val loss = new NullableStringParam(this, "loss", "loss")
private final val maxRuntimeSecs = new DoubleParam(this, "maxRuntimeSecs", "Maximum time in seconds for automl to be running")
private final val stoppingRounds = new IntParam(this, "stoppingRounds", "Stopping rounds")
private final val stoppingTolerance = new DoubleParam(this, "stoppingTolerance", "Stopping tolerance")
@@ -379,3 +383,4 @@ class H2OAutoMLStoppingMetricParam private(parent: Params, name: String, doc: St

}
}

@@ -131,6 +131,5 @@ trait H2OGBMParams extends H2OSharedTreeParams[GBMParameters] {
parameters._col_sample_rate = $(colSampleRate)
parameters._max_abs_leafnode_pred = $(maxAbsLeafnodePred)
parameters._pred_noise_bandwidth = $(predNoiseBandwidth)

}
}