diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 89718e0f3e15a..2be1705353353 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -231,6 +231,184 @@ class LinearRegression(override val uid: String)
   override def copy(extra: ParamMap): LinearRegression = defaultCopy(extra)
 }
 
+/**
+ * :: Experimental ::
+ * Robust regression.
+ *
+ * The learning objective is to minimize the Huber loss, with regularization.
+ * The loss function applied to a residual a is:
+ *   L(a) = 1/2 * a^2              if |a| <= k
+ *   L(a) = k * |a| - 1/2 * k^2    if |a| > k
+ * where k = 1.345 (see [[HuberCostFun]]).
+ */
+@Experimental
+class RobustRegression(override val uid: String)
+  extends Regressor[Vector, RobustRegression, LinearRegressionModel]
+  with LinearRegressionParams with Logging {
+
+  def this() = this(Identifiable.randomUID("robReg"))
+
+  /**
+   * Set the regularization parameter.
+   * Default is 0.0.
+   * @group setParam
+   */
+  def setRegParam(value: Double): this.type = set(regParam, value)
+  setDefault(regParam -> 0.0)
+
+  /**
+   * Set whether to fit an intercept term.
+   * Default is true.
+   * @group setParam
+   */
+  def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
+  setDefault(fitIntercept -> true)
+
+  /**
+   * Set the ElasticNet mixing parameter.
+   * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
+   * For 0 < alpha < 1, the penalty is a combination of L1 and L2.
+   * Default is 0.0, which is an L2 penalty.
+   * @group setParam
+   */
+  def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
+  setDefault(elasticNetParam -> 0.0)
+
+  /**
+   * Set the maximum number of iterations.
+   * Default is 100.
+   * @group setParam
+   */
+  def setMaxIter(value: Int): this.type = set(maxIter, value)
+  setDefault(maxIter -> 100)
+
+  /**
+   * Set the convergence tolerance of iterations.
+   * Smaller values lead to higher accuracy at the cost of more iterations.
+   * Default is 1E-6.
+   * @group setParam
+   */
+  def setTol(value: Double): this.type = set(tol, value)
+  setDefault(tol -> 1E-6)
+
+  override protected def train(dataset: DataFrame): LinearRegressionModel = {
+    // Extract columns from the dataset. If the dataset is already persisted,
+    // do not persist the instances.
+    val instances = extractLabeledPoints(dataset).map {
+      case LabeledPoint(label: Double, features: Vector) => (label, features)
+    }
+    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
+    val (summarizer, statCounter) = instances.treeAggregate(
+      (new MultivariateOnlineSummarizer, new StatCounter))(
+      seqOp = (c, v) => (c, v) match {
+        case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter),
+            (label: Double, features: Vector)) =>
+          (summarizer.add(features), statCounter.merge(label))
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter),
+            (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) =>
+          (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2))
+      })
+
+    val numFeatures = summarizer.mean.size
+    val yMean = statCounter.mean
+    val yStd = math.sqrt(statCounter.variance)
+
+    // If yStd is zero, then the intercept is yMean with zero weights;
+    // as a result, training is not needed.
+    if (yStd == 0.0) {
+      logWarning(s"The standard deviation of the label is zero, so the weights will be zeros " +
+        s"and the intercept will be the mean of the label; as a result, training is not needed.")
+      if (handlePersistence) instances.unpersist()
+      val weights = Vectors.sparse(numFeatures, Seq())
+      val intercept = yMean
+
+      val model = new LinearRegressionModel(uid, weights, intercept)
+      val trainingSummary = new LinearRegressionTrainingSummary(
+        model.transform(dataset).select($(predictionCol), $(labelCol)),
+        $(predictionCol),
+        $(labelCol),
+        Array(0D))
+      return copyValues(model.setSummary(trainingSummary))
+    }
+
+    val featuresMean = summarizer.mean.toArray
+    val featuresStd = summarizer.variance.toArray.map(math.sqrt)
+
+    // Since we implicitly do feature scaling when computing the cost function
+    // (to improve convergence), the effective regParam changes accordingly.
+    val effectiveRegParam = $(regParam) / yStd
+    val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
+    val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
+
+    val costFun = new HuberCostFun(instances, yStd, yMean, $(fitIntercept),
+      featuresStd, featuresMean, effectiveL2RegParam)
+
+    val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
+      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
+    } else {
+      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, effectiveL1RegParam, $(tol))
+    }
+
+    val initialWeights = Vectors.zeros(numFeatures)
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      initialWeights.toBreeze.toDenseVector)
+
+    val (weights, objectiveHistory) = {
+      /*
+         As in LinearRegression, the objective history (loss + regularization) returned
+         from the optimizer is computed in the scaled space, given by the following formula:
+         {{{
+         L = 1/2n||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 + regTerms
+         }}}
+       */
+      val arrayBuilder = mutable.ArrayBuilder.make[Double]
+      var state: optimizer.State = null
+      while (states.hasNext) {
+        state = states.next()
+        arrayBuilder += state.adjustedValue
+      }
+      if (state == null) {
+        val msg = s"${optimizer.getClass.getName} failed."
+        logError(msg)
+        throw new SparkException(msg)
+      }
+
+      /*
+         The weights are trained in the scaled space; we convert them back to
+         the original space here.
+       */
+      val rawWeights = state.x.toArray.clone()
+      var i = 0
+      val len = rawWeights.length
+      while (i < len) {
+        rawWeights(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 }
+        i += 1
+      }
+
+      (Vectors.dense(rawWeights).compressed, arrayBuilder.result())
+    }
+
+    /*
+       The intercept in R's GLMNET is computed in closed form after the coefficients have
+       converged. See the following discussion for details:
+       http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
+     */
+    val intercept = if ($(fitIntercept)) yMean - dot(weights, Vectors.dense(featuresMean)) else 0.0
+
+    if (handlePersistence) instances.unpersist()
+
+    val model = copyValues(new LinearRegressionModel(uid, weights, intercept))
+    val trainingSummary = new LinearRegressionTrainingSummary(
+      model.transform(dataset).select($(predictionCol), $(labelCol)),
+      $(predictionCol),
+      $(labelCol),
+      objectiveHistory)
+    model.setSummary(trainingSummary)
+  }
+
+  override def copy(extra: ParamMap): RobustRegression = defaultCopy(extra)
+}
+
 /**
  * :: Experimental ::
  * Model produced by [[LinearRegression]].
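For reference, a minimal standalone sketch of the pointwise Huber loss that the patch
describes; the object name and the sample values are illustrative only and are not part
of the patch:

  // Quadratic within k of zero, linear beyond it; k = 1.345 as in HuberCostFun below.
  object HuberLossSketch {
    def huberLoss(residual: Double, k: Double = 1.345): Double = {
      val a = math.abs(residual)
      if (a <= k) 0.5 * a * a
      else k * a - 0.5 * k * k
    }

    def main(args: Array[String]): Unit = {
      println(huberLoss(0.5)) // 0.125 (quadratic region)
      println(huberLoss(3.0)) // 3.1304875 (linear region)
    }
  }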
@@ -591,3 +769,56 @@ private class LeastSquaresCostFun(
     (loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
   }
 }
+
+/**
+ * HuberCostFun implements Breeze's DiffFunction[T] for the Huber cost, as used in robust
+ * regression. The Huber M-estimator corresponds to a probability distribution for the errors
+ * which is normal in the centre but like a double exponential distribution in the tails
+ * (Hogg 1979: 109).
+ *   L = 1/2 * ||A w - y||^2         if |A w - y| <= k
+ *   L = k * |A w - y| - 1/2 * k^2   if |A w - y| > k
+ * where k = 1.345, which produces 95% efficiency when the errors are normal and substantial
+ * resistance to outliers otherwise. See also the documentation for the precise formulation.
+ * It is used in Breeze's convex optimization routines.
+ */
+private class HuberCostFun(
+    data: RDD[(Double, Vector)],
+    labelStd: Double,
+    labelMean: Double,
+    fitIntercept: Boolean,
+    featuresStd: Array[Double],
+    featuresMean: Array[Double],
+    effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
+
+  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
+    val w = Vectors.fromBreeze(weights)
+
+    val leastSquaresAggregator = data.treeAggregate(new LeastSquaresAggregator(w, labelStd,
+      labelMean, fitIntercept, featuresStd, featuresMean))(
+      seqOp = (c, v) => (c, v) match {
+        case (aggregator, (label, features)) => aggregator.add(label, features)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
+      })
+
+    // Huber-shaped regularization term: quadratic in the weight norm while diff lies
+    // within [-k, k], linear in diff outside of it.
+    val k = 1.345
+    val diff = dot(w, w) - labelMean
+    val norm = brzNorm(weights, 2.0)
+    var regVal = 0.0
+    if (diff < -k) {
+      regVal = -k * 0.5 * effectiveL2regParam * diff - 0.5 * k * k
+    } else if (diff >= -k && diff <= k) {
+      regVal = 0.25 * effectiveL2regParam * norm * norm
+    } else {
+      regVal = k * 0.5 * effectiveL2regParam * diff - 0.5 * k * k
+    }
+
+    val loss = leastSquaresAggregator.loss + regVal
+    val gradient = leastSquaresAggregator.gradient
+    axpy(effectiveL2regParam, w, gradient)
+
+    (loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 7cdda3db88ad1..190284621e3a6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -373,4 +373,321 @@ class LinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
       .forall { case (Row(r1: Double), Row(r2: Double)) => r1 ~== r2 relTol 1E-5 }
   }
 
+  test("Robust params") {
+    ParamsSuite.checkParams(new RobustRegression)
+    val model = new LinearRegressionModel("RobustReg", Vectors.dense(0.0), 0.0)
+    ParamsSuite.checkParams(model)
+  }
+
+  test("Robust regression: default params") {
+    val lir = new RobustRegression
+    assert(lir.getLabelCol === "label")
+    assert(lir.getFeaturesCol === "features")
+    assert(lir.getPredictionCol === "prediction")
+    assert(lir.getRegParam === 0.0)
+    assert(lir.getElasticNetParam === 0.0)
+    assert(lir.getFitIntercept)
+    val model = lir.fit(dataset)
+    model.transform(dataset)
+      .select("label", "prediction")
+      .collect()
+    assert(model.getFeaturesCol === "features")
+    assert(model.getPredictionCol === "prediction")
+    assert(model.intercept !== 0.0)
+    assert(model.hasParent)
+  }
+
+  test("Robust regression with intercept without regularization") {
+    val trainer = new RobustRegression
+    val model = trainer.fit(dataset)
+
+    /*
+       Use the following R code to load the data and train the model using the glmnet package.
+
+       library("glmnet")
+       data <- read.csv("path", header=FALSE, stringsAsFactors=FALSE)
+       features <- as.matrix(data.frame(as.numeric(data$V2), as.numeric(data$V3)))
+       label <- as.numeric(data$V1)
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0, lambda = 0))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         6.300528
+       as.numeric.data.V2. 4.701024
+       as.numeric.data.V3. 7.198257
+     */
+    val interceptR = 6.298698
+    val weightsR = Vectors.dense(4.700706, 7.199082)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights ~= weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept without regularization") {
+    val trainer = (new RobustRegression).setFitIntercept(false)
+    val model = trainer.fit(dataset)
+    val modelWithoutIntercept = trainer.fit(datasetWithoutIntercept)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0, lambda = 0,
+         intercept = FALSE))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         .
+       as.numeric.data.V2. 6.995908
+       as.numeric.data.V3. 5.275131
+     */
+    val weightsR = Vectors.dense(6.995908, 5.275131)
+
+    assert(model.intercept ~== 0 absTol 1E-3)
+    assert(model.weights ~= weightsR relTol 1E-3)
+    /*
+       Then again with the data with no intercept:
+       > weightsWithoutIntercept
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                   s0
+       (Intercept)          .
+       as.numeric.data3.V2. 4.70011
+       as.numeric.data3.V3. 7.19943
+     */
+    val weightsWithoutInterceptR = Vectors.dense(4.70011, 7.19943)
+
+    assert(modelWithoutIntercept.intercept ~== 0 absTol 1E-3)
+    assert(modelWithoutIntercept.weights ~= weightsWithoutInterceptR relTol 1E-3)
+  }
+
+  test("Robust regression with intercept with L1 regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(1.0).setRegParam(0.57)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         6.24300
+       as.numeric.data.V2. 4.024821
+       as.numeric.data.V3. 6.679841
+     */
+    val interceptR = 6.24300
+    val weightsR = Vectors.dense(4.024821, 6.679841)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights ~= weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with L1 regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(1.0).setRegParam(0.57)
+      .setFitIntercept(false)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57,
+         intercept=FALSE))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         .
+       as.numeric.data.V2. 6.299752
+       as.numeric.data.V3. 4.772913
+     */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(6.299752, 4.772913)
+
+    assert(model.intercept ~== interceptR absTol 1E-5)
+    assert(model.weights ~= weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression with intercept with L2 regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(0.0).setRegParam(2.3)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         6.328062
+       as.numeric.data.V2. 3.222034
+       as.numeric.data.V3. 4.926260
+     */
+    val interceptR = 5.269376
+    val weightsR = Vectors.dense(3.736216, 5.712356)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights ~= weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with L2 regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(0.0).setRegParam(2.3)
+      .setFitIntercept(false)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3,
+         intercept = FALSE))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         .
+       as.numeric.data.V2. 5.522875
+       as.numeric.data.V3. 4.214502
+     */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(5.522875, 4.214502)
+
+    assert(model.intercept ~== interceptR absTol 1E-3)
+    assert(model.weights ~== weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression with intercept with ElasticNet regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(0.3).setRegParam(1.6)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                  s0
+       (Intercept)         6.324108
+       as.numeric.data.V2. 3.168435
+       as.numeric.data.V3. 5.200403
+     */
+    val interceptR = 5.696056
+    val weightsR = Vectors.dense(3.670489, 6.001122)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights ~== weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with ElasticNet regularization") {
+    val trainer = (new RobustRegression).setElasticNetParam(0.3).setRegParam(1.6)
+      .setFitIntercept(false)
+    val model = trainer.fit(dataset)
+
+    /*
+       weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6,
+         intercept=FALSE))
+       > weights
+       3 x 1 sparse Matrix of class "dgCMatrix"
+                                   s0
+       (Intercept)          .
+       as.numeric.dataM.V2. 5.673348
+       as.numeric.dataM.V3. 4.322251
+     */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(5.673348, 4.322251)
+
+    assert(model.intercept ~== interceptR absTol 1E-3)
+    assert(model.weights ~= weightsR relTol 1E-3)
+
+    model.transform(dataset).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression model training summary") {
+    val trainer = new RobustRegression
+    val model = trainer.fit(dataset)
+
+    // Training results for the model should be available.
+    assert(model.hasSummary)
+
+    // Residuals in [[LinearRegressionTrainingSummary]] should equal those manually computed.
+    dataset.select("features", "label")
+      .map { case Row(features: DenseVector, label: Double) =>
+        val prediction =
+          features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
+        label - prediction
+      }
+      .zip(model.summary.residuals.map(_.getDouble(0)))
+      .collect()
+      .foreach { case (manualResidual: Double, resultResidual: Double) =>
+        assert(manualResidual ~== resultResidual relTol 1E-5)
+      }
+
+    /*
+       Use the following R code to generate the model training results.
+
+       predictions <- predict(fit, newx=features)
+       residuals <- label - predictions
+       > mean(residuals^2) # MSE
+       [1] 0.009720325
+       > mean(abs(residuals)) # MAD
+       [1] 0.07863206
+       > cor(predictions, label)^2 # r^2
+               [,1]
+       s0 0.9998749
+     */
+    assert(model.summary.meanSquaredError ~== 0.009720325 relTol 1E-5)
+    assert(model.summary.meanAbsoluteError ~== 0.07863206 relTol 1E-5)
+    assert(model.summary.r2 ~== 0.9998749 relTol 1E-5)
+
+    // The objective function should be monotonically decreasing.
+    assert(
+      model.summary
+        .objectiveHistory
+        .sliding(2)
+        .forall(x => x(0) >= x(1)))
+  }
+
+  test("Robust regression model testset evaluation summary") {
+    val trainer = new RobustRegression
+    val model = trainer.fit(dataset)
+
+    // Evaluating on the training dataset should yield a results summary equal to the
+    // training summary.
+    val testSummary = model.evaluate(dataset)
+    assert(model.summary.meanSquaredError ~== testSummary.meanSquaredError relTol 1E-5)
+    assert(model.summary.r2 ~== testSummary.r2 relTol 1E-5)
+    assert(model.summary.residuals.select("residuals").collect()
+      .zip(testSummary.residuals.select("residuals").collect())
+      .forall { case (Row(r1: Double), Row(r2: Double)) => r1 ~== r2 relTol 1E-5 })
+  }
+
 }
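A hedged usage sketch, not part of the patch: "training" is an assumed DataFrame with the
default "label" and "features" columns, e.g. one built inside a spark-shell session. It
shows how the new estimator composes with the existing LinearRegressionModel API.

  import org.apache.spark.ml.regression.RobustRegression

  val rr = new RobustRegression()
    .setRegParam(0.3)        // overall regularization strength
    .setElasticNetParam(0.5) // mix of L1 and L2 penalties
    .setMaxIter(100)
    .setTol(1E-6)
  val model = rr.fit(training) // yields a LinearRegressionModel
  println(s"Weights: ${model.weights} Intercept: ${model.intercept}")
  println(s"MSE: ${model.summary.meanSquaredError}")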