From d46e5edfdf5d052c59677f58767bdbe0803dc368 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 4 May 2017 12:13:41 -0700 Subject: [PATCH 01/14] add lbfgs as default optimizer of LinearSVC --- .../spark/ml/classification/LinearSVC.scala | 55 ++++++++++++-- .../ml/classification/LinearSVCSuite.scala | 73 +++++++++++++------ 2 files changed, 98 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 7507c7539d4ef..863a2c6e27c35 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -17,10 +17,13 @@ package org.apache.spark.ml.classification +import java.util.Locale + import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} -import breeze.optimize.{CachedDiffFunction, DiffFunction, OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, + OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -42,7 +45,21 @@ import org.apache.spark.sql.functions.{col, lit} /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasThreshold with HasAggregationDepth + with HasThreshold with HasAggregationDepth { + + /** + * The optimization algorithm for LinearSVC. + * Supported options: "lbfgs" and "owlqn". 
+ * (default: "lbfgs") + * @group param + */ + final val optimizer: Param[String] = new Param[String](this, "optimizer", "The optimization" + + " algorithm to be used", ParamValidators.inArray[String](LinearSVC.supportedOptimizers)) + + /** @group getParam */ + final def getOptimizer: String = $(optimizer) + +} /** * :: Experimental :: @@ -60,6 +77,8 @@ class LinearSVC @Since("2.2.0") ( extends Classifier[Vector, LinearSVC, LinearSVCModel] with LinearSVCParams with DefaultParamsWritable { + import LinearSVC._ + @Since("2.2.0") def this() = this(Identifiable.randomUID("linearsvc")) @@ -145,6 +164,15 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set optimizer for LinearSVC. Supported options: "lbfgs" and "owlqn". + * + * @group setParam + */ + @Since("2.2.0") + def setOptimizer(value: String): this.type = set(optimizer, value.toLowerCase(Locale.ROOT)) + setDefault(optimizer -> "lbfgs") + @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -205,15 +233,21 @@ class LinearSVC @Since("2.2.0") ( val costFun = new LinearSVCCostFun(instances, $(fitIntercept), $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth)) - def regParamL1Fun = (index: Int) => 0D - val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + val optimizerAlgo = $(optimizer) match { + case LBFGS => new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + case OWLQN => + def regParamL1Fun = (index: Int) => 0D + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + case _ => throw new SparkException ("unexpected optimizer: " + $(optimizer)) + } + val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) - val states = optimizer.iterations(new CachedDiffFunction(costFun), + val states = optimizerAlgo.iterations(new CachedDiffFunction(costFun), 
initialCoefWithIntercept.asBreeze.toDenseVector) val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null + var state: optimizerAlgo.State = null while (states.hasNext) { state = states.next() scaledObjectiveHistory += state.adjustedValue @@ -258,6 +292,15 @@ class LinearSVC @Since("2.2.0") ( @Since("2.2.0") object LinearSVC extends DefaultParamsReadable[LinearSVC] { + /** String name for Limited-memory BFGS. */ + private[classification] val LBFGS: String = "lbfgs".toLowerCase(Locale.ROOT) + + /** String name for Orthant-Wise Limited-memory Quasi-Newton. */ + private[classification] val OWLQN: String = "owlqn".toLowerCase(Locale.ROOT) + + /* Set of optimizers that LinearSVC supports */ + private[classification] val supportedOptimizers = Array(LBFGS, OWLQN) + @Since("2.2.0") override def load(path: String): LinearSVC = super.load(path) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 2f87afc23fe7e..e7d623f789254 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -75,21 +75,25 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } test("Linear SVC binary classification") { - val svm = new LinearSVC() - val model = svm.fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + LinearSVC.supportedOptimizers.foreach { opt => + val svm = new LinearSVC().setOptimizer(opt) + val model = svm.fit(smallBinaryDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseBinaryDataset) + checkModels(model, 
sparseModel) + } } test("Linear SVC binary classification with regularization") { - val svm = new LinearSVC() - val model = svm.setRegParam(0.1).fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + LinearSVC.supportedOptimizers.foreach { opt => + val svm = new LinearSVC().setOptimizer(opt).setMaxIter(10) + val model = svm.setRegParam(0.1).fit(smallBinaryDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseBinaryDataset) + checkModels(model, sparseModel) + } } test("params") { @@ -112,6 +116,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getFeaturesCol === "features") assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") + assert(lsvc.getOptimizer === "lbfgs") val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) model.transform(smallBinaryDataset) .select("label", "prediction", "rawPrediction") @@ -154,22 +159,23 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("linearSVC with sample weights") { def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = { - assert(m1.coefficients ~== m2.coefficients absTol 0.05) + assert(m1.coefficients ~== m2.coefficients absTol 0.07) assert(m1.intercept ~== m2.intercept absTol 0.05) } - - val estimator = new LinearSVC().setRegParam(0.01).setTol(0.01) - val dataset = smallBinaryDataset - MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( - dataset.as[LabeledPoint], estimator, modelEquals) - MLTestingUtils.testOutliersWithSmallWeights[LinearSVCModel, LinearSVC]( - dataset.as[LabeledPoint], estimator, 2, modelEquals, outlierRatio = 3) - MLTestingUtils.testOversamplingVsWeighting[LinearSVCModel, LinearSVC]( - 
dataset.as[LabeledPoint], estimator, modelEquals, 42L) + LinearSVC.supportedOptimizers.foreach { opt => + val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setOptimizer(opt) + val dataset = smallBinaryDataset + MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, modelEquals) + MLTestingUtils.testOutliersWithSmallWeights[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, 2, modelEquals, outlierRatio = 3) + MLTestingUtils.testOversamplingVsWeighting[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, modelEquals, 42L) + } } - test("linearSVC comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC() + test("linearSVC OWLQN comparison with R e1071 and scikit-learn") { + val trainer1 = new LinearSVC().setOptimizer("owlqn") .setRegParam(0.00002) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) @@ -223,6 +229,25 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model1.coefficients ~== coefficientsSK relTol 4E-3) } + test("linearSVC LBFGS comparison with R e1071 and scikit-learn") { + val trainer1 = new LinearSVC().setOptimizer("LBFGS") + .setRegParam(0.00003) + .setMaxIter(200) + .setTol(1e-4) + val model1 = trainer1.fit(binaryDataset) + + // refer to last unit test for R and python code + val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508) + val interceptR = 7.440177 + assert(model1.intercept ~== interceptR relTol 2E-2) + assert(model1.coefficients ~== coefficientsR relTol 1E-2) + + val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729) + val interceptSK = 7.36947518 + assert(model1.intercept ~== interceptSK relTol 1E-2) + assert(model1.coefficients ~== coefficientsSK relTol 1E-2) + } + test("read/write: SVM") { def checkModelData(model: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model.intercept === model2.intercept) From 
f7d555997d8c589585b5a34a0b017a29577cad82 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 5 May 2017 19:25:54 -0700 Subject: [PATCH 02/14] set owlqn as default --- .../spark/ml/classification/LinearSVC.scala | 37 +++++++------------ .../ml/classification/LinearSVCSuite.scala | 14 +++---- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 863a2c6e27c35..918cf36d70b1d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -45,21 +45,7 @@ import org.apache.spark.sql.functions.{col, lit} /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasThreshold with HasAggregationDepth { - - /** - * The optimization algorithm for LinearSVC. - * Supported options: "lbfgs" and "owlqn". - * (default: "lbfgs") - * @group param - */ - final val optimizer: Param[String] = new Param[String](this, "optimizer", "The optimization" + - " algorithm to be used", ParamValidators.inArray[String](LinearSVC.supportedOptimizers)) - - /** @group getParam */ - final def getOptimizer: String = $(optimizer) - -} + with HasThreshold with HasAggregationDepth with HasSolver /** * :: Experimental :: @@ -165,13 +151,16 @@ class LinearSVC @Since("2.2.0") ( setDefault(aggregationDepth -> 2) /** - * Set optimizer for LinearSVC. Supported options: "lbfgs" and "owlqn". - * + * Set solver for LinearSVC. Supported options: "l-bfgs" and "owlqn" (case insensitve). + * - "l-bfgs" denotes Limited-memory BFGS which is a limited-memory quasi-Newton + * optimization method. + * - "owlqn" denotes Orthant-Wise Limited-memory Quasi-Newton algorithm . 
+ * (default: "owlqn") * @group setParam */ @Since("2.2.0") - def setOptimizer(value: String): this.type = set(optimizer, value.toLowerCase(Locale.ROOT)) - setDefault(optimizer -> "lbfgs") + def setSolver(value: String): this.type = set(solver, value.toLowerCase(Locale.ROOT)) + setDefault(solver -> "owlqn") @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -233,21 +222,21 @@ class LinearSVC @Since("2.2.0") ( val costFun = new LinearSVCCostFun(instances, $(fitIntercept), $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth)) - val optimizerAlgo = $(optimizer) match { + val optimizer = $(solver) match { case LBFGS => new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) case OWLQN => def regParamL1Fun = (index: Int) => 0D new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) - case _ => throw new SparkException ("unexpected optimizer: " + $(optimizer)) + case _ => throw new SparkException ("unexpected optimizer: " + $(solver)) } val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) - val states = optimizerAlgo.iterations(new CachedDiffFunction(costFun), + val states = optimizer.iterations(new CachedDiffFunction(costFun), initialCoefWithIntercept.asBreeze.toDenseVector) val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] - var state: optimizerAlgo.State = null + var state: optimizer.State = null while (states.hasNext) { state = states.next() scaledObjectiveHistory += state.adjustedValue @@ -293,7 +282,7 @@ class LinearSVC @Since("2.2.0") ( object LinearSVC extends DefaultParamsReadable[LinearSVC] { /** String name for Limited-memory BFGS. */ - private[classification] val LBFGS: String = "lbfgs".toLowerCase(Locale.ROOT) + private[classification] val LBFGS: String = "l-bfgs".toLowerCase(Locale.ROOT) /** String name for Orthant-Wise Limited-memory Quasi-Newton. 
*/ private[classification] val OWLQN: String = "owlqn".toLowerCase(Locale.ROOT) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index e7d623f789254..1b1257695fe0d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -76,7 +76,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("Linear SVC binary classification") { LinearSVC.supportedOptimizers.foreach { opt => - val svm = new LinearSVC().setOptimizer(opt) + val svm = new LinearSVC().setSolver(opt) val model = svm.fit(smallBinaryDataset) assert(model.transform(smallValidationDataset) .where("prediction=label").count() > nPoints * 0.8) @@ -87,7 +87,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("Linear SVC binary classification with regularization") { LinearSVC.supportedOptimizers.foreach { opt => - val svm = new LinearSVC().setOptimizer(opt).setMaxIter(10) + val svm = new LinearSVC().setSolver(opt).setMaxIter(10) val model = svm.setRegParam(0.1).fit(smallBinaryDataset) assert(model.transform(smallValidationDataset) .where("prediction=label").count() > nPoints * 0.8) @@ -116,7 +116,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getFeaturesCol === "features") assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") - assert(lsvc.getOptimizer === "lbfgs") + assert(lsvc.getSolver === "owlqn") val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) model.transform(smallBinaryDataset) .select("label", "prediction", "rawPrediction") @@ -163,7 +163,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(m1.intercept ~== m2.intercept absTol 0.05) } 
LinearSVC.supportedOptimizers.foreach { opt => - val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setOptimizer(opt) + val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setSolver(opt) val dataset = smallBinaryDataset MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( dataset.as[LabeledPoint], estimator, modelEquals) @@ -175,7 +175,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } test("linearSVC OWLQN comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC().setOptimizer("owlqn") + val trainer1 = new LinearSVC().setSolver(LinearSVC.OWLQN) .setRegParam(0.00002) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) @@ -229,8 +229,8 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model1.coefficients ~== coefficientsSK relTol 4E-3) } - test("linearSVC LBFGS comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC().setOptimizer("LBFGS") + test("linearSVC L-BFGS comparison with R e1071 and scikit-learn") { + val trainer1 = new LinearSVC().setSolver(LinearSVC.LBFGS) .setRegParam(0.00003) .setMaxIter(200) .setTol(1e-4) From 8a7c10f5bc0d7234ed6e156c98f04bddb7a37204 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Mon, 8 May 2017 23:03:21 -0700 Subject: [PATCH 03/14] set check --- .../org/apache/spark/ml/classification/LinearSVC.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 918cf36d70b1d..e37cd0f01b3be 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -159,7 +159,12 @@ class LinearSVC @Since("2.2.0") ( * @group setParam */ @Since("2.2.0") - def setSolver(value: String): this.type = set(solver, 
value.toLowerCase(Locale.ROOT)) + def setSolver(value: String): this.type = { + val lowercaseValue = value.toLowerCase(Locale.ROOT) + require(supportedOptimizers.contains(lowercaseValue), + s"Solver $value was not supported. Supported options: l-bfgs, owlqn") + set(solver, lowercaseValue) + } setDefault(solver -> "owlqn") @Since("2.2.0") From 3707580de4b3905f2f77d53c926df14cda32f942 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 13 Jun 2017 11:22:52 -0700 Subject: [PATCH 04/14] merge loss change --- .../spark/ml/classification/LinearSVC.scala | 79 +++++++++++++---- .../ml/classification/LinearSVCSuite.scala | 85 ++++++++++++++++--- 2 files changed, 138 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 1223dab54a22d..9ec8d501636b2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -45,7 +45,28 @@ import org.apache.spark.sql.functions.{col, lit} /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasThreshold with HasAggregationDepth with HasSolver + with HasThreshold with HasAggregationDepth with HasSolver { + + /** + * Specifies the loss function. Currently "hinge" and "squared_hinge" are supported. + * "hinge" is the standard SVM loss (a.k.a. L1 loss) while "squared_hinge" is the square of + * the hinge loss (a.k.a. L2 loss). + * + * @see Hinge loss (Wikipedia) + * + * @group param + */ + @Since("2.3.0") + final val loss: Param[String] = new Param(this, "loss", "Specifies the loss " + + "function. 
hinge is the standard SVM loss while squared_hinge is the square of the hinge loss.", + (s: String) => LinearSVC.supportedLoss.contains(s.toLowerCase(Locale.ROOT))) + + setDefault(loss -> "squared_hinge") + + /** @group getParam */ + @Since("2.3.0") + def getLoss: String = $(loss) +} /** * :: Experimental :: @@ -53,8 +74,11 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR * * Linear SVM Classifier * - * This binary classifier optimizes the Hinge Loss using the OWLQN optimizer. - * Only supports L2 regularization currently. + * This binary classifier implements a linear SVM classifier. Currently "hinge" and + * "squared_hinge" loss functions are supported. "hinge" is the standard SVM loss (a.k.a. L1 loss) + * while "squared_hinge" is the square of the hinge loss (a.k.a. L2 loss). Both LBFGS and OWL-QN + * optimizers are supported and can be specified via setting the solver param. + * By default, L2 SVM (Squared Hinge Loss) and L-BFGS optimizer are used. * */ @Since("2.2.0") @@ -151,6 +175,14 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set the loss function. Default is "squared_hinge". + * + * @group setParam + */ + @Since("2.3.0") + def setLoss(value: String): this.type = set(loss, value) + /** * Set solver for LinearSVC. Supported options: "l-bfgs" and "owlqn" (case insensitve). * - "l-bfgs" denotes Limited-memory BFGS which is a limited-memory quasi-Newton @@ -166,7 +198,7 @@ class LinearSVC @Since("2.2.0") ( s"Solver $value was not supported. 
Supported options: l-bfgs, owlqn") set(solver, lowercaseValue) } - setDefault(solver -> "owlqn") + setDefault(solver -> "l-bfgs") @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -225,10 +257,10 @@ class LinearSVC @Since("2.2.0") ( val featuresStd = summarizer.variance.toArray.map(math.sqrt) val regParamL2 = $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) - val costFun = new LinearSVCCostFun(instances, $(fitIntercept), - $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth)) + val costFun = new LinearSVCCostFun(instances, $(fitIntercept), $(standardization), + bcFeaturesStd, regParamL2, $(aggregationDepth), $(loss).toLowerCase(Locale.ROOT)) - val optimizer = $(solver) match { + val optimizer = $(solver).toLowerCase(Locale.ROOT) match { case LBFGS => new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) case OWLQN => def regParamL1Fun = (index: Int) => 0D @@ -298,6 +330,8 @@ object LinearSVC extends DefaultParamsReadable[LinearSVC] { @Since("2.2.0") override def load(path: String): LinearSVC = super.load(path) + + private[classification] val supportedLoss = Array("hinge", "squared_hinge") } /** @@ -393,7 +427,8 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { } /** - * LinearSVCCostFun implements Breeze's DiffFunction[T] for hinge loss function + * LinearSVCCostFun implements Breeze's DiffFunction[T] for loss function ("hinge" or + * "squared_hinge"). 
*/ private class LinearSVCCostFun( instances: RDD[Instance], @@ -401,7 +436,8 @@ private class LinearSVCCostFun( standardization: Boolean, bcFeaturesStd: Broadcast[Array[Double]], regParamL2: Double, - aggregationDepth: Int) extends DiffFunction[BDV[Double]] { + aggregationDepth: Int, + loss: String) extends DiffFunction[BDV[Double]] { override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { val coeffs = Vectors.fromBreeze(coefficients) @@ -414,7 +450,7 @@ private class LinearSVCCostFun( val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2) instances.treeAggregate( - new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept) + new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept, loss) )(seqOp, combOp, aggregationDepth) } @@ -459,8 +495,9 @@ private class LinearSVCCostFun( } /** - * LinearSVCAggregator computes the gradient and loss for hinge loss function, as used - * in binary classification for instances in sparse or dense vector in an online fashion. + * LinearSVCAggregator computes the gradient and loss for loss function ("hinge" or + * "squared_hinge", as used in binary classification for instances in sparse or dense + * vector in an online fashion. * * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of * the corresponding joint dataset. 
@@ -474,7 +511,8 @@ private class LinearSVCCostFun( private class LinearSVCAggregator( bcCoefficients: Broadcast[Vector], bcFeaturesStd: Broadcast[Array[Double]], - fitIntercept: Boolean) extends Serializable { + fitIntercept: Boolean, + lossFunction: String) extends Serializable { private val numFeatures: Int = bcFeaturesStd.value.length private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures @@ -516,13 +554,24 @@ private class LinearSVCAggregator( // Therefore the gradient is -(2y - 1)*x val labelScaled = 2 * label - 1.0 val loss = if (1.0 > labelScaled * dotProduct) { - weight * (1.0 - labelScaled * dotProduct) + val hingeLoss = 1.0 - labelScaled * dotProduct + lossFunction match { + case "hinge" => hingeLoss * weight + case "squared_hinge" => hingeLoss * hingeLoss * weight + case unexpected => throw new SparkException( + s"unexpected lossFunction in LinearSVCAggregator: $unexpected") + } } else { 0.0 } if (1.0 > labelScaled * dotProduct) { - val gradientScale = -labelScaled * weight + val gradientScale = lossFunction match { + case "hinge" => -labelScaled * weight + case "squared_hinge" => (labelScaled * dotProduct - 1) * labelScaled * 2 + case unexpected => throw new SparkException( + s"unexpected lossFunction in LinearSVCAggregator: $unexpected") + } features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 1b1257695fe0d..b4ac72da5e1ea 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -76,12 +76,14 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("Linear SVC binary 
classification") { LinearSVC.supportedOptimizers.foreach { opt => - val svm = new LinearSVC().setSolver(opt) - val model = svm.fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + Array("hinge", "squared_hinge").foreach { loss => + val svm = new LinearSVC().setLoss(loss).setSolver(opt) + val model = svm.fit(smallBinaryDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseBinaryDataset) + checkModels(model, sparseModel) + } } } @@ -106,6 +108,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val lsvc = new LinearSVC() assert(lsvc.getRegParam === 0.0) assert(lsvc.getMaxIter === 100) + assert(lsvc.getLoss === "squared_hinge") assert(lsvc.getFitIntercept) assert(lsvc.getTol === 1E-6) assert(lsvc.getStandardization) @@ -116,11 +119,12 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getFeaturesCol === "features") assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") - assert(lsvc.getSolver === "owlqn") + assert(lsvc.getSolver === "l-bfgs") val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) model.transform(smallBinaryDataset) .select("label", "prediction", "rawPrediction") .collect() + assert(model.getLoss === "squared_hinge") assert(model.getThreshold === 0.0) assert(model.getFeaturesCol === "features") assert(model.getPredictionCol === "prediction") @@ -130,6 +134,13 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model.numFeatures === 2) MLTestingUtils.checkCopyAndUids(lsvc, model) + withClue("lossFunction should be case-insensitive") { + lsvc.setLoss("HINGE") + lsvc.setLoss("Squared_hinge") + intercept[IllegalArgumentException] { + val 
model = lsvc.setLoss("hing") + } + } } test("linear svc doesn't fit intercept when fitIntercept is off") { @@ -145,7 +156,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("sparse coefficients in SVCAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true) + val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true, "squared_hinge") val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) @@ -164,6 +175,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } LinearSVC.supportedOptimizers.foreach { opt => val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setSolver(opt) + .setLoss("hinge") val dataset = smallBinaryDataset MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( dataset.as[LabeledPoint], estimator, modelEquals) @@ -174,11 +186,12 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } } - test("linearSVC OWLQN comparison with R e1071 and scikit-learn") { + test("linearSVC OWLQN hinge comparison with R e1071 and scikit-learn") { val trainer1 = new LinearSVC().setSolver(LinearSVC.OWLQN) .setRegParam(0.00002) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) + .setLoss("hinge") val model1 = trainer1.fit(binaryDataset) /* @@ -229,11 +242,12 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model1.coefficients ~== coefficientsSK relTol 4E-3) } - test("linearSVC L-BFGS comparison with R e1071 and scikit-learn") { + test("linearSVC L-BFGS hinge comparison with R e1071 and scikit-learn") { val trainer1 = new LinearSVC().setSolver(LinearSVC.LBFGS) .setRegParam(0.00003) 
.setMaxIter(200) .setTol(1e-4) + .setLoss("hinge") val model1 = trainer1.fit(binaryDataset) // refer to last unit test for R and python code @@ -248,6 +262,53 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model1.coefficients ~== coefficientsSK relTol 1E-2) } + test("linearSVC OWLQN squared_hinge loss comparison with scikit-learn (liblinear)") { + val linearSVC = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("owlqn") + .setRegParam(2.0 / 10 / 1000) // set regParam = 2.0 / datasize / c + .setMaxIter(80) + .setTol(1e-4) + val model = linearSVC.fit(binaryDataset.limit(1000)) + + /* + Use the following python code to load the data and train the model using scikit-learn package. + import numpy as np + from sklearn import svm + f = open("path/spark/assembly/target/tmp/LinearSVC/binaryDataset/part-00000") + data = np.loadtxt(f, delimiter=",")[:1000] + X = data[:, 1:] # select columns 1 through end + y = data[:, 0] # select column 0 as label + clf = svm.LinearSVC(fit_intercept=True, C=10, loss='squared_hinge', tol=1e-4, random_state=42) + m = clf.fit(X, y) + print m.coef_ + print m.intercept_ + [[ 2.85136074 6.25310456 9.00668415 12.17750981]] + [ 2.93419973] + */ + + val coefficientsSK = Vectors.dense(2.85136074, 6.25310456, 9.00668415, 12.17750981) + val interceptSK = 2.93419973 + assert(model.intercept ~== interceptSK relTol 2E-2) + assert(model.coefficients ~== coefficientsSK relTol 2E-2) + } + + test("linearSVC L-BFGS squared_hinge loss comparison with scikit-learn (liblinear)") { + val linearSVC = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("L-BFGS") + .setRegParam(3.0 / 10 / 1000) // set regParam = 2.0 / datasize / c + .setMaxIter(30) + .setTol(1e-4) + val model = linearSVC.fit(binaryDataset.limit(1000)) + + // refer to last unit test for python code + val coefficientsSK = Vectors.dense(2.85136074, 6.25310456, 9.00668415, 12.17750981) + val interceptSK = 2.93419973 + assert(model.intercept ~== 
interceptSK relTol 3E-2) + assert(model.coefficients ~== coefficientsSK relTol 3E-2) + } + test("read/write: SVM") { def checkModelData(model: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model.intercept === model2.intercept) @@ -263,6 +324,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau object LinearSVCSuite { val allParamSettings: Map[String, Any] = Map( + "loss" -> "squared_hinge", "regParam" -> 0.01, "maxIter" -> 2, // intentionally small "fitIntercept" -> true, @@ -271,7 +333,8 @@ object LinearSVCSuite { "threshold" -> 0.6, "predictionCol" -> "myPredict", "rawPredictionCol" -> "myRawPredict", - "aggregationDepth" -> 3 + "aggregationDepth" -> 3, + "solver" -> "owlqn" ) // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) From 2ca5a7456f7dc5ea4473ddee2933bd6228b3476e Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 13 Jun 2017 18:01:10 -0700 Subject: [PATCH 05/14] fix r and python --- R/pkg/R/mllib_classification.R | 8 ++++++-- R/pkg/tests/fulltests/test_mllib_classification.R | 3 ++- .../scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 8 ++++++-- python/pyspark/ml/classification.py | 6 +++--- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 306a9b8676539..ed18b9e9336d4 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -58,6 +58,8 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @param regParam The regularization parameter. Only supports L2 regularization currently. #' @param maxIter Maximum iteration number. #' @param tol Convergence tolerance of iterations. +#' @param solver solver parameter, supported options: "owlqn" or "l-bfgs". +#' @param loss loss function, supported options: "hinge" and "squared_hinge". #' @param standardization Whether to standardize the training features before fitting the model. 
The coefficients #' of models will be always returned on the original scale, so it will be transparent for #' users. Note that with/without standardization, the models should be always converged @@ -96,7 +98,8 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @note spark.svmLinear since 2.2.0 setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE, - threshold = 0.0, weightCol = NULL, aggregationDepth = 2) { + threshold = 0.0, weightCol = NULL, aggregationDepth = 2, solver = "l-bfgs", + loss = "squared_hinge") { formula <- paste(deparse(formula), collapse = "") if (!is.null(weightCol) && weightCol == "") { @@ -108,7 +111,8 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.integer(maxIter), as.numeric(tol), as.logical(standardization), as.numeric(threshold), - weightCol, as.integer(aggregationDepth)) + weightCol, as.integer(aggregationDepth), as.character(solver), + as.character(loss)) new("LinearSVCModel", jobj = jobj) }) diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index 726e9d9a20b1c..fff7dd674b571 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -30,7 +30,8 @@ absoluteSparkPath <- function(x) { test_that("spark.svmLinear", { df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] - model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10) + model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10, + loss = "hinge", solver = "owlqn") summary <- summary(model) # test summary coefficients return matrix type diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala index 0dd1f1146fbf8..929db1e4d3dbf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -70,7 +70,7 @@ private[r] object LinearSVCWrapper val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" val PREDICTED_LABEL_COL = "prediction" - def fit( + def fit( // scalastyle:ignore data: DataFrame, formula: String, regParam: Double, @@ -79,7 +79,9 @@ private[r] object LinearSVCWrapper standardization: Boolean, threshold: Double, weightCol: String, - aggregationDepth: Int + aggregationDepth: Int, + solver: String, + loss: String ): LinearSVCWrapper = { val rFormula = new RFormula() @@ -105,6 +107,8 @@ private[r] object LinearSVCWrapper .setPredictionCol(PREDICTED_LABEL_INDEX_COL) .setThreshold(threshold) .setAggregationDepth(aggregationDepth) + .setSolver(solver) + .setLoss(loss) if (weightCol != null) svc.setWeightCol(weightCol) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 60bdeedd6a144..b88fef6e889ae 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -80,9 +80,9 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> svm = LinearSVC(maxIter=5, regParam=0.01) >>> model = svm.fit(df) >>> model.coefficients - DenseVector([0.0, -0.2792, -0.1833]) + DenseVector([0.0, 0.0759, -0.6167]) >>> model.intercept - 1.0206118982229047 + 1.3113904822325306 >>> model.numClasses 2 >>> model.numFeatures @@ -92,7 +92,7 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> result.prediction 1.0 >>> result.rawPrediction - DenseVector([-1.4831, 1.4831]) + DenseVector([-1.8521, 1.8521]) >>> svm_path = temp_path + "/svm" >>> svm.save(svm_path) >>> svm2 = LinearSVC.load(svm_path) From 
5f7f456335b02f1408f0d1577bdbbc3963312233 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 13 Jun 2017 21:28:47 -0700 Subject: [PATCH 06/14] switch between Hinge and Square --- .../spark/ml/classification/LinearSVC.scala | 158 ++---------------- .../ml/optim/aggregator/HingeAggregator.scala | 103 ++++++++++++ .../aggregator/SquaredHingeAggregator.scala | 104 ++++++++++++ .../ml/classification/LinearSVCSuite.scala | 3 +- 4 files changed, 226 insertions(+), 142 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 9ec8d501636b2..b0450abd416a4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -32,7 +32,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.linalg.BLAS._ +import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -445,13 +445,22 @@ private class LinearSVCCostFun( val featuresStd = bcFeaturesStd.value val numFeatures = featuresStd.length - val svmAggregator = { - val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance) - val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2) - - instances.treeAggregate( - new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept, loss) - )(seqOp, combOp, aggregationDepth) + val svmAggregator = loss match { + case "hinge" => + val seqOp = (c: 
HingeAggregator, instance: Instance) => c.add(instance) + val combOp = (c1: HingeAggregator, c2: HingeAggregator) => c1.merge(c2) + instances.treeAggregate( + new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoeffs) + )(seqOp, combOp, aggregationDepth) + case "squared_hinge" => + val seqOp = (c: SquaredHingeAggregator, instance: Instance) => c.add(instance) + val combOp = (c1: SquaredHingeAggregator, c2: SquaredHingeAggregator) => c1.merge(c2) + + instances.treeAggregate( + new SquaredHingeAggregator(bcFeaturesStd, fitIntercept)(bcCoeffs) + )(seqOp, combOp, aggregationDepth) + case unexpected => throw new SparkException( + s"unexpected lossFunction in LinearSVCAggregator: $unexpected") } val totalGradientArray = svmAggregator.gradient.toArray @@ -493,136 +502,3 @@ private class LinearSVCCostFun( (svmAggregator.loss + regVal, new BDV(totalGradientArray)) } } - -/** - * LinearSVCAggregator computes the gradient and loss for loss function ("hinge" or - * "squared_hinge", as used in binary classification for instances in sparse or dense - * vector in an online fashion. - * - * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of - * the corresponding joint dataset. - * - * This class standardizes feature values during computation using bcFeaturesStd. - * - * @param bcCoefficients The coefficients corresponding to the features. - * @param fitIntercept Whether to fit an intercept term. - * @param bcFeaturesStd The standard deviation values of the features. 
- */ -private class LinearSVCAggregator( - bcCoefficients: Broadcast[Vector], - bcFeaturesStd: Broadcast[Array[Double]], - fitIntercept: Boolean, - lossFunction: String) extends Serializable { - - private val numFeatures: Int = bcFeaturesStd.value.length - private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures - private var weightSum: Double = 0.0 - private var lossSum: Double = 0.0 - @transient private lazy val coefficientsArray = bcCoefficients.value match { - case DenseVector(values) => values - case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + - s" but got type ${bcCoefficients.value.getClass}.") - } - private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept) - - /** - * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient - * of the objective function. - * - * @param instance The instance of data point to be added. - * @return This LinearSVCAggregator object. 
- */ - def add(instance: Instance): this.type = { - instance match { case Instance(label, weight, features) => - - if (weight == 0.0) return this - val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = coefficientsArray - val localGradientSumArray = gradientSumArray - - val dotProduct = { - var sum = 0.0 - features.foreachActive { (index, value) => - if (localFeaturesStd(index) != 0.0 && value != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } - } - if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) - sum - } - // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) - // Therefore the gradient is -(2y - 1)*x - val labelScaled = 2 * label - 1.0 - val loss = if (1.0 > labelScaled * dotProduct) { - val hingeLoss = 1.0 - labelScaled * dotProduct - lossFunction match { - case "hinge" => hingeLoss * weight - case "squared_hinge" => hingeLoss * hingeLoss * weight - case unexpected => throw new SparkException( - s"unexpected lossFunction in LinearSVCAggregator: $unexpected") - } - } else { - 0.0 - } - - if (1.0 > labelScaled * dotProduct) { - val gradientScale = lossFunction match { - case "hinge" => -labelScaled * weight - case "squared_hinge" => (labelScaled * dotProduct - 1) * labelScaled * 2 - case unexpected => throw new SparkException( - s"unexpected lossFunction in LinearSVCAggregator: $unexpected") - } - features.foreachActive { (index, value) => - if (localFeaturesStd(index) != 0.0 && value != 0.0) { - localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) - } - } - if (fitIntercept) { - localGradientSumArray(localGradientSumArray.length - 1) += gradientScale - } - } - - lossSum += loss - weightSum += weight - this - } - } - - /** - * Merge another LinearSVCAggregator, and update the loss and gradient - * of the objective function. - * (Note that it's in place merging; as a result, `this` object will be modified.) 
- * - * @param other The other LinearSVCAggregator to be merged. - * @return This LinearSVCAggregator object. - */ - def merge(other: LinearSVCAggregator): this.type = { - - if (other.weightSum != 0.0) { - weightSum += other.weightSum - lossSum += other.lossSum - - var i = 0 - val localThisGradientSumArray = this.gradientSumArray - val localOtherGradientSumArray = other.gradientSumArray - val len = localThisGradientSumArray.length - while (i < len) { - localThisGradientSumArray(i) += localOtherGradientSumArray(i) - i += 1 - } - } - this - } - - def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0 - - def gradient: Vector = { - if (weightSum != 0) { - val result = Vectors.dense(gradientSumArray.clone()) - scal(1.0 / weightSum, result) - result - } else { - Vectors.dense(new Array[Double](numFeaturesPlusIntercept)) - } - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala new file mode 100644 index 0000000000000..5e0b9827dc45d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg._ + +/** + * HingeAggregator computes the gradient and loss for the hinge loss function, as used in + * binary classification, for instances in sparse or dense vector form, in an online + * fashion. + * + * Two HingeAggregator instances can be merged together to have a summary of loss and + * gradient of the corresponding joint dataset. + * + * This class standardizes feature values during computation using bcFeaturesStd. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. + * @param bcFeaturesStd The standard deviation values of the features. + */ +private[ml] class HingeAggregator( + bcFeaturesStd: Broadcast[Array[Double]], + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[Instance, HingeAggregator] { + + private val numFeatures: Int = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } + protected override val dim: Int = numFeaturesPlusIntercept + + /** + * Add a new training instance to this HingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param instance The instance of data point to be added. + * @return This HingeAggregator object.
+ */ + def add(instance: Instance): this.type = { + instance match { case Instance(label, weight, features) => + + if (weight == 0.0) return this + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientSumArray = gradientSumArray + + val dotProduct = { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = 2 * label - 1.0 + val loss = if (1.0 > labelScaled * dotProduct) { + (1.0 - labelScaled * dotProduct) * weight + } else { + 0.0 + } + + if (1.0 > labelScaled * dotProduct) { + val gradientScale = -labelScaled * weight + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) + } + } + if (fitIntercept) { + localGradientSumArray(localGradientSumArray.length - 1) += gradientScale + } + } + + lossSum += loss + weightSum += weight + this + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala new file mode 100644 index 0000000000000..5063a9d59c5ce --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg._ + +/** + * SquaredHingeAggregator computes the gradient and loss for the squared hinge loss + * function, as used in binary classification, for instances in sparse or dense vector + * form, in an online fashion. + * + * Two SquaredHingeAggregator instances can be merged together to have a summary of loss and + * gradient of the corresponding joint dataset. + * + * This class standardizes feature values during computation using bcFeaturesStd. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. + * @param bcFeaturesStd The standard deviation values of the features.
+ */ +private[ml] class SquaredHingeAggregator( + bcFeaturesStd: Broadcast[Array[Double]], + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[Instance, SquaredHingeAggregator] { + + private val numFeatures: Int = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } + protected override val dim: Int = numFeaturesPlusIntercept + + /** + * Add a new training instance to this SquaredHingeAggregator, and update the loss and + * gradient of the objective function. + * + * @param instance The instance of data point to be added. + * @return This SquaredHingeAggregator object. + */ + def add(instance: Instance): this.type = { + instance match { case Instance(label, weight, features) => + + if (weight == 0.0) return this + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientSumArray = gradientSumArray + + val dotProduct = { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + // Our loss function with {0, 1} labels is weight * max(0, 1 - (2y - 1) (f_w(x)))^2 + // Therefore the gradient is -2 * weight * (2y - 1) * max(0, 1 - (2y - 1) (f_w(x))) * x + val labelScaled = 2 * label - 1.0 + val loss = if (1.0 > labelScaled * dotProduct) { + val hingeLoss = 1.0 - labelScaled * dotProduct + hingeLoss * hingeLoss * weight + } else { + 0.0 + } + + if (1.0 > labelScaled * dotProduct) { + val gradientScale = (labelScaled * dotProduct - 1) * labelScaled * 2 * weight + features.foreachActive { (index, value)
=> + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) + } + } + if (fitIntercept) { + localGradientSumArray(localGradientSumArray.length - 1) += gradientScale + } + } + + lossSum += loss + weightSum += weight + this + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index b4ac72da5e1ea..1a80d7b6a162c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.ml.optim.aggregator.SquaredHingeAggregator import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ @@ -156,7 +157,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("sparse coefficients in SVCAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true, "squared_hinge") + val agg = new SquaredHingeAggregator(bcFeaturesStd, true)(bcCoefficients) val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) From 0297057c50ed673048f8d92acbad1d03a7f7fc88 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 15 Jun 2017 12:26:22 -0700 Subject: [PATCH 07/14] use RDDLossFunction --- 
.../spark/ml/classification/LinearSVC.scala | 104 ++++-------------- 1 file changed, 23 insertions(+), 81 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index b0450abd416a4..d2be7ce53c9f5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -22,17 +22,16 @@ import java.util.Locale import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} -import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, - OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} -import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} +import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -257,8 +256,27 @@ class LinearSVC @Since("2.2.0") ( val featuresStd = summarizer.variance.toArray.map(math.sqrt) val regParamL2 = $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) - val costFun = new LinearSVCCostFun(instances, $(fitIntercept), $(standardization), - bcFeaturesStd, regParamL2, $(aggregationDepth), $(loss).toLowerCase(Locale.ROOT)) + + val regularization = if (regParamL2 != 0.0) { + val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures + Some(new L2Regularization(regParamL2, shouldApply, + if ($(standardization)) None else Some(featuresStd))) + } else { + None + } + + val costFun = 
$(loss).toLowerCase(Locale.ROOT) match { // normalize case, as done for solver + case "hinge" => + val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + new RDDLossFunction(instances, getAggregatorFunc, regularization, + $(aggregationDepth)) + case "squared_hinge" => + val getAggregatorFunc = new SquaredHingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + new RDDLossFunction(instances, getAggregatorFunc, regularization, + $(aggregationDepth)) + case unexpected => throw new SparkException( + s"unexpected loss function in LinearSVC: $unexpected") + } val optimizer = $(solver).toLowerCase(Locale.ROOT) match { case LBFGS => new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) @@ -426,79 +444,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { } } -/** - * LinearSVCCostFun implements Breeze's DiffFunction[T] for loss function ("hinge" or - * "squared_hinge"). - */ -private class LinearSVCCostFun( - instances: RDD[Instance], - fitIntercept: Boolean, - standardization: Boolean, - bcFeaturesStd: Broadcast[Array[Double]], - regParamL2: Double, - aggregationDepth: Int, - loss: String) extends DiffFunction[BDV[Double]] { - - override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { - val coeffs = Vectors.fromBreeze(coefficients) - val bcCoeffs = instances.context.broadcast(coeffs) - val featuresStd = bcFeaturesStd.value - val numFeatures = featuresStd.length - - val svmAggregator = loss match { - case "hinge" => - val seqOp = (c: HingeAggregator, instance: Instance) => c.add(instance) - val combOp = (c1: HingeAggregator, c2: HingeAggregator) => c1.merge(c2) - instances.treeAggregate( - new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoeffs) - )(seqOp, combOp, aggregationDepth) - case "squared_hinge" => - val seqOp = (c: SquaredHingeAggregator, instance: Instance) => c.add(instance) - val combOp = (c1: SquaredHingeAggregator, c2: SquaredHingeAggregator) => c1.merge(c2) - - instances.treeAggregate( - new SquaredHingeAggregator(bcFeaturesStd, fitIntercept)(bcCoeffs) -
)(seqOp, combOp, aggregationDepth) - case unexpected => throw new SparkException( - s"unexpected lossFunction in LinearSVCAggregator: $unexpected") - } - - val totalGradientArray = svmAggregator.gradient.toArray - // regVal is the sum of coefficients squares excluding intercept for L2 regularization. - val regVal = if (regParamL2 == 0.0) { - 0.0 - } else { - var sum = 0.0 - coeffs.foreachActive { case (index, value) => - // We do not apply regularization to the intercepts - if (index != numFeatures) { - // The following code will compute the loss of the regularization; also - // the gradient of the regularization, and add back to totalGradientArray. - sum += { - if (standardization) { - totalGradientArray(index) += regParamL2 * value - value * value - } else { - if (featuresStd(index) != 0.0) { - // If `standardization` is false, we still standardize the data - // to improve the rate of convergence; as a result, we have to - // perform this reverse standardization by penalizing each component - // differently to get effectively the same objective function when - // the training dataset is not standardized. 
- val temp = value / (featuresStd(index) * featuresStd(index)) - totalGradientArray(index) += regParamL2 * temp - value * temp - } else { - 0.0 - } - } - } - } - } - 0.5 * regParamL2 * sum - } - bcCoeffs.destroy(blocking = false) - - (svmAggregator.loss + regVal, new BDV(totalGradientArray)) - } -} From 7be6bacd19e06de6ea8f040f8ad0c70feee82e12 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 27 Jun 2017 16:46:34 -0700 Subject: [PATCH 08/14] r and new ut --- R/pkg/R/mllib_classification.R | 14 +-- .../fulltests/test_mllib_classification.R | 2 +- .../ml/classification/LinearSVCSuite.scala | 93 ++++++++++--------- 3 files changed, 60 insertions(+), 49 deletions(-) diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index e5688f6bc12df..929552664c10c 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -58,8 +58,8 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @param regParam The regularization parameter. Only supports L2 regularization currently. #' @param maxIter Maximum iteration number. #' @param tol Convergence tolerance of iterations. -#' @param solver solver parameter, supported options: "owlqn" or "l-bfgs". -#' @param loss loss function, supported options: "hinge" and "squared_hinge". +#' @param solver solver parameter, supported options: "owlqn" or "l-bfgs". Default is "l-bfgs" +#' @param loss loss function, supported options: "hinge" and "squared_hinge". Default is "squared_hinge" #' @param standardization Whether to standardize the training features before fitting the model. The coefficients #' of models will be always returned on the original scale, so it will be transparent for #' users. 
Note that with/without standardization, the models should be always converged @@ -100,8 +100,11 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @note spark.svmLinear since 2.2.0 setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE, - threshold = 0.0, weightCol = NULL, aggregationDepth = 2, solver = "l-bfgs", - loss = "squared_hinge") { + threshold = 0.0, weightCol = NULL, aggregationDepth = 2, + solver = c("l-bfgs", "owlqn"), loss = c("squared_hinge", "hinge")) { + + solver <- match.arg(solver) + loss <- match.arg(loss) formula <- paste(deparse(formula), collapse = "") if (!is.null(weightCol) && weightCol == "") { @@ -113,8 +116,7 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.integer(maxIter), as.numeric(tol), as.logical(standardization), as.numeric(threshold), - weightCol, as.integer(aggregationDepth), as.character(solver), - as.character(loss)) + weightCol, as.integer(aggregationDepth), solver, loss) new("LinearSVCModel", jobj = jobj) }) diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index 181ab0662dfa1..76baad547b271 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -31,7 +31,7 @@ test_that("spark.svmLinear", { df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10, - loss = "hinge", solver = "owlqn") + solver = "owlqn", loss = "hinge") summary <- summary(model) # test summary coefficients return matrix type diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index a3fc4e57324f1..b8c664a746e26 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -221,12 +221,12 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } test("linearSVC OWLQN hinge comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC().setSolver(LinearSVC.OWLQN) - .setRegParam(0.00002) // set regParam = 2.0 / datasize / c + val trainer = new LinearSVC().setSolver(LinearSVC.OWLQN) + .setRegParam(2.0 / 10 / 10000) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) .setLoss("hinge") - val model1 = trainer1.fit(binaryDataset) + val model = trainer.fit(binaryDataset) /* Use the following R code to load the data and train the model using glmnet package. @@ -249,8 +249,8 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau */ val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508) val interceptR = 7.440177 - assert(model1.intercept ~== interceptR relTol 1E-2) - assert(model1.coefficients ~== coefficientsR relTol 1E-2) + assert(model.intercept ~== interceptR relTol 1E-2) + assert(model.coefficients ~== coefficientsR relTol 1E-2) /* Use the following python code to load the data and train the model using scikit-learn package. 
@@ -272,36 +272,16 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729) val interceptSK = 7.36947518 - assert(model1.intercept ~== interceptSK relTol 1E-3) - assert(model1.coefficients ~== coefficientsSK relTol 4E-3) + assert(model.intercept ~== interceptSK relTol 1E-3) + assert(model.coefficients ~== coefficientsSK relTol 4E-3) } - test("linearSVC L-BFGS hinge comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC().setSolver(LinearSVC.LBFGS) - .setRegParam(0.00003) - .setMaxIter(200) - .setTol(1e-4) - .setLoss("hinge") - val model1 = trainer1.fit(binaryDataset) - - // refer to last unit test for R and python code - val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508) - val interceptR = 7.440177 - assert(model1.intercept ~== interceptR relTol 2E-2) - assert(model1.coefficients ~== coefficientsR relTol 1E-2) - - val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729) - val interceptSK = 7.36947518 - assert(model1.intercept ~== interceptSK relTol 1E-2) - assert(model1.coefficients ~== coefficientsSK relTol 1E-2) - } - - test("linearSVC OWLQN squared_hinge loss comparison with scikit-learn (liblinear)") { + test("linearSVC L-BFGS squared_hinge loss comparison with scikit-learn (liblinear)") { val linearSVC = new LinearSVC() .setLoss("squared_hinge") - .setSolver("owlqn") - .setRegParam(2.0 / 10 / 1000) // set regParam = 2.0 / datasize / c - .setMaxIter(80) + .setSolver("L-BFGS") + .setRegParam(3.0 / 10 / 1000) + .setMaxIter(100) .setTol(1e-4) val model = linearSVC.fit(binaryDataset.limit(1000)) @@ -323,24 +303,53 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val coefficientsSK = Vectors.dense(2.85136074, 6.25310456, 9.00668415, 12.17750981) val interceptSK = 2.93419973 - assert(model.intercept ~== interceptSK relTol 2E-2) - 
assert(model.coefficients ~== coefficientsSK relTol 2E-2) + assert(model.intercept ~== interceptSK relTol 3E-2) + assert(model.coefficients ~== coefficientsSK relTol 3E-2) } - test("linearSVC L-BFGS squared_hinge loss comparison with scikit-learn (liblinear)") { + test("linearSVC L-BFGS and OWLQN get similar model for squared_hinge loss") { + val size = nPoints val linearSVC = new LinearSVC() .setLoss("squared_hinge") .setSolver("L-BFGS") - .setRegParam(3.0 / 10 / 1000) // set regParam = 2.0 / datasize / c - .setMaxIter(30) + .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c + .setMaxIter(200) .setTol(1e-4) - val model = linearSVC.fit(binaryDataset.limit(1000)) + val model = linearSVC.fit(smallBinaryDataset) - // refer to last unit test for python code - val coefficientsSK = Vectors.dense(2.85136074, 6.25310456, 9.00668415, 12.17750981) - val interceptSK = 2.93419973 - assert(model.intercept ~== interceptSK relTol 3E-2) - assert(model.coefficients ~== coefficientsSK relTol 3E-2) + val linearSVC2 = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("OWLQN") + .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c + .setMaxIter(200) + .setTol(1e-4) + val model2 = linearSVC2.fit(smallBinaryDataset) + + assert(model.coefficients ~== model2.coefficients relTol 1E-3) + assert(model.intercept ~== model2.intercept relTol 1E-3) + } + + test("linearSVC L-BFGS and OWLQN get similar model for hinge loss") { + val linearSVC = new LinearSVC() + .setLoss("hinge") + .setSolver("L-BFGS") + .setRegParam(0.01) + .setMaxIter(200) + .setTol(1e-4) + .setFitIntercept(false) + val model = linearSVC.fit(smallBinaryDataset) + + val linearSVC2 = new LinearSVC() + .setLoss("hinge") + .setSolver("OWLQN") + .setRegParam(0.01) + .setMaxIter(200) + .setTol(1e-4) + .setFitIntercept(false) + val model2 = linearSVC2.fit(smallBinaryDataset) + assert(model.coefficients ~== model2.coefficients relTol 2E-2) + assert(model.intercept === 0) + assert(model2.intercept === 
0) } test("read/write: SVM") { From aaf35ec1a838eaef72d7a880ddbb55d79bfcaecf Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 30 Jun 2017 15:02:52 -0700 Subject: [PATCH 09/14] ut update --- .../ml/classification/LinearSVCSuite.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index b8c664a746e26..3293c46b5d7f5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -280,9 +280,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val linearSVC = new LinearSVC() .setLoss("squared_hinge") .setSolver("L-BFGS") - .setRegParam(3.0 / 10 / 1000) + .setRegParam(2.0 / 10 / 1000) .setMaxIter(100) .setTol(1e-4) + .setFitIntercept(false) val model = linearSVC.fit(binaryDataset.limit(1000)) /* @@ -293,18 +294,18 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau data = np.loadtxt(f, delimiter=",")[:1000] X = data[:, 1:] # select columns 1 through end y = data[:, 0] # select column 0 as label - clf = svm.LinearSVC(fit_intercept=True, C=10, loss='squared_hinge', tol=1e-4, random_state=42) + clf = svm.LinearSVC(fit_intercept=False, C=10, + loss='squared_hinge', tol=1e-4, random_state=42) m = clf.fit(X, y) print m.coef_ print m.intercept_ - [[ 2.85136074 6.25310456 9.00668415 12.17750981]] - [ 2.93419973] + [[ 0.62836794 1.24577698 1.70704463 2.38387201]] + 0.0 */ - val coefficientsSK = Vectors.dense(2.85136074, 6.25310456, 9.00668415, 12.17750981) - val interceptSK = 2.93419973 - assert(model.intercept ~== interceptSK relTol 3E-2) - assert(model.coefficients ~== coefficientsSK relTol 3E-2) + val coefficientsSK = Vectors.dense(0.62836794, 1.24577698, 1.70704463, 2.38387201) + assert(model.intercept === 
0) + assert(model.coefficients ~== coefficientsSK relTol 1E-2) } test("linearSVC L-BFGS and OWLQN get similar model for squared_hinge loss") { From 93f7b6813f587b927e11ab7d520e67f2b61c308c Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Sun, 3 Sep 2017 16:11:05 -0700 Subject: [PATCH 10/14] merge conflict and add unit tests --- .../spark/ml/classification/LinearSVC.scala | 33 +--- .../ml/optim/aggregator/HingeAggregator.scala | 19 -- .../aggregator/SquaredHingeAggregator.scala | 20 ++- .../apache/spark/ml/r/LinearSVCWrapper.scala | 5 +- .../ml/classification/LinearSVCSuite.scala | 18 +- .../aggregator/HingeAggregatorSuite.scala | 2 +- .../SquaredHingeAggregatorSuite.scala | 165 ++++++++++++++++++ 7 files changed, 185 insertions(+), 77 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 78a2adcafb664..a40e316542c91 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -30,11 +30,7 @@ import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ -<<<<<<< HEAD import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} -======= -import org.apache.spark.ml.optim.aggregator.HingeAggregator ->>>>>>> upstream/master import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -48,8 +44,7 @@ import org.apache.spark.sql.functions.{col, lit} /** Params for linear SVM Classifier. 
*/ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol -<<<<<<< HEAD - with HasAggregationDepth with HasSolver { + with HasAggregationDepth with HasThreshold with HasSolver { /** * Specifies the loss function. Currently "hinge" and "squared_hinge" are supported. @@ -65,14 +60,10 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR "function. hinge is the standard SVM loss while squared_hinge is the square of the hinge loss.", (s: String) => LinearSVC.supportedLoss.contains(s.toLowerCase(Locale.ROOT))) - setDefault(loss -> "squared_hinge") - /** @group getParam */ @Since("2.3.0") def getLoss: String = $(loss) -======= - with HasAggregationDepth with HasThreshold { ->>>>>>> upstream/master + /** * Param for threshold in binary classification prediction. @@ -201,6 +192,7 @@ class LinearSVC @Since("2.2.0") ( */ @Since("2.3.0") def setLoss(value: String): this.type = set(loss, value) + setDefault(loss -> "squared_hinge") /** * Set solver for LinearSVC. Supported options: "l-bfgs" and "owlqn" (case insensitve). 
@@ -277,8 +269,6 @@ class LinearSVC @Since("2.2.0") ( val getFeaturesStd = (j: Int) => featuresStd(j) val regParamL2 = $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) -<<<<<<< HEAD -======= val regularization = if (regParamL2 != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(regParamL2, shouldApply, @@ -287,19 +277,6 @@ class LinearSVC @Since("2.2.0") ( None } - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, - $(aggregationDepth)) ->>>>>>> upstream/master - - val regularization = if (regParamL2 != 0.0) { - val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures - Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(featuresStd))) - } else { - None - } - val costFun = $(loss) match { case "hinge" => val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) @@ -483,7 +460,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { } } } -<<<<<<< HEAD - -======= ->>>>>>> upstream/master diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index ae5480a800a35..0300500a34ec0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -22,18 +22,10 @@ import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ /** -<<<<<<< HEAD - * LinearSVCAggregator computes the gradient and loss for loss function ("hinge" or - * "squared_hinge", as used in binary classification for instances in sparse or dense - * vector in an online fashion. 
- * - * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of -======= * HingeAggregator computes the gradient and loss for Hinge loss function as used in * binary classification for instances in sparse or dense vector in an online fashion. * * Two HingeAggregators can be merged together to have a summary of loss and gradient of ->>>>>>> upstream/master * the corresponding joint dataset. * * This class standardizes feature values during computation using bcFeaturesStd. @@ -57,16 +49,6 @@ private[ml] class HingeAggregator( protected override val dim: Int = numFeaturesPlusIntercept /** -<<<<<<< HEAD - * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient - * of the objective function. - * - * @param instance The instance of data point to be added. - * @return This LinearSVCAggregator object. - */ - def add(instance: Instance): this.type = { - instance match { case Instance(label, weight, features) => -======= * Add a new training instance to this HingeAggregator, and update the loss and gradient * of the objective function. * @@ -78,7 +60,6 @@ private[ml] class HingeAggregator( require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." 
+ s" Expecting $numFeatures but got ${features.size}.") require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") ->>>>>>> upstream/master if (weight == 0.0) return this val localFeaturesStd = bcFeaturesStd.value diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala index 5063a9d59c5ce..9638ace148904 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala @@ -22,11 +22,10 @@ import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ /** - * LinearSVCAggregator computes the gradient and loss for loss function ("hinge" or - * "squared_hinge", as used in binary classification for instances in sparse or dense - * vector in an online fashion. + * SquaredHingeAggregator computes the gradient and loss for squared Hinge loss function, as used in + * binary classification for instances in sparse or dense vector in an online fashion. * - * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of + * Two SquaredHingeAggregator can be merged together to have a summary of loss and gradient of * the corresponding joint dataset. * * This class standardizes feature values during computation using bcFeaturesStd. @@ -50,14 +49,17 @@ private[ml] class SquaredHingeAggregator( protected override val dim: Int = numFeaturesPlusIntercept /** - * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient + * Add a new training instance to this SquaredHingeAggregator, and update the loss and gradient * of the objective function. * * @param instance The instance of data point to be added. - * @return This LinearSVCAggregator object. + * @return This SquaredHingeAggregator object. 
*/ def add(instance: Instance): this.type = { instance match { case Instance(label, weight, features) => + require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." + + s" Expecting $numFeatures but got ${features.size}.") + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this val localFeaturesStd = bcFeaturesStd.value @@ -74,8 +76,8 @@ private[ml] class SquaredHingeAggregator( if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum } - // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) - // Therefore the gradient is -(2y - 1)*x + // Our loss function with {0, 1} labels is (max(0, 1 - (2y - 1) (f_w(x))))^2 + // Therefore the gradient is 2 * ((2y - 1) f_w(x) - 1) * (2y - 1) * x val labelScaled = 2 * label - 1.0 val loss = if (1.0 > labelScaled * dotProduct) { val hingeLoss = 1.0 - labelScaled * dotProduct @@ -85,7 +87,7 @@ private[ml] class SquaredHingeAggregator( } if (1.0 > labelScaled * dotProduct) { - val gradientScale = (labelScaled * dotProduct - 1) * labelScaled * 2 + val gradientScale = (labelScaled * dotProduct - 1) * labelScaled * 2 * weight features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala index 3a6bfcaf5238d..ee919421f4217 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -80,12 +80,9 @@ private[r] object LinearSVCWrapper threshold: Double, weightCol: String, aggregationDepth: Int, -<<<<<<< HEAD + handleInvalid: String, solver: String, loss: String -======= - handleInvalid: String ->>>>>>> upstream/master ): LinearSVCWrapper = { val rFormula = new RFormula() diff 
--git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 5904ad3a87824..c3250051fec86 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -25,13 +25,8 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -<<<<<<< HEAD -import org.apache.spark.ml.optim.aggregator.SquaredHingeAggregator -import org.apache.spark.ml.param.{ParamMap, ParamsSuite} -======= -import org.apache.spark.ml.optim.aggregator.HingeAggregator +import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} import org.apache.spark.ml.param.ParamsSuite ->>>>>>> upstream/master import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -114,7 +109,6 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val lsvc = new LinearSVC() assert(lsvc.getRegParam === 0.0) assert(lsvc.getMaxIter === 100) - assert(lsvc.getLoss === "squared_hinge") assert(lsvc.getFitIntercept) assert(lsvc.getTol === 1E-6) assert(lsvc.getStandardization) @@ -126,11 +120,11 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") assert(lsvc.getSolver === "l-bfgs") + assert(lsvc.getLoss === "squared_hinge") val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) model.transform(smallBinaryDataset) .select("label", "prediction", "rawPrediction") .collect() - assert(model.getLoss === "squared_hinge") assert(model.getThreshold 
=== 0.0) assert(model.getFeaturesCol === "features") assert(model.getPredictionCol === "prediction") @@ -195,11 +189,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("sparse coefficients in HingeAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) -<<<<<<< HEAD - val agg = new SquaredHingeAggregator(bcFeaturesStd, true)(bcCoefficients) -======= val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) ->>>>>>> upstream/master val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) @@ -377,7 +367,6 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau object LinearSVCSuite { val allParamSettings: Map[String, Any] = Map( - "loss" -> "squared_hinge", "regParam" -> 0.01, "maxIter" -> 2, // intentionally small "fitIntercept" -> true, @@ -387,7 +376,8 @@ object LinearSVCSuite { "predictionCol" -> "myPredict", "rawPredictionCol" -> "myRawPredict", "aggregationDepth" -> 3, - "solver" -> "owlqn" + "solver" -> "owlqn", + "loss" -> "squared_hinge" ) // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 61b48ffa10944..b5aa7023d4221 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -130,7 +130,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val gradientCoef = new Array[Double](numFeatures) var gradientIntercept = 0.0 instances.foreach { case Instance(l, w, f) => - val margin = BLAS.dot(f, 
Vectors.dense(coefArray)) + intercept + val margin = BLAS.dot(f, Vectors.dense(stdCoef)) + intercept if (1.0 > (2 * l - 1.0) * margin) { gradientCoef.indices.foreach { i => gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala new file mode 100644 index 0000000000000..3e7d362052bff --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class SquaredHingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { + + import DifferentiableLossAggregatorSuite.getClassificationSummarizers + + @transient var instances: Array[Instance] = _ + @transient var instancesConstantFeature: Array[Instance] = _ + @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + instances = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), + Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) + ) + instancesConstantFeature = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), + Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) + ) + instancesConstantFeatureFiltered = Array( + Instance(0.0, 0.1, Vectors.dense(2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0)), + Instance(2.0, 0.3, Vectors.dense(0.5)) + ) + } + + /** Get summary statistics for some data and create a new SquaredHingeAggregator. 
*/ + private def getNewAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean): SquaredHingeAggregator = { + val (featuresSummarizer, ySummarizer) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new SquaredHingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) + } + + test("SquaredHingeAggregator check add method input size") { + val coefArray = Array(1.0, 2.0) + val interceptArray = Array(2.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true) + withClue("SquaredHingeAggregator features dimension must match coefficients dimension") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) + } + } + } + + test("SquaredHingeAggregator negative weight") { + val coefArray = Array(1.0, 2.0) + val interceptArray = Array(2.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true) + withClue("SquaredHingeAggregator does not support negative instance weights") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) + } + } + } + + test("SquaredHingeAggregator check sizes") { + val rng = new scala.util.Random + val numFeatures = instances.head.features.size + val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) + val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) + val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true) + val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, + fitIntercept = false) + instances.foreach(aggIntercept.add) + instances.foreach(aggNoIntercept.add) + + assert(aggIntercept.gradient.size === 
numFeatures + 1) + assert(aggNoIntercept.gradient.size === numFeatures) + } + + test("SquaredHingeAggregator check correctness") { + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + val numFeatures = instances.head.features.size + val (featuresSummarizer, _) = getClassificationSummarizers(instances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val weightSum = instances.map(_.weight).sum + + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + instances.foreach(agg.add) + + // compute the loss + val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray + val lossSum = instances.map { case Instance(l, w, f) => + val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept + val labelScaled = 2 * l - 1.0 + if (1.0 > labelScaled * margin) { + val hingeLoss = 1.0 - labelScaled * margin + hingeLoss * hingeLoss * w + } else { + 0.0 + } + }.sum + val loss = lossSum / weightSum + + // compute the gradients + val gradientCoef = new Array[Double](numFeatures) + var gradientIntercept = 0.0 + instances.foreach { case Instance(l, w, f) => + val margin = BLAS.dot(f, Vectors.dense(stdCoef)) + intercept + if (1.0 > (2 * l - 1.0) * margin) { + val gradientScale = ((2 * l - 1) * margin - 1) * (2 * l - 1) * 2 + gradientCoef.indices.foreach { i => + gradientCoef(i) += f(i) * gradientScale * w / featuresStd(i) + } + gradientIntercept += gradientScale * w + } + } + val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum)) + + assert(loss ~== agg.loss relTol 0.01) + assert(gradient ~== agg.gradient relTol 0.01) + } + + test("check with zero standard deviation") { + val binaryCoefArray = Array(1.0, 2.0) + val intercept = 1.0 + val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature, + Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true) + instancesConstantFeature.foreach(aggConstantFeatureBinary.add) + + val 
aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered, + Vectors.dense(binaryCoefArray.tail ++ Array(intercept)), fitIntercept = true) + instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add) + + // constant features should not affect gradient + assert(aggConstantFeatureBinary.gradient(0) === 0.0) + assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) + } + +} From cec628ba094fcbaad2fad4a7c5957aa9f5d3d698 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Sun, 3 Sep 2017 16:26:33 -0700 Subject: [PATCH 11/14] style --- R/pkg/R/mllib_classification.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index d052b4e2fcc2f..e1d38741e003e 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -123,7 +123,8 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.integer(maxIter), as.numeric(tol), as.logical(standardization), as.numeric(threshold), - weightCol, as.integer(aggregationDepth), handleInvalid, solver, loss) + weightCol, as.integer(aggregationDepth), handleInvalid, solver, + loss) new("LinearSVCModel", jobj = jobj) }) From 0f5cad5ca9770871fb2a07968f53332f03e74903 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 15 Sep 2017 17:39:43 -0700 Subject: [PATCH 12/14] fix python ut --- python/pyspark/ml/classification.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 3795beb7fb854..38762a3671c01 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -81,9 +81,9 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> svm = LinearSVC(maxIter=5, regParam=0.01) 
>>> model = svm.fit(df) >>> model.coefficients - DenseVector([0.0, 0.0759, -0.6167]) + DenseVector([0.0, 0.0844, -0.7532]) >>> model.intercept - 1.3113904822325306 + 1.5941035229606713 >>> model.numClasses 2 >>> model.numFeatures @@ -93,7 +93,7 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> result.prediction 1.0 >>> result.rawPrediction - DenseVector([-1.8521, 1.8521]) + DenseVector([-2.2629, 2.2629]) >>> svm_path = temp_path + "/svm" >>> svm.save(svm_path) >>> svm2 = LinearSVC.load(svm_path) From 1f8e98411401add53dddbb13e94f450c27b7c0fd Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Sun, 1 Oct 2017 18:06:14 -0700 Subject: [PATCH 13/14] style --- .../scala/org/apache/spark/ml/classification/LinearSVC.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index f17a2f094f383..7530223aad743 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -340,10 +340,6 @@ class LinearSVC @Since("2.2.0") ( (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result()) } - println("total iterations: " + objectiveHistory.length) - println("loss: " + objectiveHistory.takeRight(5).mkString("\n")) - - val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) instr.logSuccess(model) model From 0bb5afe54a9a53054d2076ac28b09234a7380bbf Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Sat, 14 Oct 2017 20:13:18 -0700 Subject: [PATCH 14/14] minor updates --- R/pkg/R/mllib_classification.R | 5 +- .../spark/ml/classification/LinearSVC.scala | 33 +++++--- .../aggregator/SquaredHingeAggregator.scala | 11 +-- .../ml/classification/LinearSVCSuite.scala | 84 ++++++++----------- .../aggregator/HingeAggregatorSuite.scala | 15 ++++ 
.../SquaredHingeAggregatorSuite.scala | 15 ++++ 6 files changed, 94 insertions(+), 69 deletions(-) diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 2372e17dd12d7..68cf74be98186 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -58,8 +58,9 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @param regParam The regularization parameter. Only supports L2 regularization currently. #' @param maxIter Maximum iteration number. #' @param tol Convergence tolerance of iterations. -#' @param solver solver parameter, supported options: "owlqn" or "l-bfgs". Default is "l-bfgs" -#' @param loss loss function, supported options: "hinge" and "squared_hinge". Default is "squared_hinge" +#' @param solver Optimization solver, supported options: "owlqn" or "l-bfgs". Default is "l-bfgs" +#' @param loss Loss function, supported options: "hinge" and "squared_hinge". Default is +# "squared_hinge" #' @param standardization Whether to standardize the training features before fitting the model. #' The coefficients of models will be always returned on the original scale, #' so it will be transparent for users. Note that with/without diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 7530223aad743..bc0757f836e00 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -192,7 +192,7 @@ class LinearSVC @Since("2.2.0") ( */ @Since("2.3.0") def setLoss(value: String): this.type = set(loss, value) - setDefault(loss -> "squared_hinge") + setDefault(loss -> SQUARED_HINGE) /** * Set solver for LinearSVC. Supported options: "l-bfgs" and "owlqn" (case insensitve). 
@@ -202,14 +202,13 @@ class LinearSVC @Since("2.2.0") ( * (default: "owlqn") * @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setSolver(value: String): this.type = { - val lowercaseValue = value.toLowerCase(Locale.ROOT) - require(supportedOptimizers.contains(lowercaseValue), - s"Solver $value was not supported. Supported options: l-bfgs, owlqn") - set(solver, lowercaseValue) + require(supportedSolvers.contains(value.toLowerCase(Locale.ROOT)), s"Solver $value was" + + s" not supported. Supported options: ${supportedSolvers.mkString(", ")}") + set(solver, value) } - setDefault(solver -> "l-bfgs") + setDefault(solver -> LBFGS) @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -278,16 +277,16 @@ class LinearSVC @Since("2.2.0") ( } val costFun = $(loss) match { - case "hinge" => + case HINGE => val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) - case "squared_hinge" => + case SQUARED_HINGE => val getAggregatorFunc = new SquaredHingeAggregator(bcFeaturesStd, $(fitIntercept))(_) new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) case unexpected => throw new SparkException( - s"unexpected lossFunction in LinearSVCAggregator: $unexpected") + s"unexpected loss Function in LinearSVC: $unexpected") } val optimizer = $(solver).toLowerCase(Locale.ROOT) match { @@ -295,7 +294,7 @@ class LinearSVC @Since("2.2.0") ( case OWLQN => def regParamL1Fun = (index: Int) => 0D new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) - case _ => throw new SparkException ("unexpected optimizer: " + $(solver)) + case _ => throw new SparkException ("unexpected solver: " + $(solver)) } val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) @@ -356,12 +355,20 @@ object LinearSVC extends DefaultParamsReadable[LinearSVC] { private[classification] val OWLQN: String = 
"owlqn".toLowerCase(Locale.ROOT) /* Set of optimizers that LinearSVC supports */ - private[classification] val supportedOptimizers = Array(LBFGS, OWLQN) + private[classification] val supportedSolvers = Array(LBFGS, OWLQN) + + /** String name for Hinge Loss. */ + private[classification] val HINGE: String = "hinge".toLowerCase(Locale.ROOT) + + /** String name for Squared Hinge Loss. */ + private[classification] val SQUARED_HINGE: String = "squared_hinge".toLowerCase(Locale.ROOT) + + /* Set of loss function that LinearSVC supports */ + private[classification] val supportedLoss = Array(HINGE, SQUARED_HINGE) @Since("2.2.0") override def load(path: String): LinearSVC = super.load(path) - private[classification] val supportedLoss = Array("hinge", "squared_hinge") } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala index 9638ace148904..d9c245ba883e6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala @@ -79,15 +79,16 @@ private[ml] class SquaredHingeAggregator( // Our loss function with {0, 1} labels is (max(0, 1 - (2y - 1) (f_w(x))))^2 // Therefore the gradient is 2 * ((2y - 1) f_w(x) - 1) * (2y - 1) * x val labelScaled = 2 * label - 1.0 - val loss = if (1.0 > labelScaled * dotProduct) { - val hingeLoss = 1.0 - labelScaled * dotProduct + val scaledDoctProduct = labelScaled * dotProduct + val loss = if (1.0 > scaledDoctProduct) { + val hingeLoss = 1.0 - scaledDoctProduct hingeLoss * hingeLoss * weight } else { 0.0 } - if (1.0 > labelScaled * dotProduct) { - val gradientScale = (labelScaled * dotProduct - 1) * labelScaled * 2 * weight + if (1.0 > scaledDoctProduct) { + val gradientScale = (scaledDoctProduct - 1) * labelScaled * 2 * weight features.foreachActive { (index, value) => if 
(localFeaturesStd(index) != 0.0 && value != 0.0) { localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) @@ -96,7 +97,7 @@ private[ml] class SquaredHingeAggregator( if (fitIntercept) { localGradientSumArray(localGradientSumArray.length - 1) += gradientScale } - } + } // else gradient will not be updated. lossSum += loss weightSum += weight diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index c3250051fec86..2baf1604620a0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -23,9 +23,8 @@ import breeze.linalg.{DenseVector => BDV} import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ -import org.apache.spark.ml.feature.{Instance, LabeledPoint} +import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ @@ -39,11 +38,12 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau import testImplicits._ private val nPoints = 50 - @transient var smallBinaryDataset: Dataset[_] = _ + @transient var smallTrainingDataset: Dataset[_] = _ @transient var smallValidationDataset: Dataset[_] = _ @transient var binaryDataset: Dataset[_] = _ - @transient var smallSparseBinaryDataset: Dataset[_] = _ + // equivalent with smallTrainingDataset but in sparse Vectors. 
+ @transient var smallSparseTrainingDataset: Dataset[_] = _ @transient var smallSparseValidationDataset: Dataset[_] = _ override def beforeAll(): Unit = { @@ -53,16 +53,15 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val A = 0.01 val B = -1.5 val C = 1.0 - smallBinaryDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 42).toDF() + smallTrainingDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 42).toDF() smallValidationDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 17).toDF() binaryDataset = generateSVMInput(1.0, Array[Double](1.0, 2.0, 3.0, 4.0), 10000, 42).toDF() // Dataset for testing SparseVector val toSparse: Vector => SparseVector = _.asInstanceOf[DenseVector].toSparse val sparse = udf(toSparse) - smallSparseBinaryDataset = smallBinaryDataset.withColumn("features", sparse('features)) + smallSparseTrainingDataset = smallTrainingDataset.withColumn("features", sparse('features)) smallSparseValidationDataset = smallValidationDataset.withColumn("features", sparse('features)) - } /** @@ -76,26 +75,28 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } test("Linear SVC binary classification") { - LinearSVC.supportedOptimizers.foreach { opt => - Array("hinge", "squared_hinge").foreach { loss => + LinearSVC.supportedSolvers.foreach { opt => + LinearSVC.supportedLoss.foreach { loss => val svm = new LinearSVC().setLoss(loss).setSolver(opt) - val model = svm.fit(smallBinaryDataset) + val model = svm.fit(smallTrainingDataset) assert(model.transform(smallValidationDataset) .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + val sparseModel = svm.fit(smallSparseTrainingDataset) + checkModelsEqual(model, sparseModel) } } } test("Linear SVC binary classification with regularization") { - LinearSVC.supportedOptimizers.foreach { opt => - val svm = new 
LinearSVC().setSolver(opt).setMaxIter(10) - val model = svm.setRegParam(0.1).fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + LinearSVC.supportedSolvers.foreach { opt => + LinearSVC.supportedLoss.foreach { loss => + val svm = new LinearSVC().setSolver(opt).setLoss(loss).setMaxIter(10) + val model = svm.setRegParam(0.1).fit(smallTrainingDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseTrainingDataset) + checkModelsEqual(model, sparseModel) + } } } @@ -119,10 +120,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getFeaturesCol === "features") assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") - assert(lsvc.getSolver === "l-bfgs") - assert(lsvc.getLoss === "squared_hinge") - val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) - model.transform(smallBinaryDataset) + assert(lsvc.getSolver === LinearSVC.LBFGS) + assert(lsvc.getLoss === LinearSVC.SQUARED_HINGE) + val model = lsvc.setMaxIter(5).fit(smallTrainingDataset) + model.transform(smallTrainingDataset) .select("label", "prediction", "rawPrediction") .collect() assert(model.getThreshold === 0.0) @@ -178,38 +179,23 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("linear svc doesn't fit intercept when fitIntercept is off") { val lsvc = new LinearSVC().setFitIntercept(false).setMaxIter(5) - val model = lsvc.fit(smallBinaryDataset) + val model = lsvc.fit(smallTrainingDataset) assert(model.intercept === 0.0) val lsvc2 = new LinearSVC().setFitIntercept(true).setMaxIter(5) - val model2 = lsvc2.fit(smallBinaryDataset) + val model2 = lsvc2.fit(smallTrainingDataset) assert(model2.intercept !== 0.0) } - test("sparse 
coefficients in HingeAggregator") { - val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) - val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) - val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { - intercept[IllegalArgumentException] { - agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) - } - } - assert(thrown.getMessage.contains("coefficients only supports dense")) - - bcCoefficients.destroy(blocking = false) - bcFeaturesStd.destroy(blocking = false) - } - test("linearSVC with sample weights") { def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = { assert(m1.coefficients ~== m2.coefficients absTol 0.07) assert(m1.intercept ~== m2.intercept absTol 0.05) } - LinearSVC.supportedOptimizers.foreach { opt => + LinearSVC.supportedSolvers.foreach { opt => val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setSolver(opt) .setLoss("hinge") - val dataset = smallBinaryDataset + val dataset = smallTrainingDataset MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( dataset.as[LabeledPoint], estimator, modelEquals) MLTestingUtils.testOutliersWithSmallWeights[LinearSVCModel, LinearSVC]( @@ -315,7 +301,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) - val model = linearSVC.fit(smallBinaryDataset) + val model = linearSVC.fit(smallTrainingDataset) val linearSVC2 = new LinearSVC() .setLoss("squared_hinge") @@ -323,7 +309,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) - val model2 = linearSVC2.fit(smallBinaryDataset) + val model2 = linearSVC2.fit(smallTrainingDataset) assert(model.coefficients ~== model2.coefficients relTol 1E-3) 
assert(model.intercept ~== model2.intercept relTol 1E-3) @@ -337,7 +323,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau .setMaxIter(200) .setTol(1e-4) .setFitIntercept(false) - val model = linearSVC.fit(smallBinaryDataset) + val model = linearSVC.fit(smallTrainingDataset) val linearSVC2 = new LinearSVC() .setLoss("hinge") @@ -346,7 +332,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau .setMaxIter(200) .setTol(1e-4) .setFitIntercept(false) - val model2 = linearSVC2.fit(smallBinaryDataset) + val model2 = linearSVC2.fit(smallTrainingDataset) assert(model.coefficients ~== model2.coefficients relTol 2E-2) assert(model.intercept === 0) assert(model2.intercept === 0) @@ -359,7 +345,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model.numFeatures === model2.numFeatures) } val svm = new LinearSVC() - testEstimatorAndModelReadWrite(svm, smallBinaryDataset, LinearSVCSuite.allParamSettings, + testEstimatorAndModelReadWrite(svm, smallTrainingDataset, LinearSVCSuite.allParamSettings, LinearSVCSuite.allParamSettings, checkModelData) } } @@ -377,7 +363,7 @@ object LinearSVCSuite { "rawPredictionCol" -> "myRawPredict", "aggregationDepth" -> 3, "solver" -> "owlqn", - "loss" -> "squared_hinge" + "loss" -> "hinge" ) // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) @@ -397,7 +383,7 @@ object LinearSVCSuite { y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2))) } - def checkModels(model1: LinearSVCModel, model2: LinearSVCModel): Unit = { + def checkModelsEqual(model1: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model1.intercept == model2.intercept) assert(model1.coefficients.equals(model2.coefficients)) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 
b5aa7023d4221..e7bcfe33adf93 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -160,4 +160,19 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("sparse coefficients in HingeAggregator") { + val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val thrown = withClue("HingeAggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + + bcCoefficients.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + } + } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala index 3e7d362052bff..f0c9d9affe54b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala @@ -162,4 +162,19 @@ class SquaredHingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("sparse coefficients in SquaredHingeAggregator") { + val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val agg = new SquaredHingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val thrown = 
withClue("SquaredHingeAggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + + bcCoefficients.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + } + }