Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => O
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel


/**
Expand Down Expand Up @@ -110,13 +111,19 @@ class DecisionTreeClassifier @Since("1.4.0") (
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses)
val strategy = getOldStrategy(categoricalFeatures, numClasses)

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(params: _*)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
seed = $(seed), instr = Some(instr), parentUID = Some(uid))

val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.loss.LogLoss
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


/**
* Gradient-Boosted Trees (GBTs) (http://en.wikipedia.org/wiki/Gradient_boosting)
Expand Down Expand Up @@ -165,16 +166,22 @@ class GBTClassifier @Since("1.4.0") (
s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
}

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
instr.logNumFeatures(numFeatures)
instr.logNumClasses(numClasses)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
$(seed))
val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.storage.StorageLevel


/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
Expand Down Expand Up @@ -170,7 +172,10 @@ class LinearSVC @Since("2.2.0") (
Instance(label, weight, features)
}

val instr = Instrumentation.create(this, instances)
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val instr = Instrumentation.create(this, dataset)
instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold,
aggregationDepth)

Expand Down Expand Up @@ -264,6 +269,9 @@ class LinearSVC @Since("2.2.0") (
}

val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))

if (handlePersistence) instances.unpersist()

instr.logSuccess(model)
model
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestMo
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


/**
Expand Down Expand Up @@ -130,17 +131,23 @@ class RandomForestClassifier @Since("1.4.0") (
val strategy =
super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val trees = RandomForest
.run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
.map(_.asInstanceOf[DecisionTreeClassificationModel])

val numFeatures = oldDataset.first().features.size
val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.storage.StorageLevel


/**
Expand Down Expand Up @@ -342,15 +343,18 @@ class GaussianMixture @Since("2.0.0") (

val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(features: Vector) => features
}.cache()
}

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

// Extract the number of features.
val numFeatures = instances.first().size
require(numFeatures < GaussianMixture.MAX_NUM_FEATURES, s"GaussianMixture cannot handle more " +
s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
s" matrix is quadratic in the number of features.")

val instr = Instrumentation.create(this, instances)
val instr = Instrumentation.create(this, dataset)
instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
instr.logNumFeatures(numFeatures)

Expand Down Expand Up @@ -420,6 +424,9 @@ class GaussianMixture @Since("2.0.0") (
}

val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)

if (handlePersistence) instances.unpersist()

val summary = new GaussianMixtureSummary(model.transform(dataset),
$(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood)
model.setSummary(Some(summary))
Expand Down
7 changes: 7 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.PeriodicCheckpointer
import org.apache.spark.util.VersionUtils

Expand Down Expand Up @@ -909,6 +910,10 @@ class LDA @Since("1.6.0") (
.setOptimizer(getOldOptimizer)
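// Persisted here (see handlePersistence below) when the input dataset is not already cached.
// TODO: confirm that old LDA does not persist the data a second time.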
val oldData = LDA.getOldDataset(dataset, $(featuresCol))

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldData.persist(StorageLevel.MEMORY_AND_DISK)

val oldModel = oldLDA.run(oldData)
val newModel = oldModel match {
case m: OldLocalLDAModel =>
Expand All @@ -917,6 +922,8 @@ class LDA @Since("1.6.0") (
new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None)
}

if (handlePersistence) oldData.unpersist()

instr.logNumFeatures(newModel.vocabSize)
val model = copyValues(newModel).setParent(this)
instr.logSuccess(model)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeMo
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


/**
Expand Down Expand Up @@ -105,13 +106,19 @@ class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
val strategy = getOldStrategy(categoricalFeatures)

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(params: _*)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
seed = $(seed), instr = Some(instr), parentUID = Some(uid))

val m = trees.head.asInstanceOf[DecisionTreeRegressionModel]

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTMod
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


/**
* <a href="http://en.wikipedia.org/wiki/Gradient_boosting">Gradient-Boosted Trees (GBTs)</a>
Expand Down Expand Up @@ -147,15 +149,21 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
val numFeatures = oldDataset.first().features.size
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
instr.logNumFeatures(numFeatures)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
$(seed))
val m = new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures)

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
import org.apache.spark.storage.StorageLevel

/**
* Params for Generalized Linear Regression.
Expand Down Expand Up @@ -421,10 +422,17 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
val optimizer = new IterativelyReweightedLeastSquares(initialModel,
familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val irlsModel = optimizer.fit(instances)
val model = copyValues(
new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
.setParent(this))

if (handlePersistence) instances.unpersist()

val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
model.setSummary(Some(trainingSummary))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestMo
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


/**
Expand Down Expand Up @@ -121,17 +122,23 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
val strategy =
super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)

val instr = Instrumentation.create(this, oldDataset)
val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) oldDataset.persist(StorageLevel.MEMORY_AND_DISK)

val trees = RandomForest
.run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
.map(_.asInstanceOf[DecisionTreeRegressionModel])

val numFeatures = oldDataset.first().features.size
val m = new RandomForestRegressionModel(uid, trees, numFeatures)

if (handlePersistence) oldDataset.unpersist()

instr.logSuccess(m)
m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class BisectingKMeansSuite
/**
 * Builds the fixtures shared by every test in this suite.
 *
 * Generates the dense k-means dataset and the sparse dataset, and marks both
 * as persisted before any test body runs.
 */
override def beforeAll(): Unit = {
  super.beforeAll()
  // `persist()` hands back the dataset it was invoked on, so generation and
  // persistence can be expressed as a single assignment per fixture.
  dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k).persist()
  sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, 42).persist()
}

test("default parameters") {
Expand Down