[SW-1259][FollowUp] Unify ratio param across pipeline api (#1211)
(cherry picked from commit d70a470)
jakubhava committed May 22, 2019
1 parent 0847231 commit 56415c5
Showing 14 changed files with 48 additions and 62 deletions.
2 changes: 1 addition & 1 deletion examples/pipelines/hamOrSpamMultiAlgo.script.scala
@@ -78,7 +78,7 @@ val algoStage = algo match {
   case "gbm" =>
     // Create GBM model
     new H2OGBM().
-      setTrainRatio(0.8).
+      setSplitRatio(0.8).
       setSeed(1).
       setFeaturesCols("tf_idf").
       setLabelCol("label")
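
After the rename, the example script configures the train/validation split with the same parameter name used by every other stage of the pipeline API. A minimal usage sketch (not part of the diff), using only setters that appear elsewhere in this commit:

import org.apache.spark.ml.h2o.algos.{H2OGBM, H2OGLM}

// The same splitRatio setter now works across algorithms:
val gbm = new H2OGBM()
  .setSplitRatio(0.8)        // formerly setTrainRatio(0.8)
  .setSeed(1)
  .setFeaturesCols("tf_idf")
  .setLabelCol("label")

val glm = new H2OGLM()
  .setSplitRatio(0.8)        // same name, no per-algorithm variant
  .setSeed(1)
  .setFeaturesCols("tf_idf")
  .setLabelCol("label")
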
16 changes: 3 additions & 13 deletions ml/src/main/scala/org/apache/spark/ml/h2o/algos/H2OAlgorithm.scala
@@ -42,19 +42,9 @@ abstract class H2OAlgorithm[P <: Model.Parameters : ClassTag] extends Estimator[
     // Update H2O params based on provided configuration
     updateH2OParams()

-    val input = prepareDatasetForFitting(dataset)
-
-    // check if we need to do any splitting
-    if (getTrainRatio() < 1.0) {
-      // need to do splitting
-      val keys = H2OFrameSupport.split(input, Seq(Key.rand(), Key.rand()), Seq(getTrainRatio()))
-      parameters._train = keys(0)._key
-      if (keys.length > 1) {
-        parameters._valid = keys(1)._key
-      }
-    } else {
-      parameters._train = input._key
-    }
+    val (train, valid) = prepareDatasetForFitting(dataset)
+    parameters._train = train._key
+    parameters._valid = valid.map(_._key).orNull

     val trainFrame = parameters._train.get()
     if (getAllStringColumnsToCategorical()) {
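
The new assignment parameters._valid = valid.map(_._key).orNull uses the standard Scala idiom for bridging an Option into a null-accepting field: map the optional validation frame to its key, and fall back to null when no split was made, which is what the old branch did by never setting _valid. A self-contained sketch with stand-in types; Key and Frame below are placeholders, not the real water.* classes:

case class Key(name: String)
case class Frame(key: Key)

val valid: Option[Frame] = None               // no validation split was produced
val validKey: Key = valid.map(_.key).orNull   // null when valid is None, the frame's key otherwise
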
@@ -16,16 +16,19 @@
  */
 package org.apache.spark.ml.h2o.algos

-import org.apache.spark.h2o.{H2OContext, H2OFrame}
+import org.apache.spark.h2o.H2OContext
 import org.apache.spark.ml.h2o.param.H2OCommonParams
-import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Dataset, SparkSession}
+import water.Key
+import water.fvec.Frame
+import water.support.H2OFrameSupport

 /**
   * This trait contains methods that are shared across all algorithms.
   */
 trait H2OAlgorithmCommons extends H2OCommonParams {
-  protected def prepareDatasetForFitting(dataset: Dataset[_]): H2OFrame = {
+  protected def prepareDatasetForFitting(dataset: Dataset[_]): (Frame, Option[Frame]) = {
     val excludedCols = getExcludedCols()

     // if this is left empty select
@@ -36,6 +39,18 @@ trait H2OAlgorithmCommons extends H2OCommonParams {

     val cols = (getFeaturesCols() ++ excludedCols).map(col)
     val h2oContext = H2OContext.getOrCreate(SparkSession.builder().getOrCreate())
-    h2oContext.asH2OFrame(dataset.select(cols: _*).toDF())
+    val input = h2oContext.asH2OFrame(dataset.select(cols: _*).toDF())
+
+    if (getSplitRatio() < 1.0) {
+      // need to do splitting
+      val keys = H2OFrameSupport.split(input, Seq(Key.rand(), Key.rand()), Seq(getSplitRatio()))
+      if (keys.length > 1) {
+        (keys(0), Some(keys(1)))
+      } else {
+        (keys(0), None)
+      }
+    } else {
+      (input, None)
+    }
   }
 }
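
The splitting logic now lives in this one shared method: when splitRatio is below 1.0 the input frame is split by H2OFrameSupport.split into a training part and an optional validation part; otherwise the whole frame is used for training. A self-contained sketch of the same decision on a plain Seq (illustration only; the real code splits H2O frames, and H2OFrameSupport.split samples rows rather than cutting by position):

def prepare[T](rows: Seq[T], splitRatio: Double): (Seq[T], Option[Seq[T]]) =
  if (splitRatio < 1.0) {
    val cut = (rows.size * splitRatio).toInt
    val (train, valid) = rows.splitAt(cut)
    (train, if (valid.nonEmpty) Some(valid) else None)
  } else {
    (rows, None)
  }

// prepare(1 to 10, 0.8) -> train = 1..8, valid = Some(9..10)
// prepare(1 to 10, 1.0) -> train = 1..10, valid = None
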
20 changes: 6 additions & 14 deletions ml/src/main/scala/org/apache/spark/ml/h2o/algos/H2OAutoML.scala
@@ -55,18 +55,10 @@ class H2OAutoML(override val uid: String) extends Estimator[H2OMOJOModel]
       // generate random name to generate fresh leaderboard (the default behaviour)
       setProjectName(Random.alphanumeric.take(30).mkString)
     }
-    val input = prepareDatasetForFitting(dataset)
-    // check if we need to do any splitting
-    if (getRatio() < 1.0) {
-      // need to do splitting
-      val keys = H2OFrameSupport.split(input, Seq(Key.rand(), Key.rand()), Seq(getRatio()))
-      spec.input_spec.training_frame = keys(0)._key
-      if (keys.length > 1) {
-        spec.input_spec.validation_frame = keys(1)._key
-      }
-    } else {
-      spec.input_spec.training_frame = input._key
-    }
+
+    val (train, valid) = prepareDatasetForFitting(dataset)
+    spec.input_spec.training_frame = train._key
+    spec.input_spec.validation_frame = valid.map(_._key).orNull

     val trainFrame = spec.input_spec.training_frame.get()
     if (getAllStringColumnsToCategorical()) {
@@ -143,15 +135,15 @@ trait H2OAutoMLParams extends H2OCommonParams with DeprecatableParams {
   override protected def renamingMap: Map[String, String] = Map(
     "predictionCol" -> "labelCol",
     "foldColumn" -> "foldCol",
-    "ignoredColumns" -> "ignoredCols"
+    "ignoredColumns" -> "ignoredCols",
+    "ratio" -> "splitRatio"
   )

   //
   // Param definitions
   //
   private val allStringColumnsToCategorical = new BooleanParam(this, "allStringColumnsToCategorical", "Transform all strings columns to categorical")
   private val columnsToCategorical = new StringArrayParam(this, "columnsToCategorical", "List of columns to convert to categoricals before modelling")
-  private val ratio = new DoubleParam(this, "ratio", "Determines in which ratios split the dataset")
   private val ignoredCols = new StringArrayParam(this, "ignoredCols", "Ignored column names")
   private val includeAlgos = new H2OAutoMLAlgosParam(this, "includeAlgos", "Algorithms to include when using automl")
   private val excludeAlgos = new H2OAutoMLAlgosParam(this, "excludeAlgos", "Algorithms to exclude when using automl")
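
With the AutoML-specific ratio param removed here and its name added to renamingMap, AutoML reads the split setting from H2OCommonParams like every other stage, and the DeprecatableParams machinery presumably translates the old name when previously persisted pipelines are loaded. A hedged usage sketch, assuming the no-argument constructor and the common setters (setSplitRatio, setLabelCol) are exposed on H2OAutoML the same way they are on the algorithms above:

import org.apache.spark.ml.h2o.algos.H2OAutoML

val automl = new H2OAutoML()
  .setSplitRatio(0.8)   // previously the AutoML-specific ratio param
  .setLabelCol("label")
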
@@ -27,8 +27,7 @@ import org.apache.spark.ml.util._
  *
  * TODO: There are still bunch of parameters defined DeepLearningParameters which need to be ported here
  */
-class H2ODeepLearning(override val uid: String) extends H2OAlgorithm[DeepLearningParameters]
-  with H2ODeepLearningParams {
+class H2ODeepLearning(override val uid: String) extends H2OAlgorithm[DeepLearningParameters] with H2ODeepLearningParams {

   def this() = this(Identifiable.randomUID("deeplearning"))

3 changes: 1 addition & 2 deletions ml/src/main/scala/org/apache/spark/ml/h2o/algos/H2OGBM.scala
@@ -25,8 +25,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
 /**
   * H2O GBM algorithm exposed via Spark ML pipelines.
   */
-class H2OGBM(override val uid: String) extends H2OAlgorithm[GBMParameters]
-  with H2OGBMParams {
+class H2OGBM(override val uid: String) extends H2OAlgorithm[GBMParameters] with H2OGBMParams {

   def this() = this(Identifiable.randomUID("gbm"))

3 changes: 1 addition & 2 deletions ml/src/main/scala/org/apache/spark/ml/h2o/algos/H2OGLM.scala
@@ -33,8 +33,7 @@ import water.AutoBuffer
 /**
   * H2O GLM algorithm exposed via Spark ML pipelines.
   */
-class H2OGLM(override val uid: String) extends H2OAlgorithm[GLMParameters]
-  with H2OGLMParams {
+class H2OGLM(override val uid: String) extends H2OAlgorithm[GLMParameters] with H2OGLMParams {

   def this() = this(Identifiable.randomUID("glm"))

@@ -72,18 +72,10 @@

     val hyperParams = processHyperParams(algoParams, getHyperParameters())

-    val input = prepareDatasetForFitting(dataset)
-    // check if we need to do any splitting
-    if (getRatio() < 1.0) {
-      // need to do splitting
-      val keys = H2OFrameSupport.split(input, Seq(Key.rand(), Key.rand()), Seq(getRatio()))
-      algoParams._train = keys(0)._key
-      if (keys.length > 1) {
-        algoParams._valid = keys(1)._key
-      }
-    } else {
-      algoParams._train = input._key
-    }
+    val (train, valid) = prepareDatasetForFitting(dataset)
+    algoParams._train = train._key
+    algoParams._valid = valid.map(_._key).orNull
+
     algoParams._nfolds = getNfolds()
     algoParams._fold_column = getFoldCol()
     algoParams._response_column = getLabelCol()
@@ -184,7 +176,7 @@ class H2OGridSearch(override val uid: String) extends Estimator[H2OMOJOModel]
     if (getNfolds() > 1) {
       // use cross validation metrics
       model._output._cross_validation_metrics
-    } else if (getRatio() < 1) {
+    } else if (getSplitRatio() < 1) {
       // some portion of data is reserved for validation, use validation metrics
       model._output._validation_metrics
     } else {
@@ -391,7 +383,8 @@ object H2OGridSearch extends DefaultParamsReadable[py_sparkling.ml.algos.H2OGrid
 trait H2OGridSearchParams extends H2OCommonParams with DeprecatableParams {

   override protected def renamingMap: Map[String, String] = Map(
-    "predictionCol" -> "labelCol"
+    "predictionCol" -> "labelCol",
+    "ratio" -> "splitRatio"
   )

   //
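
The middle hunk keeps the rule the grid search uses to pick which metrics to report, now keyed on getSplitRatio(). A stand-alone sketch of that rule; the case objects below are illustrative stand-ins, not the H2O ModelMetrics types:

sealed trait MetricsSource
case object CrossValidation extends MetricsSource
case object Validation extends MetricsSource
case object Training extends MetricsSource

def metricsSource(nfolds: Int, splitRatio: Double): MetricsSource =
  if (nfolds > 1) CrossValidation       // cross-validation metrics
  else if (splitRatio < 1) Validation   // part of the data was held out for validation
  else Training                         // no split, fall back to training metrics
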
@@ -27,8 +27,7 @@ import org.apache.spark.ml.util.{DefaultParamsReader, Identifiable}
 /**
   * H2O XGBoost algorithm exposed via Spark ML pipelines.
   */
-class H2OXGBoost(override val uid: String) extends H2OAlgorithm[XGBoostParameters]
-  with H2OXGBoostParams {
+class H2OXGBoost(override val uid: String) extends H2OAlgorithm[XGBoostParameters] with H2OXGBoostParams {

   def this() = this(Identifiable.randomUID("xgboost"))

@@ -30,7 +30,7 @@ trait H2OAlgoParams[P <: Parameters] extends H2OAlgoParamsHelper[P] with H2OComm

   override protected def renamingMap: Map[String, String] = Map(
     "predictionCol" -> "labelCol",
-    "splitRatio" -> "ratio"
+    "ratio" -> "splitRatio"
   )

   //
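
The renaming map translates a deprecated parameter name into its current one, so with splitRatio now the canonical name the entry flips from "splitRatio" -> "ratio" to "ratio" -> "splitRatio". A sketch of how such an old-name-to-new-name map can be applied when reading persisted parameters (illustration only, not the actual DeprecatableParams code):

val renamingMap = Map(
  "predictionCol" -> "labelCol",
  "ratio"         -> "splitRatio"
)

def renameDeprecated(params: Map[String, Any]): Map[String, Any] =
  params.map { case (name, value) => renamingMap.getOrElse(name, name) -> value }

// renameDeprecated(Map("ratio" -> 0.8, "seed" -> 1)) == Map("splitRatio" -> 0.8, "seed" -> 1)
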
@@ -25,10 +25,10 @@ import org.apache.spark.ml.param.{DoubleParam, Param, Params, StringArrayParam}
 trait H2OCommonParams extends Params with Logging {

   protected final val featuresCols = new StringArrayParam(this, "featuresCols", "Name of feature columns")
-  private val labelCol = new Param[String](this, "labelCol", "Label column name")
-  private val foldCol = new NullableStringParam(this, "foldCol", "Fold column name")
-  private val weightCol = new NullableStringParam(this, "weightCol", "Weight column name")
-  private val splitRatio = new DoubleParam(this, "splitRatio",
+  protected final val labelCol = new Param[String](this, "labelCol", "Label column name")
+  protected final val foldCol = new NullableStringParam(this, "foldCol", "Fold column name")
+  protected final val weightCol = new NullableStringParam(this, "weightCol", "Weight column name")
+  protected final val splitRatio = new DoubleParam(this, "splitRatio",
     "Accepts values in range [0, 1.0] which determine how large part of dataset is used for training and for validation. " +
     "For example, 0.8 -> 80% training 20% validation.")

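
labelCol, foldCol, weightCol and splitRatio switch from private to protected final, presumably so the traits and classes that mix in H2OCommonParams can reference the shared definitions directly. A minimal sketch of the accessor pair such a DoubleParam typically gets inside a Spark ML Params trait; the trait name and method bodies below (including the range check) are assumptions, since the actual Sparkling Water accessors are not part of this diff:

import org.apache.spark.ml.param.{DoubleParam, Params}

trait HasSplitRatio extends Params {
  // Assumed shape of the shared param and its accessors (illustration only).
  protected final val splitRatio = new DoubleParam(this, "splitRatio",
    "Ratio of the dataset used for training, e.g. 0.8 -> 80% training / 20% validation")

  def getSplitRatio(): Double = $(splitRatio)

  def setSplitRatio(value: Double): this.type = {
    require(value >= 0.0 && value <= 1.0, "splitRatio must be in the range [0, 1.0]")
    set(splitRatio, value)
  }
}
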
@@ -42,7 +42,7 @@ class H2OAlgoTest extends FunSuite with SharedH2OTestContext {
       .csv(TestUtils.locate("smalldata/prostate/prostate.csv"))
     // Create GBM model
     val algo = new H2OGLM()
-      .setTrainRatio(0.8)
+      .setSplitRatio(0.8)
       .setSeed(1)
       .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "PSA" , "VOL", "GLEASON")
       .setLabelCol("AGE")
@@ -97,7 +97,7 @@ abstract class PipelinePredictionTestBase extends FunSuite with SparkTestContext

     // Create GBM model
     val gbm = new H2OGBM()
-      .setTrainRatio(0.8)
+      .setSplitRatio(0.8)
       .setSeed(42)
       .setFeaturesCols("tf_idf")
       .setLabelCol("label")
2 changes: 1 addition & 1 deletion py/examples/pipelines/ham_or_spam_multi_algo.py
@@ -61,7 +61,7 @@ def load():

 if algo == "gbm":
     ## Create GBM model
-    algoStage = H2OGBM(ratio=0.8,
+    algoStage = H2OGBM(splitRatio=0.8,
                        seed=1,
                        featuresCols=[idf.getOutputCol()],
                        predictionCol="label")
