[SPARK-35024][ML] Refactor LinearSVC - support virtual centering
### What changes were proposed in this pull request?
1. Remove the existing aggregator and replace it with a new aggregator that supports virtual centering.
2. Add related test suites.

### Why are the changes needed?
Centering the feature vectors should accelerate convergence and produce solutions closer to those from R (see the sketch below).
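
For intuition: the model can be trained as if the features were mean-centered without ever materializing the (dense) centered data, because w · ((x - mean) / std) + b = w · (x / std) + (b - w · (mean / std)). A minimal standalone sketch of that identity (all names and values here are illustrative, not taken from the patch):

```scala
object VirtualCenteringSketch {
  def main(args: Array[String]): Unit = {
    val x = Array(4.0, 0.0, 10.0)    // one instance; zeros stay zeros
    val mean = Array(3.0, 1.0, 8.0)  // per-feature mean
    val std = Array(2.0, 1.0, 4.0)   // per-feature std
    val w = Array(0.5, -1.0, 0.25)   // coefficients in the scaled space
    val intercept = 0.1

    // Explicit centering densifies the data: w . ((x - mean) / std) + b
    val explicit = w.indices.map(j => w(j) * (x(j) - mean(j)) / std(j)).sum + intercept

    // Virtual centering: scale by 1/std only, and fold the mean into a
    // constant offset = b - w . (mean / std)
    val offset = intercept - w.indices.map(j => w(j) * mean(j) / std(j)).sum
    val virtual = w.indices.map(j => w(j) * x(j) / std(j)).sum + offset

    assert(math.abs(explicit - virtual) < 1e-12) // identical margins
  }
}
```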

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated existing test suites and added new ones.

Closes #32124 from zhengruifeng/svc_agg_refactor.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
zhengruifeng committed Apr 25, 2021
1 parent bcac733 commit 1f150b9
Showing 12 changed files with 488 additions and 459 deletions.
6 changes: 3 additions & 3 deletions R/pkg/tests/fulltests/test_mllib_classification.R
@@ -38,14 +38,14 @@ test_that("spark.svmLinear", {
   expect_true(class(summary$coefficients[, 1]) == "numeric")
 
   coefs <- summary$coefficients[, "Estimate"]
-  expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085)
+  expected_coefs <- c(-6.8823988, -0.6154984, -1.5135447, 1.9694126, 3.3736856)
   expect_true(all(abs(coefs - expected_coefs) < 0.1))
 
   # Test prediction with string label
   prediction <- predict(model, training)
   expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
-  expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica",
-                "virginica", "virginica", "virginica", "virginica", "virginica")
+  expected <- c("versicolor", "versicolor", "versicolor", "versicolor", "versicolor",
+                "versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
   expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
 
   # Test model save and load
@@ -222,6 +222,7 @@ class LinearSVC @Since("2.2.0") (
     }
 
     val featuresStd = summarizer.std.toArray
+    val featuresMean = summarizer.mean.toArray
     val getFeaturesStd = (j: Int) => featuresStd(j)
     val regularization = if ($(regParam) != 0.0) {
       val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
@@ -239,7 +240,8 @@ class LinearSVC @Since("2.2.0") (
        as a result, no scaling is needed.
      */
     val (rawCoefficients, objectiveHistory) =
-      trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)
+      trainImpl(instances, actualBlockSizeInMB, featuresStd, featuresMean,
+        regularization, optimizer)
 
     if (rawCoefficients == null) {
       val msg = s"${optimizer.getClass.getName} failed."
@@ -277,16 +279,19 @@
       instances: RDD[Instance],
       actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
+      featuresMean: Array[Double],
       regularization: Option[L2Regularization],
       optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
     val numFeatures = featuresStd.length
     val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
 
-    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0)
+    val scaledMean = Array.tabulate(numFeatures)(i => inverseStd(i) * featuresMean(i))
+    val bcInverseStd = instances.context.broadcast(inverseStd)
+    val bcScaledMean = instances.context.broadcast(scaledMean)
 
     val standardized = instances.mapPartitions { iter =>
-      val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
-      val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
+      val func = StandardScalerModel.getTransformFunc(Array.empty, bcInverseStd.value, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }

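The hunk above precomputes inverseStd (0 for constant features, avoiding division by zero) and scaledMean = mean / std, broadcasts both, and applies only the 1 / std scaling to the data; subtracting the mean is left to the aggregator. A small illustration of that convention, under assumed values:

```scala
// Constant features (std == 0) get inverseStd 0, so they are zeroed out
// instead of producing Infinity/NaN.
val featuresStd = Array(2.0, 0.0, 4.0)
val featuresMean = Array(3.0, 5.0, 8.0)

val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0)
val scaledMean = Array.tabulate(inverseStd.length)(i => inverseStd(i) * featuresMean(i))

// inverseStd == [0.5, 0.0, 0.25]; scaledMean == [1.5, 0.0, 2.0]
// Only x * inverseStd is ever materialized; the mean is handled virtually.
```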
@@ -295,23 +300,44 @@
       .persist(StorageLevel.MEMORY_AND_DISK)
       .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
 
-    val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
+    val getAggregatorFunc = new HingeBlockAggregator(bcInverseStd, bcScaledMean,
+      $(fitIntercept))(_)
     val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
       regularization, $(aggregationDepth))
 
-    val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+    val initialSolution = Array.ofDim[Double](numFeaturesPlusIntercept)
+    if ($(fitIntercept)) {
+      // original `initialSolution` is for problem:
+      // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
+      // we should adjust it to the initial solution for problem:
+      // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
+      // NOTE: this is a no-op until model initialization is supported
+      val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1)
+      initialSolution(numFeatures) += adapt
+    }
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      new BDV[Double](initialSolution))
     val arrayBuilder = mutable.ArrayBuilder.make[Double]
     var state: optimizer.State = null
     while (states.hasNext) {
       state = states.next()
       arrayBuilder += state.adjustedValue
     }
     blocks.unpersist()
-    bcFeaturesStd.destroy()
-
-    (if (state != null) state.x.toArray else null, arrayBuilder.result)
+    bcInverseStd.destroy()
+    bcScaledMean.destroy()
+
+    val solution = if (state == null) null else state.x.toArray
+    if ($(fitIntercept) && solution != null) {
+      // the final solution is for problem:
+      // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
+      // we should adjust it back for original problem:
+      // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
+      val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1)
+      solution(numFeatures) -= adapt
+    }
+    (solution, arrayBuilder.result)
   }
 }

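The two intercept adjustments in the hunk above are inverses of each other: entering the virtually centered problem adds w · scaledMean to the intercept, and leaving it subtracts the same dot product, so the round trip is lossless as long as the coefficients are unchanged. A toy check with assumed numbers:

```scala
val numFeatures = 2
val scaledMean = Array(1.5, 2.0)
val solution = Array(0.4, -0.2, 0.7) // w1, w2, intercept

// entering the centered problem: intercept += w . scaledMean
val adaptIn = (0 until numFeatures).map(j => solution(j) * scaledMean(j)).sum
solution(numFeatures) += adaptIn

// leaving the centered problem: intercept -= w . scaledMean
val adaptOut = (0 until numFeatures).map(j => solution(j) * scaledMean(j)).sum
solution(numFeatures) -= adaptOut

assert(math.abs(solution(numFeatures) - 0.7) < 1e-12) // intercept restored
```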
@@ -982,14 +982,14 @@ class LogisticRegression @Since("1.2.0") (
         val adapt = Array.ofDim[Double](numClasses)
         BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0,
           initialSolution, numClasses, scaledMean, 1, 0.0, adapt, 1)
-        BLAS.getBLAS(numFeatures).daxpy(numClasses, 1.0, adapt, 0, 1,
+        BLAS.javaBLAS.daxpy(numClasses, 1.0, adapt, 0, 1,
           initialSolution, numClasses * numFeatures, 1)
       } else {
-        // orginal `initialCoefWithInterceptArray` is for problem:
+        // original `initialSolution` is for problem:
         // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
         // we should adjust it to the initial solution for problem:
         // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
-        val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, initialSolution, 1, scaledMean, 1)
+        val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1)
         initialSolution(numFeatures) += adapt
       }
     }
@@ -1018,14 +1018,14 @@
         val adapt = Array.ofDim[Double](numClasses)
         BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0,
           solution, numClasses, scaledMean, 1, 0.0, adapt, 1)
-        BLAS.getBLAS(numFeatures).daxpy(numClasses, -1.0, adapt, 0, 1,
+        BLAS.javaBLAS.daxpy(numClasses, -1.0, adapt, 0, 1,
           solution, numClasses * numFeatures, 1)
       } else {
         // the final solution is for problem:
         // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
         // we should adjust it back for original problem:
         // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
-        val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, solution, 1, scaledMean, 1)
+        val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1)
         solution(numFeatures) -= adapt
       }
     }
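The multinomial branch of LogisticRegression above applies the same idea per class: as the dgemv arguments imply, the coefficient block is a column-major numClasses × numFeatures matrix, adapt(c) = Σ_j W(c, j) · scaledMean(j), and daxpy folds adapt into the intercepts stored after the coefficient block. Equivalent plain loops (a sketch, not Spark's code):

```scala
// What the dgemv + daxpy pair does, written out explicitly.
// Layout: solution holds W column-major (entry (c, j) at j * numClasses + c),
// followed by the numClasses intercepts at offset numClasses * numFeatures.
def adjustIntercepts(
    solution: Array[Double],
    scaledMean: Array[Double],
    numClasses: Int,
    numFeatures: Int,
    sign: Double): Unit = {
  var c = 0
  while (c < numClasses) {
    var dot = 0.0
    var j = 0
    while (j < numFeatures) {
      dot += solution(j * numClasses + c) * scaledMean(j)
      j += 1
    }
    solution(numClasses * numFeatures + c) += sign * dot
    c += 1
  }
}
// sign = +1.0 going into the optimizer, -1.0 to map the result back.
```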
@@ -72,7 +72,7 @@ private[ml] class BinaryLogisticBlockAggregator(
   // deal with non-zero values in prediction.
   private val marginOffset = if (fitWithMean) {
     coefficientsArray.last -
-      BLAS.getBLAS(numFeatures).ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1)
+      BLAS.javaBLAS.ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1)
   } else {
     Double.NaN
   }
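Here marginOffset = intercept - w · scaledMean is computed once per aggregator, so adding this constant to every margin is equivalent to centering each row, while the input blocks stay sparse. A self-contained check under assumed values:

```scala
val coefficients = Array(0.5, -1.0)
val scaledMean = Array(1.5, 2.0) // mean / std
val intercept = 0.3
val marginOffset = intercept -
  coefficients.indices.map(j => coefficients(j) * scaledMean(j)).sum

val scaledFeatures = Array(2.0, 0.0) // x / std, sparsity preserved
val margin = coefficients.indices
  .map(j => coefficients(j) * scaledFeatures(j)).sum + marginOffset

// centered reference: w . (x/std - mean/std) + intercept
val centered = coefficients.indices
  .map(j => coefficients(j) * (scaledFeatures(j) - scaledMean(j))).sum + intercept
assert(math.abs(margin - centered) < 1e-12)
```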
@@ -142,7 +142,7 @@
       case sm: SparseMatrix if fitIntercept =>
         val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
         BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
+        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
           gradientSumArray, 1)
 
       case sm: SparseMatrix if !fitIntercept =>
@@ -156,7 +156,7 @@
     if (fitWithMean) {
       // above update of the linear part of gradientSumArray does NOT take the centering
       // into account, here we need to adjust this part.
-      BLAS.getBLAS(numFeatures).daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1,
+      BLAS.javaBLAS.daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1,
        gradientSumArray, 1)
     }
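Because the linear part of the gradient is accumulated from uncentered blocks, the centering contribution factors out of the sum: Σ_i m_i · (x_ij - mean_j) / std_j = Σ_i m_i · x_ij / std_j - (Σ_i m_i) · scaledMean_j. The daxpy above subtracts that second term once per block; in loop form (a sketch):

```scala
// Subtract the centering term that the uncentered accumulation missed:
// gradientSumArray(j) -= multiplierSum * scaledMean(j) for the linear part.
def applyCenteringCorrection(
    gradientSumArray: Array[Double],
    scaledMean: Array[Double],
    multiplierSum: Double): Unit = {
  var j = 0
  while (j < scaledMean.length) {
    gradientSumArray(j) -= multiplierSum * scaledMean(j)
    j += 1
  }
}
```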


This file was deleted.
