From ff37c130534e6538f925fc4aacee54ade39ad364 Mon Sep 17 00:00:00 2001 From: Gang Bai Date: Fri, 27 Jun 2014 13:35:24 +0800 Subject: [PATCH] The implementations of Poisson regression in mllib/regression. It includes 1) the gradient of the negative log-likelihood, 2) the implementation of PoissonRegressionModel and the generalized linear algorithm classes which use L-BFGS and SGD respectively for parameter estimation, 3) the test suites for the gradient/loss computation and for the regression method on generated and real-world data sets, and 4) a Poisson regression data generator in mllib/util for producing the test data. --- .../spark/mllib/optimization/Gradient.scala | 34 ++ .../mllib/regression/PoissonRegression.scala | 283 ++++++++++++++++ .../util/PoissonRegressionDataGenerator.scala | 84 +++++ .../regression/PoissonRegressionSuite.scala | 314 ++++++++++++++++++ 4 files changed, 715 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/PoissonRegression.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/PoissonRegressionDataGenerator.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/PoissonRegressionSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 679842f831c2a..d1f3135fcef80 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -175,3 +175,37 @@ class HingeGradient extends Gradient { } } } + +/** + * :: DeveloperApi :: + * Poisson Negative Log-Likelihood Gradient. + * Compute the gradient and loss for the negative log-likelihood as used in Poisson regression. + * The log-likelihood is: + * \ell(\theta\mid X,Y) = \sum_{i=1}^m \left( y_i \theta' x_i - e^{\theta' x_i}\right). + * The gradient of its negation is: \sum_{i=1}^m x_i \left( e^{\theta' x_i} - y_i \right). 
+ */
+@DeveloperApi
+class PoissonNLLGradient extends Gradient {
+
+  // Single-example variant: returns the gradient x * (exp(w.x) - y) as a new vector together
+  // with the loss exp(w.x) - y * (w.x).
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val features = data.toBreeze
+    val margin = weights.toBreeze dot features
+    val predictedMean = math.exp(margin)
+    val pointGradient = features * (predictedMean - label)
+    (Vectors.fromBreeze(pointGradient), predictedMean - label * margin)
+  }
+
+  // Accumulating variant: adds (exp(w.x) - y) * x onto the Breeze form of `cumGradient` via axpy
+  // and returns only the loss.
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val features = data.toBreeze
+    val margin = weights.toBreeze dot features
+    val predictedMean = math.exp(margin)
+    brzAxpy(predictedMean - label, features, cumGradient.toBreeze)
+    predictedMean - label * margin
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/PoissonRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/PoissonRegression.scala
new file mode 100644
index 0000000000000..fe407e592529d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/PoissonRegression.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.optimization._
+import org.apache.spark.mllib.linalg.Vector
+
+/**
+ * Log-linear regression model for count data with a Poisson error structure. A prediction is
+ * exp(weights . features + intercept).
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class PoissonRegressionModel private[mllib] (
+    override val weights: Vector,
+    override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with RegressionModel with Serializable {
+
+  /** Exponentiates the linear predictor (weights . data + intercept). */
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
+      intercept: Double): Double = {
+    val linearPredictor = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+    math.exp(linearPredictor)
+  }
+
+  override def toString: String = s"Log-linear model: (${weights.toString}, $intercept)"
+}
+
+/**
+ * Fits a [[PoissonRegressionModel]] with L-BFGS, using [[PoissonNLLGradient]] for the loss and
+ * [[SimpleUpdater]] as the updater.
+ */
+class PoissonRegressionWithLBFGS private (
+    private var numCorrections: Int,
+    private var numIters: Int,
+    private var convergenceTol: Double,
+    private var regParam: Double)
+  extends GeneralizedLinearAlgorithm[PoissonRegressionModel] with Serializable {
+
+  private val nllGradient = new PoissonNLLGradient()
+  private val simpleUpdater = new SimpleUpdater()
+
+  // Configured once from the constructor arguments; callers may tune it further through its
+  // setters.
+  override val optimizer = new LBFGS(nllGradient, simpleUpdater)
+    .setNumCorrections(numCorrections)
+    .setConvergenceTol(convergenceTol)
+    .setMaxNumIterations(numIters)
+    .setRegParam(regParam)
+
+  /**
+   * Default configuration: 10 corrections, at most 100 iterations, convergence tolerance 0.1
+   * and regularization parameter 1.0.
+   */
+  def this() = this(10, 100, 0.1, 1.0)
+
+  override protected def createModel(
+      weights: Vector,
+      intercept: Double): PoissonRegressionModel = {
+    new PoissonRegressionModel(weights, intercept)
+  }
+}
+
+/**
+ * Entry for calling Poisson regression.
+ */
+object PoissonRegressionWithLBFGS {
+
+  /**
+   * Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
+   * estimate the weights. The weights are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations The maximum number of iterations carried out by L-BFGS.
+   * @param numCorrections The number of corrections handed to the L-BFGS optimizer.
+   * @param convergTol The convergence tolerance of iterations for L-BFGS.
+   * @param regParam The regularization parameter for L-BFGS.
+   * @param initialWeights Initial set of weights to be used. Its size should equal the number
+   *                       of features in the data.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      numCorrections: Int,
+      convergTol: Double,
+      regParam: Double,
+      initialWeights: Vector): PoissonRegressionModel = {
+    // Note the constructor order: corrections first, then iterations.
+    val algorithm =
+      new PoissonRegressionWithLBFGS(numCorrections, numIterations, convergTol, regParam)
+    algorithm.setIntercept(true).run(input, initialWeights)
+  }
+
+  /**
+   * Train a PoissonRegression model with L-BFGS from an RDD of (label, features) pairs, without
+   * caller-supplied initial weights.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations The maximum number of iterations carried out by L-BFGS.
+   * @param numCorrections The number of corrections handed to the L-BFGS optimizer.
+   * @param convergTol The convergence tolerance of iterations for L-BFGS.
+   * @param regParam The regularization parameter for L-BFGS.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      numCorrections: Int,
+      convergTol: Double,
+      regParam: Double): PoissonRegressionModel = {
+    val algorithm =
+      new PoissonRegressionWithLBFGS(numCorrections, numIterations, convergTol, regParam)
+    algorithm.setIntercept(true).run(input)
+  }
+
+  /**
+   * Train a PoissonRegression model with L-BFGS from an RDD of (label, features) pairs, using a
+   * regularization parameter of 0.0.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations The maximum number of iterations carried out by L-BFGS.
+   * @param numCorrections The number of corrections handed to the L-BFGS optimizer.
+   * @param convergTol The convergence tolerance of iterations for L-BFGS.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      numCorrections: Int,
+      convergTol: Double): PoissonRegressionModel = {
+    val noRegularization = 0.0
+    train(input, numIterations, numCorrections, convergTol, noRegularization)
+  }
+
+  /**
+   * Train a PoissonRegression model with L-BFGS from an RDD of (label, features) pairs, using a
+   * convergence tolerance of 1e-4 and a regularization parameter of 0.0.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations The maximum number of iterations carried out by L-BFGS.
+   * @param numCorrections The number of corrections handed to the L-BFGS optimizer.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      numCorrections: Int): PoissonRegressionModel = {
+    val defaultConvergTol = 1e-4
+    val noRegularization = 0.0
+    train(input, numIterations, numCorrections, defaultConvergTol, noRegularization)
+  }
+
+  /**
+   * Train a PoissonRegression model with L-BFGS from an RDD of (label, features) pairs, using
+   * 10 corrections, a convergence tolerance of 1e-4 and a regularization parameter of 0.0.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations The maximum number of iterations carried out by L-BFGS.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int): PoissonRegressionModel = {
+    // Remaining arguments: 10 corrections, convergence tolerance 1e-4, regParam 0.0.
+    train(input, numIterations, 10, 1e-4, 0.0)
+  }
+}
+
+/**
+ * Fits a [[PoissonRegressionModel]] with (mini-batch) gradient descent, using
+ * [[PoissonNLLGradient]] for the loss and [[SimpleUpdater]] as the updater.
+ */
+class PoissonRegressionWithSGD private (
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var regParam: Double,
+    private var miniBatchFraction: Double)
+  extends GeneralizedLinearAlgorithm[PoissonRegressionModel] with Serializable {
+
+  // Never reassigned, so declared as vals, matching PoissonRegressionWithLBFGS.
+  private val gradient = new PoissonNLLGradient()
+  private val updater = new SimpleUpdater()
+
+  // Configured once from the constructor arguments; callers may tune it further through its
+  // setters.
+  override val optimizer = new GradientDescent(gradient, updater)
+    .setStepSize(stepSize)
+    .setNumIterations(numIterations)
+    .setRegParam(regParam)
+    .setMiniBatchFraction(miniBatchFraction)
+
+  /**
+   * Default configuration: step size 0.01, 100 iterations, regularization parameter 0.0 and a
+   * mini-batch fraction of 1.0.
+   */
+  def this() = this(0.01, 100, 0.0, 1.0)
+
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new PoissonRegressionModel(weights, intercept)
+  }
+}
+
+object PoissonRegressionWithSGD {
+
+  /**
+   * Train a PoissonRegression model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
+   * in gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
+   * @return a PoissonRegressionModel which has the weights and intercept from training.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
+      initialWeights: Vector): PoissonRegressionModel = {
+    // The regularization parameter is fixed at 0.0 for this entry point.
+    val sgd = new PoissonRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+    sgd.setIntercept(true).run(input, initialWeights)
+  }
+
+  /**
+   * Train a PoissonRegression model from an RDD of (label, features) pairs with a fixed number
+   * of gradient descent iterations at the given step size. Every iteration computes a stochastic
+   * gradient from a `miniBatchFraction` fraction of the data.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @return a PoissonRegressionModel which has the weights and intercept from training.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double): PoissonRegressionModel = {
+    // The regularization parameter is fixed at 0.0 for this entry point.
+    val sgd = new PoissonRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+    sgd.setIntercept(true).run(input)
+  }
+
+  /**
+   * Train a PoissonRegression model from an RDD of (label, features) pairs with a fixed number
+   * of gradient descent iterations at the given step size. The mini-batch fraction is 1.0, so
+   * the entire data set is used to compute the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @return a PoissonRegressionModel which has the weights and intercept from training.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      stepSize: Double): PoissonRegressionModel = {
+    val wholeDataSet = 1.0
+    train(input, numIterations, stepSize, wholeDataSet)
+  }
+
+  /**
+   * Train a PoissonRegression model from an RDD of (label, features) pairs with a fixed number
+   * of gradient descent iterations, a step size of 1.0 and a mini-batch fraction of 1.0 (the
+   * entire data set is used to compute the gradient in each iteration).
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return a PoissonRegressionModel which has the weights and intercept from training.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int): PoissonRegressionModel = {
+    val unitStepSize = 1.0
+    val wholeDataSet = 1.0
+    train(input, numIterations, unitStepSize, wholeDataSet)
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/PoissonRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/PoissonRegressionDataGenerator.scala
new file mode 100644
index 0000000000000..cfb473525ea1d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/PoissonRegressionDataGenerator.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * :: DeveloperApi ::
+ * Generates synthetic count data for Poisson regression, either as an RDD or, through `main`,
+ * as text files.
+ */
+@DeveloperApi
+object PoissonRegressionDataGenerator {
+
+  /**
+   * Generate an RDD containing the sample data for PoissonRegression.
+   *
+   * The model parameters are drawn first from a fixed seed. Each example then draws its features
+   * uniformly from [0, 4) using a seed derived from its index. The label is the model mean
+   * exp(parameters . x + intercept) rounded to the nearest integer, so labels are deterministic
+   * rather than sampled from a Poisson distribution.
+   *
+   * @param sc SparkContext to use for creating the RDD.
+   * @param numExamples Number of examples that will be contained in the RDD.
+   * @param numFeatures Number of features to generate for each example.
+   * @param useIntercept Whether to include a (Gaussian-drawn) intercept term in the underlying
+   *                     model; when false the intercept is 0.0.
+   * @param numParts Number of partitions of the generated RDD. Default value is 2.
+   */
+  def generatePoissonRegRDD(
+      sc: SparkContext,
+      numExamples: Int,
+      numFeatures: Int,
+      useIntercept: Boolean,
+      numParts: Int = 2): RDD[LabeledPoint] = {
+    val rnd = new Random(100083)
+
+    // The underlying Poisson regression parameters.
+    val parameters = Vectors.dense(Array.fill[Double](numFeatures)(rnd.nextDouble()))
+    val intercept = if (useIntercept) rnd.nextGaussian() else 0.0
+
+    // Generate the data set.
+    sc.parallelize(0 until numExamples, numParts).map { idx =>
+      // A separate generator per example, seeded from the example index.
+      val exampleRnd = new Random(32 + idx)
+      val x = Vectors.dense(Array.fill[Double](numFeatures)(exampleRnd.nextDouble() * 4.0))
+      // The parentheses are required: alphanumeric infix operators bind more loosely than `+`,
+      // so `a dot b + c` would parse as `a dot (b + c)` and add the intercept to every feature.
+      val y = math.exp((parameters.toBreeze dot x.toBreeze) + intercept)
+      LabeledPoint(math.rint(y), x)
+    }
+  }
+
+  /**
+   * Command-line entry point: generates a data set without an intercept and saves it as text.
+   *
+   * Arguments: master URL, output path, and optionally the number of examples (default 1000),
+   * the number of features (default 2) and the number of partitions (default 2).
+   */
+  def main(args: Array[String]): Unit = {
+    // Only the first two arguments are mandatory; the remaining ones fall back to defaults.
+    if (args.length < 2 || args.length > 5) {
+      println("Usage: PoissonRegressionDataGenerator " +
+        "MASTER OUTPUT_DIR [NUM_EXAMPLES] [NUM_FEATURES] [NUM_PARTITIONS]")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+    val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
+    val parts: Int = if (args.length > 4) args(4).toInt else 2
+
+    val sc = new SparkContext(sparkMaster, "PoissonRegressionDataGenerator")
+    val data = generatePoissonRegRDD(sc, nexamples, nfeatures, false, parts)
+
+    data.saveAsTextFile(outputPath)
+    sc.stop()
+  }
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/PoissonRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/PoissonRegressionSuite.scala
new file mode 100644
index 0000000000000..c23d474bfbf97
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/PoissonRegressionSuite.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.scalatest.FunSuite + +import org.jblas.DoubleMatrix + +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.optimization.PoissonNLLGradient +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.util.{PoissonRegressionDataGenerator, LocalSparkContext} + +class PoissonRegressionSuite + extends FunSuite + with LocalSparkContext { + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint], tolerance: Double) = { + val sumSquaredError = (predictions zip input).foldLeft[Double](0.0) { + case (sum, (predict: Double, labeledPoint: LabeledPoint)) => + sum + (predict - labeledPoint.label) * (predict - labeledPoint.label) + } + val measSquaredError = sumSquaredError / predictions.length + assert(measSquaredError < tolerance) + } + + lazy val realDataSet = { + sc.parallelize(Array( + "(8,[0,1,0,0,0,0,1,28.3,3.05])", + "(4,[0,0,1,0,0,0,1,26.0,2.60])", + "(0,[0,0,1,0,0,0,1,25.6,2.15])", + "(0,[0,0,0,1,0,1,0,21.0,1.85])", + "(1,[0,1,0,0,0,0,1,29.0,3.00])", + "(3,[0,0,0,1,0,1,0,25.0,2.30])", + "(0,[0,0,0,1,0,0,1,26.2,1.30])", + "(0,[0,1,0,0,0,0,1,24.9,2.10])", + "(8,[0,1,0,0,0,0,1,25.7,2.00])", + "(6,[0,1,0,0,0,0,1,27.5,3.15])", + "(5,[0,0,0,1,0,0,1,26.1,2.80])", + "(4,[0,0,1,0,0,0,1,28.9,2.80])", + "(3,[0,1,0,0,0,0,1,30.3,3.60])", + "(4,[0,1,0,0,0,0,1,22.9,1.60])", + 
"(3,[0,0,1,0,0,0,1,26.2,2.30])", + "(5,[0,0,1,0,0,0,1,24.5,2.05])", + "(8,[0,1,0,0,0,0,1,30.0,3.05])", + "(3,[0,1,0,0,0,0,1,26.2,2.40])", + "(6,[0,1,0,0,0,0,1,25.4,2.25])", + "(4,[0,1,0,0,0,0,1,25.4,2.25])", + "(0,[0,0,0,1,0,0,1,27.5,2.90])", + "(3,[0,0,0,1,0,0,1,27.0,2.25])", + "(0,[0,1,0,0,0,1,0,24.0,1.70])", + "(0,[0,1,0,0,0,0,1,28.7,3.20])", + "(1,[0,0,1,0,0,0,1,26.5,1.97])", + "(1,[0,1,0,0,0,0,1,24.5,1.60])", + "(1,[0,0,1,0,0,0,1,27.3,2.90])", + "(4,[0,1,0,0,0,0,1,26.5,2.30])", + "(2,[0,1,0,0,0,0,1,25.0,2.10])", + "(0,[0,0,1,0,0,0,1,22.0,1.40])", + "(2,[0,0,0,1,0,0,1,30.2,3.28])", + "(0,[0,1,0,0,0,1,0,25.4,2.30])", + "(6,[0,1,0,0,0,0,1,24.9,2.30])", + "(10,[0,0,0,1,0,0,1,25.8,2.25])", + "(5,[0,0,1,0,0,0,1,27.2,2.40])", + "(3,[0,1,0,0,0,0,1,30.5,3.32])", + "(8,[0,0,0,1,0,0,1,25.0,2.10])", + "(9,[0,1,0,0,0,0,1,30.0,3.00])", + "(0,[0,1,0,0,0,0,1,22.9,1.60])", + "(2,[0,1,0,0,0,0,1,23.9,1.85])", + "(3,[0,1,0,0,0,0,1,26.0,2.28])", + "(0,[0,1,0,0,0,0,1,25.8,2.20])", + "(4,[0,0,1,0,0,0,1,29.0,3.28])", + "(0,[0,0,0,1,0,0,1,26.5,2.35])", + "(0,[0,0,1,0,0,0,1,22.5,1.55])", + "(0,[0,1,0,0,0,0,1,23.8,2.10])", + "(0,[0,0,1,0,0,0,1,24.3,2.15])", + "(14,[0,1,0,0,0,0,1,26.0,2.30])", + "(0,[0,0,0,1,0,0,1,24.7,2.20])", + "(1,[0,1,0,0,0,0,1,22.5,1.60])", + "(3,[0,1,0,0,0,0,1,28.7,3.15])", + "(4,[0,0,0,1,0,0,1,29.3,3.20])", + "(5,[0,1,0,0,0,0,1,26.7,2.70])", + "(0,[0,0,0,1,0,0,1,23.4,1.90])", + "(6,[0,0,0,1,0,0,1,27.7,2.50])", + "(6,[0,1,0,0,0,0,1,28.2,2.60])", + "(5,[0,0,0,1,0,0,1,24.7,2.10])", + "(5,[0,1,0,0,0,0,1,25.7,2.00])", + "(0,[0,1,0,0,0,0,1,27.8,2.75])", + "(3,[0,0,1,0,0,0,1,27.0,2.45])", + "(10,[0,1,0,0,0,0,1,29.0,3.20])", + "(7,[0,0,1,0,0,0,1,25.6,2.80])", + "(0,[0,0,1,0,0,0,1,24.2,1.90])", + "(0,[0,0,1,0,0,0,1,25.7,1.20])", + "(0,[0,0,1,0,0,0,1,23.1,1.65])", + "(0,[0,1,0,0,0,0,1,28.5,3.05])", + "(5,[0,1,0,0,0,0,1,29.7,3.85])", + "(0,[0,0,1,0,0,0,1,23.1,1.55])", + "(1,[0,0,1,0,0,0,1,24.5,2.20])", + "(1,[0,1,0,0,0,0,1,27.5,2.55])", + "(1,[0,1,0,0,0,0,1,26.3,2.40])", + 
"(3,[0,1,0,0,0,0,1,27.8,3.25])", + "(2,[0,1,0,0,0,0,1,31.9,3.33])", + "(5,[0,1,0,0,0,0,1,25.0,2.40])", + "(0,[0,0,1,0,0,0,1,26.2,2.22])", + "(3,[0,0,1,0,0,0,1,28.4,3.20])", + "(6,[0,0,0,1,0,1,0,24.5,1.95])", + "(7,[0,1,0,0,0,0,1,27.9,3.05])", + "(6,[0,1,0,0,0,1,0,25.0,2.25])", + "(3,[0,0,1,0,0,0,1,29.0,2.92])", + "(4,[0,1,0,0,0,0,1,31.7,3.73])", + "(4,[0,1,0,0,0,0,1,27.6,2.85])", + "(0,[0,0,0,1,0,0,1,24.5,1.90])", + "(0,[0,0,1,0,0,0,1,23.8,1.80])", + "(8,[0,1,0,0,0,0,1,28.2,3.05])", + "(0,[0,0,1,0,0,0,1,24.1,1.80])", + "(0,[0,0,0,1,0,0,1,28.0,2.62])", + "(9,[0,0,0,1,0,0,1,26.0,2.30])", + "(0,[0,0,1,0,0,1,0,24.7,1.90])", + "(0,[0,1,0,0,0,0,1,25.8,2.65])", + "(8,[0,0,0,1,0,0,1,27.1,2.95])", + "(5,[0,1,0,0,0,0,1,27.4,2.70])", + "(2,[0,0,1,0,0,0,1,26.7,2.60])", + "(5,[0,1,0,0,0,0,1,26.8,2.70])", + "(0,[0,0,0,1,0,0,1,25.8,2.60])", + "(0,[0,0,0,1,0,0,1,23.7,1.85])", + "(6,[0,1,0,0,0,0,1,27.9,2.80])", + "(5,[0,1,0,0,0,0,1,30.0,3.30])", + "(4,[0,1,0,0,0,0,1,25.0,2.10])", + "(5,[0,1,0,0,0,0,1,27.7,2.90])", + "(15,[0,1,0,0,0,0,1,28.3,3.00])", + "(0,[0,0,0,1,0,0,1,25.5,2.25])", + "(5,[0,1,0,0,0,0,1,26.0,2.15])", + "(0,[0,1,0,0,0,0,1,26.2,2.40])", + "(1,[0,0,1,0,0,0,1,23.0,1.65])", + "(0,[0,1,0,0,0,1,0,22.9,1.60])", + "(5,[0,1,0,0,0,0,1,25.1,2.10])", + "(4,[0,0,1,0,0,0,1,25.9,2.55])", + "(0,[0,0,0,1,0,0,1,25.5,2.75])", + "(0,[0,1,0,0,0,0,1,26.8,2.55])", + "(1,[0,1,0,0,0,0,1,29.0,2.80])", + "(1,[0,0,1,0,0,0,1,28.5,3.00])", + "(4,[0,1,0,0,0,1,0,24.7,2.55])", + "(1,[0,1,0,0,0,0,1,29.0,3.10])", + "(6,[0,1,0,0,0,0,1,27.0,2.50])", + "(0,[0,0,0,1,0,0,1,23.7,1.80])", + "(6,[0,0,1,0,0,0,1,27.0,2.50])", + "(2,[0,1,0,0,0,0,1,24.2,1.65])", + "(4,[0,0,0,1,0,0,1,22.5,1.47])", + "(0,[0,1,0,0,0,0,1,25.1,1.80])", + "(0,[0,1,0,0,0,0,1,24.9,2.20])", + "(6,[0,1,0,0,0,0,1,27.5,2.63])", + "(0,[0,1,0,0,0,0,1,24.3,2.00])", + "(4,[0,1,0,0,0,0,1,29.5,3.02])", + "(0,[0,1,0,0,0,0,1,26.2,2.30])", + "(4,[0,1,0,0,0,0,1,24.7,1.95])", + "(4,[0,0,1,0,0,1,0,29.8,3.50])", + "(0,[0,0,0,1,0,0,1,25.7,2.15])", + 
"(2,[0,0,1,0,0,0,1,26.2,2.17])", + "(0,[0,0,0,1,0,0,1,27.0,2.63])", + "(0,[0,0,1,0,0,0,1,24.8,2.10])", + "(0,[0,1,0,0,0,0,1,23.7,1.95])", + "(11,[0,1,0,0,0,0,1,28.2,3.05])", + "(1,[0,1,0,0,0,0,1,25.2,2.00])", + "(4,[0,1,0,0,0,1,0,23.2,1.95])", + "(3,[0,0,0,1,0,0,1,25.8,2.00])", + "(0,[0,0,0,1,0,0,1,27.5,2.60])", + "(0,[0,1,0,0,0,1,0,25.7,2.00])", + "(0,[0,1,0,0,0,0,1,26.8,2.65])", + "(3,[0,0,1,0,0,0,1,27.5,3.10])", + "(9,[0,0,1,0,0,0,1,28.5,3.25])", + "(3,[0,1,0,0,0,0,1,28.5,3.00])", + "(6,[0,0,0,1,0,0,1,27.4,2.70])", + "(3,[0,1,0,0,0,0,1,27.2,2.70])", + "(0,[0,0,1,0,0,0,1,27.1,2.55])", + "(1,[0,1,0,0,0,0,1,28.0,2.80])", + "(0,[0,1,0,0,0,0,1,26.5,1.30])", + "(0,[0,0,1,0,0,0,1,23.0,1.80])", + "(3,[0,0,1,0,0,1,0,26.0,2.20])", + "(0,[0,0,1,0,0,1,0,24.5,2.25])", + "(0,[0,1,0,0,0,0,1,25.8,2.30])", + "(0,[0,0,0,1,0,0,1,23.5,1.90])", + "(0,[0,0,0,1,0,0,1,26.7,2.45])", + "(0,[0,0,1,0,0,0,1,25.5,2.25])", + "(1,[0,1,0,0,0,0,1,28.2,2.87])", + "(1,[0,1,0,0,0,0,1,25.2,2.00])", + "(2,[0,1,0,0,0,0,1,25.3,1.90])", + "(0,[0,0,1,0,0,0,1,25.7,2.10])", + "(12,[0,0,0,1,0,0,1,29.3,3.23])", + "(6,[0,0,1,0,0,0,1,23.8,1.80])", + "(3,[0,1,0,0,0,0,1,27.4,2.90])", + "(2,[0,1,0,0,0,0,1,26.2,2.02])", + "(4,[0,1,0,0,0,0,1,28.0,2.90])", + "(5,[0,1,0,0,0,0,1,28.4,3.10])", + "(7,[0,1,0,0,0,0,1,33.5,5.20])", + "(0,[0,1,0,0,0,0,1,25.8,2.40])", + "(10,[0,0,1,0,0,0,1,24.0,1.90])", + "(0,[0,1,0,0,0,0,1,23.1,2.00])", + "(0,[0,1,0,0,0,0,1,28.3,3.20])", + "(4,[0,1,0,0,0,0,1,26.5,2.35])", + "(7,[0,1,0,0,0,0,1,26.5,2.75])", + "(3,[0,0,1,0,0,0,1,26.1,2.75])", + "(0,[0,1,0,0,0,1,0,24.5,2.00])")).map(LabeledPointParser.parse).cache() + } + + test("Getting gradient and loss func value of a point data.") { + val weight = Vectors.dense(Array(1.0, 1.0, 1.0)) + val data = Vectors.dense(Array(1.0, 2.0, 3.0)) + val label = 10.0 + val gradient = new PoissonNLLGradient() + val (grad, loss) = gradient.compute(data, label, weight) + + assert(math.abs(loss - 343.4287934927351) < 1e-6) + assert(math.abs(grad(0) - 
393.4287934927351) < 1e-6) + assert(math.abs(grad(1) - 786.8575869854702) < 1e-6) + assert(math.abs(grad(2) - 1180.286380478205) < 1e-6) + } + + test("The accumulative version of gradient and loss of log-linear likelihood.") { + val weight = Vectors.dense(Array(1.0, 1.0, 1.0)) + val data = Vectors.dense(Array(1.0, 2.0, 3.0)) + val label = 10.0 + val grad = Vectors.dense(Array(0.0, 0.0, 0.0)) + val gradient = new PoissonNLLGradient() + + val loss = gradient.compute(data, label, weight, grad) + + assert(math.abs(loss - 343.4287934927351) < 1e-6) + assert(math.abs(grad(0) - 393.4287934927351) < 1e-6) + assert(math.abs(grad(1) - 786.8575869854702) < 1e-6) + assert(math.abs(grad(2) - 1180.286380478205) < 1e-6) + } + + test("Modeling generated count data with Poisson regression using L-BFGS.") { + val numFeatures = 8 + val generatedDataSet = PoissonRegressionDataGenerator + .generatePoissonRegRDD(sc, 500, numFeatures, true) + val posReg = new PoissonRegressionWithLBFGS().setIntercept(true); + posReg.optimizer + .setNumCorrections(10) + .setConvergenceTol(1e-6) + .setMaxNumIterations(50) + .setRegParam(0.0) + val prModel = posReg.run(generatedDataSet, Vectors.dense(new Array[Double](numFeatures))) + + val predicts = generatedDataSet map { labeledPoint => + math rint prModel.predict(labeledPoint.features) + } + validatePrediction(predicts.collect(), generatedDataSet.collect(), 1.0) + } + + test("Modeling generated count data with Poisson regression using SGD.") { + val numFeatures = 4 + val generatedDataSet = PoissonRegressionDataGenerator + .generatePoissonRegRDD(sc, 500, numFeatures, true) + val posReg = new PoissonRegressionWithSGD().setIntercept(true); + posReg.optimizer + .setNumIterations(800) + .setRegParam(0.0) + .setStepSize(1e-2) + .setMiniBatchFraction(0.8) + val prModel = posReg.run(generatedDataSet, Vectors.dense(Array.fill[Double](numFeatures)(0.5))) + + val predicts = generatedDataSet map { labeledPoint => + math rint prModel.predict(labeledPoint.features) + 
} + validatePrediction(predicts.collect(), generatedDataSet.collect(), 5.0) + } + + test("Modeling real-world count data with Poisson regression using L-BFGS.") { + val posReg = new PoissonRegressionWithLBFGS().setIntercept(true); + posReg.optimizer + .setNumCorrections(10) + .setConvergenceTol(1e-6) + .setMaxNumIterations(50) + .setRegParam(0.0) + val prModel = posReg.run(realDataSet, Vectors.dense(new Array[Double](9))) + + val predicts = realDataSet map { labeledPoint => + math rint prModel.predict(labeledPoint.features) + } + + validatePrediction(predicts.collect(), realDataSet.collect(), 10.0) + } + + test("Modeling real-world count data with Poisson regression using SGD.") { + val posReg = new PoissonRegressionWithSGD().setIntercept(true); + posReg.optimizer + .setNumIterations(1500) + .setRegParam(0.0) + .setStepSize(1e-3) + .setMiniBatchFraction(0.5) + val prModel = posReg.run(realDataSet, Vectors.dense(new Array[Double](9))) + + val predicts = realDataSet map { labeledPoint => + math rint prModel.predict(labeledPoint.features) + } + + validatePrediction(predicts.collect(), realDataSet.collect(), 10.0) + } +}