From 1f150b9392706293946278dd35e8f5a5016ed6df Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Sun, 25 Apr 2021 13:16:46 +0800 Subject: [PATCH] [SPARK-35024][ML] Refactor LinearSVC - support virtual centering ### What changes were proposed in this pull request? 1. Remove the existing aggregators (HingeAggregator and BlockHingeAggregator) and use a new aggregator that supports virtual centering. 2. Add related test suites. ### Why are the changes needed? Centering the feature vectors should accelerate convergence and produce solutions closer to those of R. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated existing test suites and added new ones. Closes #32124 from zhengruifeng/svc_agg_refactor. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../fulltests/test_mllib_classification.R | 6 +- .../spark/ml/classification/LinearSVC.scala | 46 +++- .../classification/LogisticRegression.scala | 10 +- .../BinaryLogisticBlockAggregator.scala | 6 +- .../ml/optim/aggregator/HingeAggregator.scala | 212 -------------- .../aggregator/HingeBlockAggregator.scala | 162 +++++++++++ .../MultinomialLogisticBlockAggregator.scala | 2 +- .../ml/classification/LinearSVCSuite.scala | 36 +-- .../aggregator/HingeAggregatorSuite.scala | 189 ------------- .../HingeBlockAggregatorSuite.scala | 258 ++++++++++++++++++ python/pyspark/ml/classification.py | 8 +- .../pyspark/ml/tests/test_training_summary.py | 12 +- 12 files changed, 488 insertions(+), 459 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregatorSuite.scala
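Editorial note (illustrative sketch, not part of the patch): "virtual centering" means the optimizer works on features that are scaled by 1/std and implicitly centered by the scaled mean, while the intercept is the only parameter that has to be translated between the centered and the uncentered parameterization. The trainImpl changes below do exactly that translation (initialSolution(numFeatures) += adapt before optimization, solution(numFeatures) -= adapt afterwards). The following minimal stand-alone Scala sketch shows the underlying identity; the object name and all numbers are made up for illustration.

object VirtualCenteringSketch {
  def main(args: Array[String]): Unit = {
    val x = Array(3.0, 5.0)         // one instance in the original feature space
    val mean = Array(2.0, 4.0)      // feature means
    val std = Array(1.0, 2.0)       // feature standard deviations
    val coef = Array(0.5, -1.5)     // coefficients learned on scaled features
    val interceptCentered = 0.7     // intercept of the centered problem

    val scaledMean = Array.tabulate(x.length)(i => mean(i) / std(i))
    // margin of the centered problem: w . ((x - mean) / std) + b
    val marginCentered =
      x.indices.map(i => coef(i) * (x(i) - mean(i)) / std(i)).sum + interceptCentered
    // the same margin in the scaled-but-uncentered problem, after shifting the
    // intercept by w . scaledMean (the `adapt` term used in trainImpl below)
    val adapt = x.indices.map(i => coef(i) * scaledMean(i)).sum
    val marginUncentered =
      x.indices.map(i => coef(i) * x(i) / std(i)).sum + (interceptCentered - adapt)

    assert(math.abs(marginCentered - marginUncentered) < 1e-12)
    println(s"margins agree: $marginCentered == $marginUncentered")
  }
}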
diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index 5b49a013959df..20339c947d7bf 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -38,14 +38,14 @@ test_that("spark.svmLinear", { expect_true(class(summary$coefficients[, 1]) == "numeric") coefs <- summary$coefficients[, "Estimate"] - expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085) + expected_coefs <- c(-6.8823988, -0.6154984, -1.5135447, 1.9694126, 3.3736856) expect_true(all(abs(coefs - expected_coefs) < 0.1)) # Test prediction with string label prediction <- predict(model, training) expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character") - expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica", - "virginica", "virginica", "virginica", "virginica", "virginica") + expected <- c("versicolor", "versicolor", "versicolor", "versicolor", "versicolor", + "versicolor", "versicolor", "versicolor", "versicolor", "versicolor") expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected) # Test model save and load diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 9191b3ec4bc2b..9214f55130856 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -222,6 +222,7 @@ class LinearSVC @Since("2.2.0") ( } val featuresStd = summarizer.std.toArray + val featuresMean = summarizer.mean.toArray val getFeaturesStd = (j: Int) => featuresStd(j) val regularization = if ($(regParam) != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures @@ -239,7 +240,8 @@ as a result, no scaling is needed. */ val (rawCoefficients, objectiveHistory) = - trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer) + trainImpl(instances, actualBlockSizeInMB, featuresStd, featuresMean, + regularization, optimizer) if (rawCoefficients == null) { val msg = s"${optimizer.getClass.getName} failed." @@ -277,16 +279,19 @@ instances: RDD[Instance], actualBlockSizeInMB: Double, featuresStd: Array[Double], + featuresMean: Array[Double], regularization: Option[L2Regularization], optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { val numFeatures = featuresStd.length val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures - val bcFeaturesStd = instances.context.broadcast(featuresStd) + val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0) + val scaledMean = Array.tabulate(numFeatures)(i => inverseStd(i) * featuresMean(i)) + val bcInverseStd = instances.context.broadcast(inverseStd) + val bcScaledMean = instances.context.broadcast(scaledMean) val standardized = instances.mapPartitions { iter => - val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 } - val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) + val func = StandardScalerModel.getTransformFunc(Array.empty, bcInverseStd.value, false, true) iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } @@ -295,13 +300,24 @@ .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") - val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_) + val getAggregatorFunc = new HingeBlockAggregator(bcInverseStd, bcScaledMean, + $(fitIntercept))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) - val states = optimizer.iterations(new CachedDiffFunction(costFun), - Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) + val initialSolution = Array.ofDim[Double](numFeaturesPlusIntercept) + if ($(fitIntercept)) { + // original `initialSolution` is for problem: + // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept) + // we should adjust it to the initial solution for problem: + // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept) + // NOTE: this is a NOOP before we finally support model initialization + val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1) + initialSolution(numFeatures) += adapt + } + val states = optimizer.iterations(new CachedDiffFunction(costFun), + new BDV[Double](initialSolution)) val arrayBuilder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { @@ -309,9 +325,19 @@ arrayBuilder += state.adjustedValue } blocks.unpersist() - bcFeaturesStd.destroy() - - (if (state != null) state.x.toArray else null, arrayBuilder.result) + bcInverseStd.destroy() + bcScaledMean.destroy() + + val solution = if (state == null) null else state.x.toArray + if ($(fitIntercept) && solution != null) { + // the final solution is for problem: + // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / 
std_x2, ..., intercept) + // we should adjust it back for original problem: + // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept) + val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1) + solution(numFeatures) -= adapt + } + (solution, arrayBuilder.result) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 57fb46b451689..c3c54651bad7f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -982,14 +982,14 @@ class LogisticRegression @Since("1.2.0") ( val adapt = Array.ofDim[Double](numClasses) BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0, initialSolution, numClasses, scaledMean, 1, 0.0, adapt, 1) - BLAS.getBLAS(numFeatures).daxpy(numClasses, 1.0, adapt, 0, 1, + BLAS.javaBLAS.daxpy(numClasses, 1.0, adapt, 0, 1, initialSolution, numClasses * numFeatures, 1) } else { - // orginal `initialCoefWithInterceptArray` is for problem: + // original `initialSolution` is for problem: // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept) // we should adjust it to the initial solution for problem: // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept) - val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, initialSolution, 1, scaledMean, 1) + val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1) initialSolution(numFeatures) += adapt } } @@ -1018,14 +1018,14 @@ class LogisticRegression @Since("1.2.0") ( val adapt = Array.ofDim[Double](numClasses) BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0, solution, numClasses, scaledMean, 1, 0.0, adapt, 1) - BLAS.getBLAS(numFeatures).daxpy(numClasses, -1.0, adapt, 0, 1, + BLAS.javaBLAS.daxpy(numClasses, -1.0, adapt, 0, 1, solution, numClasses * numFeatures, 1) } else { // the final solution is for problem: // y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept) // we should adjust it back for original problem: // y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept) - val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, solution, 1, scaledMean, 1) + val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1) solution(numFeatures) -= adapt } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala index 091c885ca01f3..09a4335dad669 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala @@ -72,7 +72,7 @@ private[ml] class BinaryLogisticBlockAggregator( // deal with non-zero values in prediction. 
private val marginOffset = if (fitWithMean) { coefficientsArray.last - - BLAS.getBLAS(numFeatures).ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1) + BLAS.javaBLAS.ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1) } else { Double.NaN } @@ -142,7 +142,7 @@ private[ml] class BinaryLogisticBlockAggregator( case sm: SparseMatrix if fitIntercept => val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) - BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, + BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) case sm: SparseMatrix if !fitIntercept => @@ -156,7 +156,7 @@ private[ml] class BinaryLogisticBlockAggregator( if (fitWithMean) { // above update of the linear part of gradientSumArray does NOT take the centering // into account, here we need to adjust this part. - BLAS.getBLAS(numFeatures).daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1, + BLAS.javaBLAS.daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1, gradientSumArray, 1) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala deleted file mode 100644 index 0fe1ed231aa83..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.optim.aggregator - -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.feature.{Instance, InstanceBlock} -import org.apache.spark.ml.linalg._ - -/** - * HingeAggregator computes the gradient and loss for Hinge loss function as used in - * binary classification for instances in sparse or dense vector in an online fashion. - * - * Two HingeAggregators can be merged together to have a summary of loss and gradient of - * the corresponding joint dataset. - * - * This class standardizes feature values during computation using bcFeaturesStd. - * - * @param bcCoefficients The coefficients corresponding to the features. - * @param fitIntercept Whether to fit an intercept term. - * @param bcFeaturesStd The standard deviation values of the features. 
- */ -private[ml] class HingeAggregator( - bcFeaturesStd: Broadcast[Array[Double]], - fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[Instance, HingeAggregator] { - - private val numFeatures = bcFeaturesStd.value.length - private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures - @transient private lazy val coefficientsArray = bcCoefficients.value match { - case DenseVector(values) => values - case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + - s" but got type ${bcCoefficients.value.getClass}.") - } - protected override val dim: Int = numFeaturesPlusIntercept - - /** - * Add a new training instance to this HingeAggregator, and update the loss and gradient - * of the objective function. - * - * @param instance The instance of data point to be added. - * @return This HingeAggregator object. - */ - def add(instance: Instance): this.type = { - instance match { case Instance(label, weight, features) => - require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." + - s" Expecting $numFeatures but got ${features.size}.") - require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") - - if (weight == 0.0) return this - val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = coefficientsArray - val localGradientSumArray = gradientSumArray - - val dotProduct = { - var sum = 0.0 - features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } - } - if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) - sum - } - // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) - // Therefore the gradient is -(2y - 1)*x - val labelScaled = 2 * label - 1.0 - val loss = if (1.0 > labelScaled * dotProduct) { - (1.0 - labelScaled * dotProduct) * weight - } else { - 0.0 - } - - if (1.0 > labelScaled * dotProduct) { - val gradientScale = -labelScaled * weight - features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) - } - } - if (fitIntercept) { - localGradientSumArray(localGradientSumArray.length - 1) += gradientScale - } - } - - lossSum += loss - weightSum += weight - this - } - } -} - - -/** - * BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in - * binary classification for blocks in sparse or dense matrix in an online fashion. - * - * Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of - * the corresponding joint dataset. - * - * NOTE: The feature values are expected to be standardized before computation. - * - * @param bcCoefficients The coefficients corresponding to the features. - * @param fitIntercept Whether to fit an intercept term. 
- */ -private[ml] class BlockHingeAggregator( - fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] { - - protected override val dim: Int = bcCoefficients.value.size - private val numFeatures = if (fitIntercept) dim - 1 else dim - - @transient private lazy val coefficientsArray = bcCoefficients.value match { - case DenseVector(values) => values - case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + - s" but got type ${bcCoefficients.value.getClass}.") - } - - @transient private lazy val linear = { - val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray - Vectors.dense(linear) - } - - /** - * Add a new training instance block to this BlockHingeAggregator, and update the loss and - * gradient of the objective function. - * - * @param block The InstanceBlock to be added. - * @return This BlockHingeAggregator object. - */ - def add(block: InstanceBlock): this.type = { - require(block.matrix.isTransposed) - require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + - s"instance. Expecting $numFeatures but got ${block.numFeatures}.") - require(block.weightIter.forall(_ >= 0), - s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") - - if (block.weightIter.forall(_ == 0)) return this - val size = block.size - - // vec here represents dotProducts - val vec = if (fitIntercept) { - Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense - } else { - Vectors.zeros(size).toDense - } - BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) - - // in-place convert dotProducts to gradient scales - // then, vec represents gradient scales - var localLossSum = 0.0 - var i = 0 - while (i < size) { - val weight = block.getWeight(i) - if (weight > 0) { - // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) - // Therefore the gradient is -(2y - 1)*x - val label = block.getLabel(i) - val labelScaled = label + label - 1.0 - val loss = (1.0 - labelScaled * vec(i)) * weight - if (loss > 0) { - localLossSum += loss - val gradScale = -labelScaled * weight - vec.values(i) = gradScale - } else { vec.values(i) = 0.0 } - } else { vec.values(i) = 0.0 } - i += 1 - } - lossSum += localLossSum - weightSum += block.weightIter.sum - - // predictions are all correct, no gradient signal - if (vec.values.forall(_ == 0)) return this - - block.matrix match { - case dm: DenseMatrix => - BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, - vec.values, 1, 1.0, gradientSumArray, 1) - - case sm: SparseMatrix if fitIntercept => - val linearGradSumVec = Vectors.zeros(numFeatures).toDense - BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) - BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, - gradientSumArray, 1) - - case sm: SparseMatrix if !fitIntercept => - val gradSumVec = new DenseVector(gradientSumArray) - BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) - - case m => - throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.") - } - - if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum - - this - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala new file mode 100644 index 0000000000000..f99c531c96b9d --- /dev/null +++ 
b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.InstanceBlock +import org.apache.spark.ml.linalg._ + + +/** + * HingeBlockAggregator computes the gradient and loss for Hinge loss function + * as used in binary classification for blocks in sparse or dense matrix in an online fashion. + * + * Two HingeBlockAggregators can be merged together to have a summary of loss and gradient + * of the corresponding joint dataset. + * + * NOTE: The feature values are expected to already have been scaled (multiplied by bcInverseStd, + * but NOT centered) before computation. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. When true, will perform data centering + * in a virtual way. Then we MUST adjust the intercept of both initial + * coefficients and final solution in the caller. + */ +private[ml] class HingeBlockAggregator( + bcInverseStd: Broadcast[Array[Double]], + bcScaledMean: Broadcast[Array[Double]], + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, HingeBlockAggregator] + with Logging { + + if (fitIntercept) { + require(bcScaledMean != null && bcScaledMean.value.length == bcInverseStd.value.length, + "scaled means are required when centering the vectors") + } + + private val numFeatures = bcInverseStd.value.length + protected override val dim: Int = bcCoefficients.value.size + + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + + s"got type ${bcCoefficients.value.getClass}.") + } + + @transient private lazy val linear = if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } + + // pre-computed margin of an empty vector. + // with this variable as an offset, for a sparse vector, we only need to + // deal with non-zero values in prediction. + private val marginOffset = if (fitIntercept) { + coefficientsArray.last - + BLAS.javaBLAS.ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1) + } else { + Double.NaN + } + + /** + * Add a new training instance block to this HingeBlockAggregator, and update the loss + * and gradient of the objective function. + * + * @param block The instance block of data point to be added. + * @return This HingeBlockAggregator object. 
+ */ + def add(block: InstanceBlock): this.type = { + require(block.matrix.isTransposed) + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + val size = block.size + + // vec/arr here represents margins + val vec = new DenseVector(Array.ofDim[Double](size)) + val arr = vec.values + if (fitIntercept) java.util.Arrays.fill(arr, marginOffset) + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) + + // in-place convert margins to multiplier + // then, vec/arr represents multiplier + var localLossSum = 0.0 + var localWeightSum = 0.0 + var multiplierSum = 0.0 + var i = 0 + while (i < size) { + val weight = block.getWeight(i) + localWeightSum += weight + if (weight > 0) { + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val label = block.getLabel(i) + val labelScaled = label + label - 1.0 + val loss = (1.0 - labelScaled * arr(i)) * weight + if (loss > 0) { + localLossSum += loss + val multiplier = -labelScaled * weight + arr(i) = multiplier + multiplierSum += multiplier + } else { arr(i) = 0.0 } + } else { arr(i) = 0.0 } + i += 1 + } + lossSum += localLossSum + weightSum += localWeightSum + + // predictions are all correct, no gradient signal + if (arr.forall(_ == 0)) return this + + // update the linear part of gradientSumArray + block.matrix match { + case dm: DenseMatrix => + BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, + vec.values, 1, 1.0, gradientSumArray, 1) + + case sm: SparseMatrix if fitIntercept => + val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) + BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) + BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, + gradientSumArray, 1) + + case sm: SparseMatrix if !fitIntercept => + val gradSumVec = new DenseVector(gradientSumArray) + BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) + + case m => + throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.") + } + + if (fitIntercept) { + // above update of the linear part of gradientSumArray does NOT take the centering + // into account, here we need to adjust this part. 
+ BLAS.javaBLAS.daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1, + gradientSumArray, 1) + + // update the intercept part of gradientSumArray + gradientSumArray(numFeatures) += multiplierSum + } + + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/MultinomialLogisticBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/MultinomialLogisticBlockAggregator.scala index de6444084379e..0683cec628849 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/MultinomialLogisticBlockAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/MultinomialLogisticBlockAggregator.scala @@ -203,7 +203,7 @@ private[ml] class MultinomialLogisticBlockAggregator( } if (fitIntercept) { - BLAS.getBLAS(numClasses).daxpy(numClasses, 1.0, multiplierSum, 0, 1, + BLAS.javaBLAS.daxpy(numClasses, 1.0, multiplierSum, 0, 1, gradientSumArray, numClasses * numFeatures, 1) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index d8b9c6a606ec2..d18a950a01ab4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -23,9 +23,8 @@ import breeze.linalg.{DenseVector => BDV} import org.scalatest.Assertions._ import org.apache.spark.ml.classification.LinearSVCSuite._ -import org.apache.spark.ml.feature.{Instance, LabeledPoint} +import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.optim.aggregator.HingeAggregator import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ @@ -176,28 +175,13 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { assert(model2.intercept !== 0.0) } - test("sparse coefficients in HingeAggregator") { - val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) - val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) - val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { - intercept[IllegalArgumentException] { - agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) - } - } - assert(thrown.getMessage.contains("coefficients only supports dense")) - - bcCoefficients.destroy() - bcFeaturesStd.destroy() - } - test("linearSVC with sample weights") { def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = { - assert(m1.coefficients ~== m2.coefficients absTol 0.05) + assert(m1.coefficients ~== m2.coefficients relTol 0.05) assert(m1.intercept ~== m2.intercept absTol 0.05) } - val estimator = new LinearSVC().setRegParam(0.01).setTol(0.01) + val estimator = new LinearSVC().setRegParam(0.01).setTol(0.001) val dataset = smallBinaryDataset MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( dataset.as[LabeledPoint], estimator, modelEquals) @@ -237,7 +221,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { val model1 = trainer1.fit(binaryDataset) /* - Use the following R code to load the data and train the model using glmnet package. + Use the following R code to load the data and train the model using e1071 package. 
library(e1071) data <- read.csv("path/target/tmp/LinearSVC/binaryDataset/part-00000", header=FALSE) @@ -257,8 +241,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { */ val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508) val interceptR = 7.440177 - assert(model1.intercept ~== interceptR relTol 1E-2) - assert(model1.coefficients ~== coefficientsR relTol 1E-2) + assert(model1.intercept ~== interceptR relTol 1E-3) + assert(model1.coefficients ~== coefficientsR relTol 5E-3) /* Use the following python code to load the data and train the model using scikit-learn package. @@ -280,8 +264,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729) val interceptSK = 7.36947518 - assert(model1.intercept ~== interceptSK relTol 1E-3) - assert(model1.coefficients ~== coefficientsSK relTol 4E-3) + assert(model1.intercept ~== interceptSK relTol 1E-2) + assert(model1.coefficients ~== coefficientsSK relTol 1E-2) } test("summary and training summary") { @@ -379,8 +363,8 @@ object LinearSVCSuite { } def checkModels(model1: LinearSVCModel, model2: LinearSVCModel): Unit = { - assert(model1.intercept == model2.intercept) - assert(model1.coefficients.equals(model2.coefficients)) + assert(model1.intercept ~== model2.intercept relTol 1e-9) + assert(model1.coefficients ~== model2.coefficients relTol 1e-9) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala deleted file mode 100644 index 425a5eb26ab67..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.ml.optim.aggregator - -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.{Instance, InstanceBlock} -import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} -import org.apache.spark.ml.stat.Summarizer -import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.util.MLlibTestSparkContext - -class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { - - @transient var instances: Array[Instance] = _ - @transient var instancesConstantFeature: Array[Instance] = _ - @transient var instancesConstantFeatureFiltered: Array[Instance] = _ - @transient var standardizedInstances: Array[Instance] = _ - - override def beforeAll(): Unit = { - super.beforeAll() - instances = Array( - Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), - Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), - Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) - ) - instancesConstantFeature = Array( - Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), - Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), - Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) - ) - instancesConstantFeatureFiltered = Array( - Instance(0.0, 0.1, Vectors.dense(2.0)), - Instance(1.0, 0.5, Vectors.dense(1.0)), - Instance(2.0, 0.3, Vectors.dense(0.5)) - ) - standardizedInstances = standardize(instances) - } - - /** Get summary statistics for some data and create a new HingeAggregator. */ - private def getNewAggregator( - instances: Array[Instance], - coefficients: Vector, - fitIntercept: Boolean): HingeAggregator = { - val (featuresSummarizer, ySummarizer) = - Summarizer.getClassificationSummarizers(sc.parallelize(instances)) - val featuresStd = featuresSummarizer.std.toArray - val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) - val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) - } - - /** Get summary statistics for some data and create a new BlockHingeAggregator. 
*/ - private def getNewBlockAggregator( - coefficients: Vector, - fitIntercept: Boolean): BlockHingeAggregator = { - val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new BlockHingeAggregator(fitIntercept)(bcCoefficients) - } - - test("aggregator add method input size") { - val coefArray = Array(1.0, 2.0) - val interceptArray = Array(2.0) - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), - fitIntercept = true) - withClue("HingeAggregator features dimension must match coefficients dimension") { - intercept[IllegalArgumentException] { - agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) - } - } - } - - test("negative weight") { - val coefArray = Array(1.0, 2.0) - val interceptArray = Array(2.0) - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), - fitIntercept = true) - withClue("HingeAggregator does not support negative instance weights") { - intercept[IllegalArgumentException] { - agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) - } - } - } - - test("check sizes") { - val rng = new scala.util.Random - val numFeatures = instances.head.features.size - val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) - val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) - val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true) - val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, - fitIntercept = false) - instances.foreach(aggIntercept.add) - instances.foreach(aggNoIntercept.add) - - assert(aggIntercept.gradient.size === numFeatures + 1) - assert(aggNoIntercept.gradient.size === numFeatures) - } - - test("check correctness") { - val coefArray = Array(1.0, 2.0) - val intercept = 1.0 - val numFeatures = instances.head.features.size - val (featuresSummarizer, _) = Summarizer.getClassificationSummarizers(sc.parallelize(instances)) - val featuresStd = featuresSummarizer.std.toArray - val weightSum = instances.map(_.weight).sum - - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true) - instances.foreach(agg.add) - - // compute the loss - val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray - val lossSum = instances.map { case Instance(l, w, f) => - val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept - val labelScaled = 2 * l - 1.0 - if (1.0 > labelScaled * margin) { - (1.0 - labelScaled * margin) * w - } else { - 0.0 - } - }.sum - val loss = lossSum / weightSum - - // compute the gradients - val gradientCoef = new Array[Double](numFeatures) - var gradientIntercept = 0.0 - instances.foreach { case Instance(l, w, f) => - val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept - if (1.0 > (2 * l - 1.0) * margin) { - gradientCoef.indices.foreach { i => - gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i) - } - gradientIntercept += -(2 * l - 1.0) * w - } - } - val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum)) - - assert(loss ~== agg.loss relTol 1e-9) - assert(gradient ~== agg.gradient relTol 1e-9) - - Seq(1, 2, 4).foreach { blockSize => - val blocks1 = standardizedInstances - .grouped(blockSize) - .map(seq => InstanceBlock.fromInstances(seq)) - .toArray - val blocks2 = blocks1.map { block => - new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) - } - - Seq(blocks1, blocks2).foreach { blocks => - val blockAgg = 
getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true) - blocks.foreach(blockAgg.add) - assert(agg.loss ~== blockAgg.loss relTol 1e-9) - assert(agg.gradient ~== blockAgg.gradient relTol 1e-9) - } - } - } - - test("check with zero standard deviation") { - val binaryCoefArray = Array(1.0, 2.0) - val intercept = 1.0 - val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature, - Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true) - instancesConstantFeature.foreach(aggConstantFeatureBinary.add) - - val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered, - Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true) - instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add) - - // constant features should not affect gradient - assert(aggConstantFeatureBinary.gradient(0) === 0.0) - assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregatorSuite.scala new file mode 100644 index 0000000000000..029911adb46aa --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregatorSuite.scala @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.stat.Summarizer +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class HingeBlockAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { + + @transient var instances: Array[Instance] = _ + @transient var instancesConstantFeature: Array[Instance] = _ + @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + @transient var scaledInstances: Array[Instance] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + instances = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), + Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) + ) + instancesConstantFeature = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), + Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) + ) + instancesConstantFeatureFiltered = Array( + Instance(0.0, 0.1, Vectors.dense(2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0)), + Instance(1.0, 0.3, Vectors.dense(0.5)) + ) + scaledInstances = standardize(instances) + } + + + /** Get summary statistics for some data and create a new HingeBlockAggregator. */ + private def getNewAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean): HingeBlockAggregator = { + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val featuresStd = featuresSummarizer.std.toArray + val featuresMean = featuresSummarizer.mean.toArray + val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0) + val scaledMean = inverseStd.zip(featuresMean).map(t => t._1 * t._2) + val bcInverseStd = sc.broadcast(inverseStd) + val bcScaledMean = sc.broadcast(scaledMean) + val bcCoefficients = sc.broadcast(coefficients) + new HingeBlockAggregator(bcInverseStd, bcScaledMean, fitIntercept)(bcCoefficients) + } + + test("sparse coefficients") { + val bcInverseStd = sc.broadcast(Array(1.0)) + val bcScaledMean = sc.broadcast(Array(2.0)) + val bcCoefficients = sc.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val binaryAgg = new HingeBlockAggregator(bcInverseStd, bcScaledMean, + fitIntercept = false)(bcCoefficients) + val block = InstanceBlock.fromInstances(Seq(Instance(1.0, 1.0, Vectors.dense(1.0)))) + val thrownBinary = withClue("aggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + binaryAgg.add(block) + } + } + assert(thrownBinary.getMessage.contains("coefficients only supports dense")) + } + + test("aggregator add method input size") { + val coefArray = Array(1.0, 2.0) + val interceptValue = 4.0 + val agg = getNewAggregator(instances, Vectors.dense(coefArray :+ interceptValue), + fitIntercept = true) + val block = InstanceBlock.fromInstances(Seq(Instance(1.0, 1.0, Vectors.dense(2.0)))) + withClue("BinaryLogisticBlockAggregator features dimension must match coefficients dimension") { + intercept[IllegalArgumentException] { + agg.add(block) + } + } + } + + test("negative weight") { + val coefArray = Array(1.0, 2.0) + val interceptValue = 4.0 + val agg = getNewAggregator(instances, Vectors.dense(coefArray :+ interceptValue), + fitIntercept = true) + val block = InstanceBlock.fromInstances(Seq(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))) + withClue("BinaryLogisticBlockAggregator does not support 
negative instance weights") { + intercept[IllegalArgumentException] { + agg.add(block) + } + } + } + + test("check sizes") { + val rng = new scala.util.Random + val numFeatures = instances.head.features.size + val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) + val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) + val block = InstanceBlock.fromInstances(instances) + + val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true) + aggIntercept.add(block) + assert(aggIntercept.gradient.size === numFeatures + 1) + + val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false) + aggNoIntercept.add(block) + assert(aggNoIntercept.gradient.size === numFeatures) + } + + test("check correctness: fitIntercept = false") { + val coefVec = Vectors.dense(1.0, 2.0) + val numFeatures = instances.head.features.size + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val featuresStd = featuresSummarizer.std + val stdCoefVec = Vectors.dense(Array.tabulate(coefVec.size)(i => coefVec(i) / featuresStd(i))) + val weightSum = instances.map(_.weight).sum + + // compute the loss and the gradients + var lossSum = 0.0 + val gradientCoef = Array.ofDim[Double](numFeatures) + instances.foreach { case Instance(l, w, f) => + val margin = BLAS.dot(stdCoefVec, f) + val labelScaled = 2 * l - 1.0 + if (1.0 > labelScaled * margin) { + lossSum += (1.0 - labelScaled * margin) * w + gradientCoef.indices.foreach { i => + gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i) + } + } + } + val loss = lossSum / weightSum + val gradient = Vectors.dense(gradientCoef.map(_ / weightSum)) + + Seq(1, 2, 4).foreach { blockSize => + val blocks1 = scaledInstances + .grouped(blockSize) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val agg = getNewAggregator(instances, coefVec, fitIntercept = false) + blocks.foreach(agg.add) + assert(agg.loss ~== loss relTol 1e-9) + assert(agg.gradient ~== gradient relTol 1e-9) + } + } + } + + test("check correctness: fitIntercept = true") { + val coefVec = Vectors.dense(1.0, 2.0) + val interceptValue = 1.0 + val numFeatures = instances.head.features.size + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val featuresStd = featuresSummarizer.std + val featuresMean = featuresSummarizer.mean + val stdCoefVec = Vectors.dense(Array.tabulate(coefVec.size)(i => coefVec(i) / featuresStd(i))) + val weightSum = instances.map(_.weight).sum + + // compute the loss and the gradients + var lossSum = 0.0 + val gradientCoef = Array.ofDim[Double](numFeatures) + var gradientIntercept = 0.0 + instances.foreach { case Instance(l, w, f) => + val centered = f.toDense.copy + BLAS.axpy(-1.0, featuresMean, centered) + val margin = BLAS.dot(stdCoefVec, centered) + interceptValue + val labelScaled = 2 * l - 1.0 + if (1.0 > labelScaled * margin) { + lossSum += (1.0 - labelScaled * margin) * w + gradientCoef.indices.foreach { i => + gradientCoef(i) += (f(i) - featuresMean(i)) * -(2 * l - 1.0) * w / featuresStd(i) + } + gradientIntercept += -(2 * l - 1.0) * w + } + } + val loss = lossSum / weightSum + val gradient = Vectors.dense((gradientCoef :+ gradientIntercept).map(_ / weightSum)) + + Seq(1, 2, 4).foreach { blockSize => + val 
blocks1 = scaledInstances + .grouped(blockSize) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val agg = getNewAggregator(instances, Vectors.dense(coefVec.toArray :+ interceptValue), + fitIntercept = true) + blocks.foreach(agg.add) + assert(agg.loss ~== loss relTol 1e-9) + assert(agg.gradient ~== gradient relTol 1e-9) + } + } + } + + test("check with zero standard deviation") { + val coefArray = Array(1.0, 2.0) + val coefArrayFiltered = Array(2.0) + val interceptValue = 1.0 + + Seq(false, true).foreach { fitIntercept => + val coefVec = if (fitIntercept) { + Vectors.dense(coefArray :+ interceptValue) + } else { + Vectors.dense(coefArray) + } + val aggConstantFeature = getNewAggregator(instancesConstantFeature, + coefVec, fitIntercept = fitIntercept) + aggConstantFeature + .add(InstanceBlock.fromInstances(standardize(instancesConstantFeature))) + val grad = aggConstantFeature.gradient + + val coefVecFiltered = if (fitIntercept) { + Vectors.dense(coefArrayFiltered :+ interceptValue) + } else { + Vectors.dense(coefArrayFiltered) + } + val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered, + coefVecFiltered, fitIntercept = fitIntercept) + aggConstantFeatureFiltered + .add(InstanceBlock.fromInstances(standardize(instancesConstantFeatureFiltered))) + val gradFiltered = aggConstantFeatureFiltered.gradient + + // constant features should not affect gradient + assert(aggConstantFeature.loss ~== aggConstantFeatureFiltered.loss relTol 1e-9) + assert(grad(0) === 0) + assert(grad(1) ~== gradFiltered(0) relTol 1e-9) + if (fitIntercept) { + assert(grad.toArray.last ~== gradFiltered.toArray.last relTol 1e-9) + } + } + } +} diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 17994ed5e3d28..620760905a451 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -571,9 +571,9 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl >>> model.getMaxBlockSizeInMB() 0.0 >>> model.coefficients - DenseVector([0.0, -0.2792, -0.1833]) + DenseVector([0.0, -1.0319, -0.5159]) >>> model.intercept - 1.0206118982229047 + 2.579645978780695 >>> model.numClasses 2 >>> model.numFeatures @@ -582,12 +582,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl >>> model.predict(test0.head().features) 1.0 >>> model.predictRaw(test0.head().features) - DenseVector([-1.4831, 1.4831]) + DenseVector([-4.1274, 4.1274]) >>> result = model.transform(test0).head() >>> result.newPrediction 1.0 >>> result.rawPrediction - DenseVector([-1.4831, 1.4831]) + DenseVector([-4.1274, 4.1274]) >>> svm_path = temp_path + "/svm" >>> svm.save(svm_path) >>> svm2 = LinearSVC.load(svm_path) diff --git a/python/pyspark/ml/tests/test_training_summary.py b/python/pyspark/ml/tests/test_training_summary.py index 7dafdcb3d683b..5b31c871fb271 100644 --- a/python/pyspark/ml/tests/test_training_summary.py +++ b/python/pyspark/ml/tests/test_training_summary.py @@ -223,12 +223,12 @@ def test_linear_svc_summary(self): self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) print(s.weightedTruePositiveRate) - self.assertAlmostEqual(s.weightedTruePositiveRate, 0.5, 2) - self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.5, 2) - 
self.assertAlmostEqual(s.weightedRecall, 0.5, 2) - self.assertAlmostEqual(s.weightedPrecision, 0.25, 2) - self.assertAlmostEqual(s.weightedFMeasure(), 0.3333333333333333, 2) - self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.3333333333333333, 2) + self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2) + self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2) + self.assertAlmostEqual(s.weightedRecall, 1.0, 2) + self.assertAlmostEqual(s.weightedPrecision, 1.0, 2) + self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2) + self.assertAlmostEqual(s.weightedFMeasure(1.0), 1.0, 2) # test evaluation (with training dataset) produces a summary with same values # one check is enough to verify a summary is returned, Scala version runs full test sameSummary = model.evaluate(df)
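Editorial note (illustrative sketch, not part of the patch): the new HingeBlockAggregator above encodes the hinge loss for {0, 1} labels as max(0, 1 - (2y - 1) * margin) and, whenever that loss is positive, contributes -(2y - 1) * weight as the "multiplier" that scales the feature vector in the gradient. Below is a tiny stand-alone sketch of that per-instance arithmetic in plain Scala; the object name and numbers are made up and no Spark classes are involved.

object HingeLossSketch {
  /** Returns (weighted loss, multiplier) for one instance with a {0, 1} label. */
  def hinge(label: Double, weight: Double, margin: Double): (Double, Double) = {
    val labelScaled = 2 * label - 1.0                            // map {0, 1} to {-1, +1}
    val loss = math.max(0.0, 1.0 - labelScaled * margin) * weight
    // the multiplier is d(loss)/d(margin); the gradient w.r.t. the coefficients
    // is multiplier * x, which the aggregator accumulates block-wise via GEMV
    val multiplier = if (loss > 0) -labelScaled * weight else 0.0
    (loss, multiplier)
  }

  def main(args: Array[String]): Unit = {
    println(hinge(label = 1.0, weight = 0.5, margin = 0.3))   // violated margin: (0.35, -0.5)
    println(hinge(label = 0.0, weight = 0.3, margin = -2.0))  // satisfied margin: (0.0, 0.0)
  }
}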
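Editorial note (illustrative sketch, not part of the patch): the block-level arithmetic in HingeBlockAggregator.add can be checked with plain arrays. Margins are computed as marginOffset plus the product of the scaled features with the linear coefficients, and the linear part of the gradient is then corrected by subtracting multiplierSum * scaledMean while multiplierSum itself goes into the intercept slot. The sketch below mirrors that flow with simple loops instead of InstanceBlock/GEMV and verifies it against a direct computation on explicitly centered features; all values and names are made up.

object HingeBlockArithmeticSketch {
  def main(args: Array[String]): Unit = {
    // three instances, two features, already scaled by 1 / std (as the aggregator expects)
    val xScaled = Array(Array(1.0, 2.0), Array(1.5, 1.0), Array(4.0, 0.5))
    val labels = Array(0.0, 1.0, 0.0)
    val weights = Array(0.1, 0.5, 0.3)
    val scaledMean = Array(2.0, 1.1)       // mean / std, illustrative values
    val coef = Array(1.0, 2.0)             // linear part of the solution
    val intercept = 1.0
    val numFeatures = 2

    // margin offset of an "empty" vector: intercept - coef . scaledMean
    val marginOffset = intercept - (0 until numFeatures).map(i => coef(i) * scaledMean(i)).sum

    val grad = Array.ofDim[Double](numFeatures + 1)
    var multiplierSum = 0.0
    for (k <- xScaled.indices) {
      // margin via the offset trick: only the (non-zero) scaled values are touched
      val margin = marginOffset + (0 until numFeatures).map(i => coef(i) * xScaled(k)(i)).sum
      val labelScaled = 2 * labels(k) - 1.0
      val multiplier =
        if (1.0 - labelScaled * margin > 0) -labelScaled * weights(k) else 0.0
      multiplierSum += multiplier
      for (i <- 0 until numFeatures) grad(i) += multiplier * xScaled(k)(i)
    }
    // centering correction: subtract multiplierSum * scaledMean from the linear part
    // and accumulate multiplierSum into the intercept slot, as the aggregator does
    for (i <- 0 until numFeatures) grad(i) -= multiplierSum * scaledMean(i)
    grad(numFeatures) += multiplierSum

    // reference: the same gradient computed on explicitly centered features
    val gradRef = Array.ofDim[Double](numFeatures + 1)
    for (k <- xScaled.indices) {
      val centered = Array.tabulate(numFeatures)(i => xScaled(k)(i) - scaledMean(i))
      val margin = (0 until numFeatures).map(i => coef(i) * centered(i)).sum + intercept
      val labelScaled = 2 * labels(k) - 1.0
      val multiplier =
        if (1.0 - labelScaled * margin > 0) -labelScaled * weights(k) else 0.0
      for (i <- 0 until numFeatures) gradRef(i) += multiplier * centered(i)
      gradRef(numFeatures) += multiplier
    }
    grad.indices.foreach(i => assert(math.abs(grad(i) - gradRef(i)) < 1e-12))
    println(grad.mkString("gradient = [", ", ", "]"))
  }
}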
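Editorial note (illustrative sketch, not part of the patch): the "check with zero standard deviation" test above relies on the fact that a constant feature gets inverseStd = 0, so its scaled value is always 0.0 and it contributes nothing to any margin or gradient entry; that is why the suite expects grad(0) === 0. A minimal sketch of that guard, with hypothetical values:

object ConstantFeatureSketch {
  def main(args: Array[String]): Unit = {
    val std = Array(0.0, 1.5)                                   // first feature is constant
    val inverseStd = std.map(s => if (s != 0) 1.0 / s else 0.0) // same guard as in trainImpl
    val x = Array(7.0, 3.0)
    val xScaled = Array.tabulate(x.length)(i => x(i) * inverseStd(i))
    // the constant feature is scaled to exactly 0.0, so it adds nothing to
    // margins or gradients no matter what its coefficient is
    assert(xScaled(0) == 0.0)
    println(xScaled.mkString("[", ", ", "]"))
  }
}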