Skip to content

Commit

Permalink
[SW-2554] Expose Blending Frame on H2OAutoML
Browse files Browse the repository at this point in the history
Address Sebastian's comments

(cherry picked from commit 7e0eb21a85c6e7d1318738f6a594212e5f9ec16e)
  • Loading branch information
mn-mikke committed Apr 27, 2021
1 parent bbcbbc2 commit 1cc3a28
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 14 deletions.
5 changes: 1 addition & 4 deletions .gitignore
Expand Up @@ -84,9 +84,6 @@ templates/src/terraform/aws/modules/network/terraform.tfstate.backup
jenkins/docker/regular-tests/Dockerfile

# Generated code and documentation
ml/src-gen
scoring/src-gen
py/src-gen
r/src-gen
src-gen/
doc/src/site/sphinx/parameters
doc/src/site/sphinx/configuration/configuration_properties.rst
Expand Up @@ -18,6 +18,7 @@ trait AutoMLConfiguration extends AlgorithmConfigurations {

val leaderboardFrame =
ExplicitField("leadearboard_frame", "HasLeaderboardDataFrame", null, Some("leaderboardDataFrame"), None)
val blendingFrame = ExplicitField("blending_frame", "HasBlendingDataFrame", null, Some("blendingDataFrame"), None)

for ((entityName, h2oSchemaClass: Class[_], h2oParameterClass: Class[_], source) <- autoMLParameters)
yield ParameterSubstitutionContext(
Expand All @@ -26,7 +27,8 @@ trait AutoMLConfiguration extends AlgorithmConfigurations {
h2oSchemaClass,
h2oParameterClass,
AutoMLIgnoredParameters.all,
explicitFields = if (entityName == "H2OAutoMLInputParams") Seq(ignoredCols, leaderboardFrame) else Seq.empty,
explicitFields =
if (entityName == "H2OAutoMLInputParams") Seq(ignoredCols, leaderboardFrame, blendingFrame) else Seq.empty,
deprecatedFields = Seq.empty,
explicitDefaultValues =
Map("include_algos" -> ai.h2o.automl.Algo.values().map(_.name()), "response_column" -> "label"),
Expand Down
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.h2o.sparkling.ml.params

import ai.h2o.sparkling.H2OFrame
import org.apache.spark.expose.Logging
import org.apache.spark.sql.DataFrame

/**
 * Adds the `blendingDataFrame` parameter to an H2O AutoML pipeline stage.
 *
 * The assigned Spark DataFrame is converted to an H2O frame key and sent to the backend as
 * `blending_frame`, which switches the stacked-ensemble training stage into blending mode.
 * The value is intentionally NOT restored on pipeline deserialization: the override below
 * resets it to null (with a warning), since data frames are not persisted with the stage.
 */
trait HasBlendingDataFrame extends H2OAlgoParamsBase with Logging {
  // Unique id of the enclosing pipeline stage; used here only in the warning message.
  val uid: String

  private val blendingDataFrame = new NullableBigDataFrameParam(
    this,
    "blendingDataFrame",
    """This parameter is used for computing the predictions that serve as the training frame for the meta-learner.
      |If provided, this triggers blending mode on the stacked ensemble training stage. Blending mode is faster
      |than cross-validating the base learners (though these ensembles may not perform as well as the Super Learner
      |ensemble).""".stripMargin)

  setDefault(blendingDataFrame -> null)

  /** Returns the configured blending DataFrame, or null when unset. */
  def getBlendingDataFrame(): DataFrame = $(blendingDataFrame)

  /** Sets the blending DataFrame; pass null to disable blending mode. */
  def setBlendingDataFrame(value: DataFrame): this.type = set(blendingDataFrame, value)

  override private[ml] def getParameterDeserializationOverrides(): Map[String, Any => Any] = {
    // Data frames are not serialized with the stage, so any non-null value encountered during
    // deserialization is dropped (with a warning) instead of being restored.
    val resetToNull: Any => Any = { input =>
      if (input != null) {
        logWarning(
          s"A pipeline stage with uid '$uid' contained the 'blendingDataFrame' property " +
            "with a non-null value. The property was reset to null during the pipeline deserialization.")
      }
      null
    }
    // Use the `->` pair form: the two-argument Map.+(elem1, elem2) overload is deprecated
    // since Scala 2.13.
    super.getParameterDeserializationOverrides() + ("blendingDataFrame" -> resetToNull)
  }

  private[sparkling] def getBlendingDataFrameParam(trainingFrame: H2OFrame): Map[String, Any] = {
    // NOTE(review): trainingFrame is unused here; presumably kept so all get*Param methods share
    // the same signature shape — confirm against the sibling parameter traits.
    Map("blending_frame" -> convertDataFrameToH2OFrameKey(getBlendingDataFrame()))
  }

  override private[sparkling] def getSWtoH2OParamNameMap(): Map[String, String] = {
    super.getSWtoH2OParamNameMap() ++ Map("blendingDataFrame" -> "blending_frame")
  }
}
36 changes: 36 additions & 0 deletions py-scoring/src/ai/h2o/sparkling/ml/params/HasBlendingDataFrame.py
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from ai.h2o.sparkling.ml.params.H2OTypeConverters import H2OTypeConverters
from pyspark.ml.param import *


class HasBlendingDataFrame(Params):
    """Python-side mixin declaring the `blendingDataFrame` Param with its getter and
    setter; mirrors the Scala ``HasBlendingDataFrame`` trait added in the same change.
    """

    # Nullable Spark DataFrame; when set, it triggers blending mode on the stacked
    # ensemble training stage (the converter validates/normalizes the value).
    blendingDataFrame = Param(
        Params._dummy(),
        "blendingDataFrame",
        "This parameter is used for computing the predictions that serve as the training frame for the meta-learner. "
        "If provided, this triggers blending mode on the stacked ensemble training stage. Blending mode is faster "
        "than cross-validating the base learners (though these ensembles may not perform as well as the Super Learner "
        "ensemble).",
        H2OTypeConverters.toNullableDataFrame())

    def getBlendingDataFrame(self):
        # Returns the configured blending DataFrame, or None when unset.
        return self.getOrDefault(self.blendingDataFrame)

    def setBlendingDataFrame(self, value):
        # value: a Spark DataFrame or None; returns self for call chaining (Params._set).
        return self._set(blendingDataFrame=value)
50 changes: 41 additions & 9 deletions py/tests/unit/with_runtime_sparkling/test_automl.py
Expand Up @@ -24,7 +24,7 @@
from pyspark.ml import Pipeline, PipelineModel
from pyspark.mllib.linalg import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, substring
from pyspark.sql.functions import col, substring, length

from pysparkling.ml.algos import H2OAutoML
from pysparkling.ml.algos.classification import H2OAutoMLClassifier
Expand Down Expand Up @@ -114,23 +114,24 @@ def testH2OAutoMLClassifierBehavesDiffenrentlyThanH2OAutoMLRegressor(prostateDat
unit_test_utils.assert_data_frames_have_different_values(regressionDataset, classificationDataset)


def prepareLeaderboardForComparison(df):
    # Normalize an AutoML leaderboard for cross-run comparison: model ids carry a
    # run-specific suffix, so only their first 20 characters are kept.
    # NOTE(review): `.drop("")` drops a column named by the empty string, which is a
    # no-op on a Spark DataFrame — this looks like a column name lost in this page's
    # rendering; confirm the intended column against the committed file.
    return df.withColumn("model_id", substring(col("model_id"), 1, 20)).drop("")


# NOTE(review): this span is a flattened diff — it appears to interleave the removed
# lines (the local truncateModelId helper and the truncateModelId(...) assignments)
# with their replacements (the prepareLeaderboardForComparison(...) assignments).
# In the committed file presumably only the prepareLeaderboardForComparison variants
# remain; verify against the repository before editing.
def testLeaderboardDataFrameHasImpactOnAutoMLLeaderboard(classificationDataset):
    # Two default runs must produce identical leaderboards; a run with a custom
    # leaderboard frame must produce a different one.
    [trainingDateset, testingDataset] = classificationDataset.randomSplit([0.9, 0.1], 42)

    # Pre-change helper (superseded by module-level prepareLeaderboardForComparison).
    def truncateModelId(df):
        return df.withColumn("model_id", substring(col("model_id"), 1, 10))

    automl = setParametersForTesting(H2OAutoML())
    automl.fit(trainingDateset)
    defaultLeaderboard1 = truncateModelId(automl.getLeaderboard())
    defaultLeaderboard1 = prepareLeaderboardForComparison(automl.getLeaderboard())

    automl = setParametersForTesting(H2OAutoML())
    automl.fit(trainingDateset)
    defaultLeaderboard2 = truncateModelId(automl.getLeaderboard())
    defaultLeaderboard2 = prepareLeaderboardForComparison(automl.getLeaderboard())

    automl = setParametersForTesting(H2OAutoML()).setLeaderboardDataFrame(testingDataset)
    automl.fit(trainingDateset)
    leaderboardWithCustomDataFrameSet = truncateModelId(automl.getLeaderboard())
    leaderboardWithCustomDataFrameSet = prepareLeaderboardForComparison(automl.getLeaderboard())

    unit_test_utils.assert_data_frames_are_identical(defaultLeaderboard1, defaultLeaderboard2)
    unit_test_utils.assert_data_frames_have_different_values(defaultLeaderboard1, leaderboardWithCustomDataFrameSet)
Expand All @@ -142,7 +143,38 @@ def testDeserializationOfUnfittedPipelineWithAutoML(classificationDataset):
algo = setParametersForTesting(H2OAutoML()).setLeaderboardDataFrame(testingDataset)

pipeline = Pipeline(stages=[algo])
pipeline.write().overwrite().save("file://" + os.path.abspath("build/automl_pipeline"))
loadedPipeline = Pipeline.load("file://" + os.path.abspath("build/automl_pipeline"))
pipeline.write().overwrite().save("file://" + os.path.abspath("build/automl_pipeline_leaderboardDF"))
loadedPipeline = Pipeline.load("file://" + os.path.abspath("build/automl_pipeline_leaderboardDF"))
loadedPipeline.fit(trainingDateset)


def testBlendingDataFrameHasImpactOnAutoMLStackedEnsambleModels(classificationDataset):
    """Setting a blending frame must change only the StackedEnsemble entries of the
    leaderboard while leaving every other model's row identical."""
    trainingDateset, blendingDataset = classificationDataset.randomSplit([0.9, 0.1], 42)

    def splitByEnsemble(leaderboard):
        # Partition leaderboard rows into (stacked-ensemble models, all other models).
        ensembles = leaderboard.filter(leaderboard.model_id.startswith('StackedEnsemble'))
        return (ensembles, leaderboard.subtract(ensembles))

    automl = setParametersForTesting(H2OAutoML())
    automl.fit(trainingDateset)
    defaultEnsembles, defaultOthers = splitByEnsemble(prepareLeaderboardForComparison(automl.getLeaderboard()))

    automl = setParametersForTesting(H2OAutoML()).setBlendingDataFrame(blendingDataset)
    automl.fit(trainingDateset)
    blendingEnsembles, blendingOthers = splitByEnsemble(prepareLeaderboardForComparison(automl.getLeaderboard()))

    # AutoML is expected to build exactly two stacked ensembles (BestOfFamily and AllModels).
    assert defaultEnsembles.count() == 2
    unit_test_utils.assert_data_frames_have_different_values(defaultEnsembles, blendingEnsembles)
    unit_test_utils.assert_data_frames_are_identical(defaultOthers, blendingOthers)


def testDeserializationOfUnfittedPipelineWithAutoMLAndBlendingDataFrame(classificationDataset):
    """Round-trips an unfitted pipeline whose AutoML stage has a blending frame set,
    then fits the reloaded pipeline to prove it is still usable after deserialization.

    Renamed: a test named testDeserializationOfUnfittedPipelineWithAutoML is already
    defined earlier in this module; a second definition with the same name would shadow
    the first so pytest would collect only one of the two tests.
    """
    [trainingDateset, blendingDataset] = classificationDataset.randomSplit([0.9, 0.1], 42)

    algo = setParametersForTesting(H2OAutoML()).setBlendingDataFrame(blendingDataset)

    pipeline = Pipeline(stages=[algo])
    # Distinct save path so this test cannot race the leaderboard-frame variant.
    pipeline.write().overwrite().save("file://" + os.path.abspath("build/automl_pipeline_blendingDF"))
    loadedPipeline = Pipeline.load("file://" + os.path.abspath("build/automl_pipeline_blendingDF"))
    loadedPipeline.fit(trainingDateset)

0 comments on commit 1cc3a28

Please sign in to comment.