diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 9f60f0896ec52..477ef2e85c2c2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -33,6 +33,7 @@ import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => O import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel} import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset +import org.apache.spark.storage.StorageLevel /** @@ -110,13 +111,19 @@ class DecisionTreeClassifier @Since("1.4.0") ( val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses) val strategy = getOldStrategy(categoricalFeatures, numClasses) - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(params: _*) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all", seed = $(seed), instr = Some(instr), parentUID = Some(uid)) val m = trees.head.asInstanceOf[DecisionTreeClassificationModel] + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala index ade0960f87a0d..25a4e74377028 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala @@ -32,11 +32,12 @@ import org.apache.spark.ml.tree.impl.GradientBoostedTrees import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} -import org.apache.spark.mllib.tree.loss.LogLoss import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel + /** * Gradient-Boosted Trees (GBTs) (http://en.wikipedia.org/wiki/Gradient_boosting) @@ -165,16 +166,22 @@ class GBTClassifier @Since("1.4.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval) instr.logNumFeatures(numFeatures) instr.logNumClasses(numClasses) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy, $(seed)) val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures) + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 8d556deef2be8..962b5340ecc1d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -38,6 +38,8 @@ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.storage.StorageLevel + /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam @@ -170,7 +172,10 @@ class LinearSVC @Since("2.2.0") ( Instance(label, weight, features) } - val instr = Instrumentation.create(this, instances) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + + val instr = Instrumentation.create(this, dataset) instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) @@ -264,6 +269,9 @@ class LinearSVC @Since("2.2.0") ( } val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) + + if (handlePersistence) instances.unpersist() + instr.logSuccess(model) model } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala index ab4c235209289..e6cf72cf1b2a4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala @@ -33,6 +33,7 @@ import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestMo import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel /** @@ -130,17 +131,23 @@ class RandomForestClassifier @Since("1.4.0") ( val strategy = super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity) - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val trees = RandomForest .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr)) .map(_.asInstanceOf[DecisionTreeClassificationModel]) val numFeatures = oldDataset.first().features.size val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses) + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 5259ee419445f..02c24f25edd52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.storage.StorageLevel /** @@ -342,7 +343,10 @@ class GaussianMixture @Since("2.0.0") ( val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map { case Row(features: Vector) => features - }.cache() + } + + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) // Extract the number of features. val numFeatures = instances.first().size @@ -350,7 +354,7 @@ class GaussianMixture @Since("2.0.0") ( s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" + s" matrix is quadratic in the number of features.") - val instr = Instrumentation.create(this, instances) + val instr = Instrumentation.create(this, dataset) instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol) instr.logNumFeatures(numFeatures) @@ -420,6 +424,9 @@ class GaussianMixture @Since("2.0.0") ( } val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this) + + if (handlePersistence) instances.unpersist() + val summary = new GaussianMixtureSummary(model.transform(dataset), $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood) model.setSummary(Some(summary)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 3da29b1c816b1..eda301034b7e0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -44,6 +44,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.PeriodicCheckpointer import org.apache.spark.util.VersionUtils @@ -909,6 +910,10 @@ class LDA @Since("1.6.0") ( .setOptimizer(getOldOptimizer) // TODO: persist here, or in old LDA? val oldData = LDA.getOldDataset(dataset, $(featuresCol)) + + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldData.persist(StorageLevel.MEMORY_AND_DISK) + val oldModel = oldLDA.run(oldData) val newModel = oldModel match { case m: OldLocalLDAModel => @@ -917,6 +922,8 @@ class LDA @Since("1.6.0") ( new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None) } + if (handlePersistence) oldData.unpersist() + instr.logNumFeatures(newModel.vocabSize) val model = copyValues(newModel).setParent(this) instr.logSuccess(model) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala index 01c5cc1c7efa9..949f5db6fff97 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala @@ -35,6 +35,7 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeMo import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel /** @@ -105,13 +106,19 @@ class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val strategy = getOldStrategy(categoricalFeatures) - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(params: _*) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all", seed = $(seed), instr = Some(instr), parentUID = Some(uid)) val m = trees.head.asInstanceOf[DecisionTreeRegressionModel] + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala index 08d175cb94442..9a727fb4f38e7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala @@ -36,6 +36,8 @@ import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTMod import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel + /** * Gradient-Boosted Trees (GBTs) @@ -147,15 +149,21 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String) val numFeatures = oldDataset.first().features.size val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression) - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval) instr.logNumFeatures(numFeatures) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy, $(seed)) val m = new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures) + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 917a4d238d467..1f0bbd5a3e626 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} +import org.apache.spark.storage.StorageLevel /** * Params for Generalized Linear Regression. @@ -421,10 +422,17 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam)) val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol)) + + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + val irlsModel = optimizer.fit(instances) val model = copyValues( new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept) .setParent(this)) + + if (handlePersistence) instances.unpersist() + val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model, irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver) model.setSummary(Some(trainingSummary)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala index a58da50fad972..7f597f26fa22d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala @@ -34,6 +34,7 @@ import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestMo import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel /** @@ -121,17 +122,23 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S val strategy = super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity) - val instr = Instrumentation.create(this, oldDataset) + val instr = Instrumentation.create(this, dataset) instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval) + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + val trees = RandomForest .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr)) .map(_.asInstanceOf[DecisionTreeRegressionModel]) val numFeatures = oldDataset.first().features.size val m = new RandomForestRegressionModel(uid, trees, numFeatures) + + if (handlePersistence) oldDataset.unpersist() + instr.logSuccess(m) m } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index fa7471fa2d658..1141e119bb0af 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -34,7 +34,9 @@ class BisectingKMeansSuite override def beforeAll(): Unit = { super.beforeAll() dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k) + dataset.persist() sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, 42) + sparseDataset.persist() } test("default parameters") {