Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-2549] Fix Memory Leak of Frames in H2OAutoML #2499

Merged
merged 2 commits into from Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,12 +53,6 @@ trait H2OAlgoCommonUtils extends EstimatorCommonUtils with H2OFrameLifecycle {
private[sparkling] def getValidationDataFrame(): DataFrame

private[sparkling] def prepareDatasetForFitting(dataset: Dataset[_]): (H2OFrame, Option[H2OFrame]) = {
prepareDatasetForFitting(dataset, registerFramesForDeletion = true)
}

private[sparkling] def prepareDatasetForFitting(
dataset: Dataset[_],
registerFramesForDeletion: Boolean): (H2OFrame, Option[H2OFrame]) = {
val excludedCols = getExcludedCols()

if (getFeaturesCols().isEmpty) {
Expand Down Expand Up @@ -107,10 +101,10 @@ trait H2OAlgoCommonUtils extends EstimatorCommonUtils with H2OFrameLifecycle {
} else {
(trainFrame, None)
}
if (registerFramesForDeletion) {
registerH2OFrameForDeletion(resultTrainFrame)
registerH2OFrameForDeletion(resultTestFrame)
}

registerH2OFrameForDeletion(resultTrainFrame)
registerH2OFrameForDeletion(resultTestFrame)

(resultTrainFrame, resultTestFrame)
}

Expand Down
10 changes: 7 additions & 3 deletions ml/src/main/scala/ai/h2o/sparkling/ml/algos/H2OAutoML.scala
Expand Up @@ -80,7 +80,7 @@ class H2OAutoML(override val uid: String)

override def fit(dataset: Dataset[_]): H2OMOJOModel = {
amlKeyOption = None
val (train, valid) = prepareDatasetForFitting(dataset, registerFramesForDeletion = false)
val (train, valid) = prepareDatasetForFitting(dataset)
val inputSpec = getInputSpec(train, valid)
val buildModels = getBuildModels()
val buildControl = getBuildControl()
Expand Down Expand Up @@ -139,11 +139,15 @@ class H2OAutoML(override val uid: String)
val colsData = table.getAsJsonArray("data").iterator().asScala.toArray.map(_.getAsJsonArray)
val numRows = table.get("rowcount").getAsInt
val rows = (0 until numRows).map { idx =>
Row(colsData.map(_.get(idx).getAsString): _*)
val rowData = colsData.map { colData =>
val element = colData.get(idx)
if (element.isJsonNull) null else element.getAsString
}
Row(rowData: _*)
}
val spark = SparkSessionUtils.active
val rdd = spark.sparkContext.parallelize(rows)
val schema = StructType(colNames.map(name => StructField(name, StringType)))
val schema = StructType(colNames.map(name => StructField(name, StringType, nullable = true)))
spark.createDataFrame(rdd, schema)
}

Expand Down
Expand Up @@ -34,10 +34,4 @@ private[sparkling] trait H2OClassifier extends H2OAlgoCommonUtils {
override private[sparkling] def prepareDatasetForFitting(dataset: Dataset[_]): (H2OFrame, Option[H2OFrame]) = {
super.prepareDatasetForFitting(prepareDatasetForClassification(dataset))
}

override private[sparkling] def prepareDatasetForFitting(
dataset: Dataset[_],
registerFramesForDeletion: Boolean): (H2OFrame, Option[H2OFrame]) = {
super.prepareDatasetForFitting(prepareDatasetForClassification(dataset), registerFramesForDeletion)
}
}
Expand Up @@ -34,10 +34,4 @@ private[sparkling] trait H2ORegressor extends H2OAlgoCommonUtils {
override private[sparkling] def prepareDatasetForFitting(dataset: Dataset[_]): (H2OFrame, Option[H2OFrame]) = {
super.prepareDatasetForFitting(prepareDatasetForRegression(dataset))
}

override private[sparkling] def prepareDatasetForFitting(
dataset: Dataset[_],
registerFramesForDeletion: Boolean): (H2OFrame, Option[H2OFrame]) = {
super.prepareDatasetForFitting(prepareDatasetForRegression(dataset), registerFramesForDeletion)
}
}
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FunSuite, Matchers}
import org.apache.spark.sql.functions.col

@RunWith(classOf[JUnitRunner])
class H2OAutoMLTestSuite extends FunSuite with Matchers with SharedH2OTestContext {
Expand Down Expand Up @@ -59,8 +60,15 @@ class H2OAutoMLTestSuite extends FunSuite with Matchers with SharedH2OTestContex
algo.fit(dataset)

val extraColumns = Seq("training_time_ms", "predict_time_per_row_ms")

algo.getLeaderboard(extraColumns: _*).columns shouldEqual algo.getLeaderboard().columns ++ extraColumns
val nullColumns = Seq("predict_time_per_row_ms")
val leaderboardWithExtraColumns = algo.getLeaderboard(extraColumns: _*)
val nonNullColumns = leaderboardWithExtraColumns.columns.diff(nullColumns)
val nullValues = leaderboardWithExtraColumns.select(nullColumns.map(col): _*).first().toSeq
val nonNullValues = leaderboardWithExtraColumns.select(nonNullColumns.map(col): _*).first().toSeq

nullValues shouldEqual Seq(null) // TODO: This needs to be fixed in H2O-3 AutoML backend
nonNullValues shouldNot contain(null)
leaderboardWithExtraColumns.columns shouldEqual algo.getLeaderboard().columns ++ extraColumns
}

test("ALL as getLeaderboard adds extra columns to the leaderboard") {
Expand Down