Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18946][ML] sliceAggregate which is a new aggregate operator for high-dimensional data #17000

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.ml.linalg._
import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.rdd.RDDFunctions._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.VectorImplicits._
Expand All @@ -49,7 +50,7 @@ import org.apache.spark.util.VersionUtils
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
with HasStandardization with HasWeightCol with HasThreshold {

import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

Expand Down Expand Up @@ -298,18 +299,6 @@ class LogisticRegression @Since("1.2.0") (
@Since("1.5.0")
override def getThresholds: Array[Double] = super.getThresholds

/**
* Suggested depth for treeAggregate (greater than or equal to 2).
* If the dimensions of features or the number of partitions are large,
* this param could be adjusted to a larger size.
* Default is 2.
*
* @group expertSetParam
*/
@Since("2.1.0")
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

private var optInitialModel: Option[LogisticRegressionModel] = None

private[spark] def setInitialModel(model: LogisticRegressionModel): this.type = {
Expand Down Expand Up @@ -338,18 +327,20 @@ class LogisticRegression @Since("1.2.0") (
instr.logParams(regParam, elasticNetParam, standardization, threshold,
maxIter, tol, fitIntercept)

val (summarizer, labelSummarizer) = {
val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
(c1._1.merge(c2._1), c1._2.merge(c2._2))
val summarizer = {
val seqOp = (c: MultivariateOnlineSummarizer, instance: Instance) =>
c.add(instance.features, instance.weight)
val combOp = (c1: MultivariateOnlineSummarizer, c2: MultivariateOnlineSummarizer) =>
c1.merge(c2)
val numSlice = math.ceil(instances.first().features.size / 10000).toInt
instances.sliceAggregate(new MultivariateOnlineSummarizer)(seqOp, combOp, numSlice)
}

instances.treeAggregate(
new MultivariateOnlineSummarizer, new MultiClassSummarizer
)(seqOp, combOp, $(aggregationDepth))
val labelSummarizer = {
val seqOp = (summary: MultiClassSummarizer, instance: Instance) =>
summary.add(instance.label, instance.weight)
val combOp = (c1: MultiClassSummarizer, c2: MultiClassSummarizer) => c1.merge(c2)
instances.treeAggregate(new MultiClassSummarizer)(seqOp, combOp)
}

val histogram = labelSummarizer.histogram
Expand Down Expand Up @@ -434,8 +425,7 @@ class LogisticRegression @Since("1.2.0") (

val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial,
$(aggregationDepth))
$(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial)

val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
Expand Down Expand Up @@ -1437,6 +1427,10 @@ private class LogisticAggregator(
s"got type ${bcCoefficients.value.getClass}.)")
}
private lazy val gradientSumArray = new Array[Double](coefficientSize)
def setGradientSumArray(grad: Array[Double]): this.type = {
gradientSumArray = grad.clone()
this
}

if (multinomial && numClasses <= 2) {
logInfo(s"Multinomial logistic regression for binary classification yields separate " +
Expand Down Expand Up @@ -1639,6 +1633,50 @@ private class LogisticAggregator(
scal(1.0 / weightSum, result)
new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray)
}

override def clone(): LogisticAggregator = {
val agg = new LogisticAggregator(bcCoefficients, bcFeaturesStd, numClasses,
fitIntercept, multinomial)
agg.lossSum = this.lossSum
agg.weightSum = this.weightSum
agg.gradientSumArray = this.gradientSumArray.clone()
agg
}
}

private object LogisticAggregator {

implicit object LogisticAggregatorSlicing extends ElementwiseSlicing[LogisticAggregator] {
override def slice(x: LogisticAggregator, numSlices: Int): Iterator[LogisticAggregator] = {
val sliceLength = math.ceil(x.gradientSumArray.length.toDouble / numSlices).toInt

x.gradientSumArray.sliding(sliceLength, sliceLength)
.map { slice =>
val localAggregator = x.clone()
localAggregator.setGradientSumArray(slice)
}
}

override def compose(iter: Iterator[LogisticAggregator]): LogisticAggregator = {
require(iter.hasNext, "compose empty GradientAndLoss!")
val (iter1, iter2) = iter.duplicate
val gradientLength = iter1.map(_.gradientSumArray.length).sum

val comGradient = new Array[Double](gradientLength)

var accumNum = 0
var logisticAggregator: LogisticAggregator = null
iter2.foreach { aggregator =>
if (accumNum == 0) logisticAggregator = aggregator.clone()
val gradSlice = aggregator.gradientSumArray
val sliceLength = gradSlice.length
gradSlice.copyToArray(comGradient, accumNum, sliceLength)
accumNum += sliceLength
}

logisticAggregator.setGradientSumArray(comGradient)
}
}
}

/**
Expand All @@ -1654,8 +1692,7 @@ private class LogisticCostFun(
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double,
multinomial: Boolean,
aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
multinomial: Boolean) extends DiffFunction[BDV[Double]] {

override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
Expand All @@ -1668,11 +1705,12 @@ private class LogisticCostFun(
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
val numSlice = math.ceil((numCoefficientSets * numFeaturesPlusIntercept) / 100000).toInt

instances.treeAggregate(
instances.sliceAggregate(
new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
multinomial)
)(seqOp, combOp, aggregationDepth)
)(seqOp, combOp, numSlice)
}

val totalGradientMatrix = logisticAggregator.gradient
Expand Down