From e0eb199009cdf602bf4c96fc6c37f70560594d81 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 13 May 2015 17:29:31 +0200 Subject: [PATCH 01/10] Some refactoring for better generalization --- .../ml/optimization/GradientDescent.scala | 71 ++------------- .../apache/flink/ml/optimization/Solver.scala | 86 ++++++++++++++++++- 2 files changed, 89 insertions(+), 68 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index eff519e13cffd..a7c2b2e9a6ddd 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -43,12 +43,10 @@ import org.apache.flink.ml.optimization.Solver._ * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. */ -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { +class GradientDescent(runParameters: ParameterMap) extends IterativeSolver(runParameters) { import Solver.WEIGHTVECTOR_BROADCAST - var parameterMap: ParameterMap = parameters ++ runParameters - /** Performs one iteration of Stochastic Gradient Descent using mini batches * * @param data A Dataset of LabeledVector (label, features) pairs @@ -79,83 +77,26 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs - * @param initWeights The initial weights that will be optimized + * @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], - initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { - // TODO: Faster way to do this? - val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) - + initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { val numberOfIterations: Int = parameterMap(Iterations) // Initialize weights - val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) => { - wvDS.map{wv => { - val denseWeights = wv.weights match { - case dv: DenseVector => dv - case sv: SparseVector => sv.toDenseVector - } - WeightVector(denseWeights, wv.intercept) - } - - } - } - case None => createInitialWeightVector(dimensionsDS) - } + val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) // Perform the iterations // TODO: Enable convergence stopping criterion, as in Multiple Linear regression initialWeightsDS.iterate(numberOfIterations) { - weightVector => { - SGDStep(data, weightVector) + weightVectorDS => { + SGDStep(data, weightVectorDS) } } } - /** Mapping function that calculates the weight gradients from the data. - * - */ - private class GradientCalculation extends - RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { - - var weightVector: WeightVector = null - - @throws(classOf[Exception]) - override def open(configuration: Configuration): Unit = { - val list = this.getRuntimeContext. 
- getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) - - weightVector = list.get(0) - } - - override def map(example: LabeledVector): (WeightVector, Double, Int) = { - - val lossFunction = parameterMap(LossFunction) - val regType = parameterMap(RegularizationType) - val regParameter = parameterMap(RegularizationParameter) - val predictionFunction = parameterMap(PredictionFunctionParameter) - val dimensions = example.vector.size - // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement? - // The idea in spark is to avoid object creation, but here we have to do it anyway - val weightGradient = new DenseVector(new Array[Double](dimensions)) - - // TODO(tvas): Indentation here? - val (loss, lossDeriv) = lossFunction.lossAndGradient( - example, - weightVector, - weightGradient, - regType, - regParameter, - predictionFunction) - - (new WeightVector(weightGradient, lossDeriv), loss, 1) - } - } - /** Performs the update of the weights, according to the given gradients and regularization type. * */ diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 580e09673e9f6..a974fb9939136 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -18,9 +18,11 @@ package org.apache.flink.ml.optimization +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.DataSet +import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common._ -import org.apache.flink.ml.math.{Vector => FlinkVector, BLAS, DenseVector} +import org.apache.flink.ml.math.{Vector => FlinkVector, SparseVector, BLAS, DenseVector} import org.apache.flink.api.scala._ import org.apache.flink.ml.optimization.IterativeSolver._ import org.apache.flink.ml.optimization.Solver._ @@ -28,7 +30,9 @@ import org.apache.flink.ml.optimization.Solver._ /** Base class for optimization algorithms * */ -abstract class Solver extends Serializable with WithParameters { +abstract class Solver(runParameters: ParameterMap) extends Serializable with WithParameters { + + var parameterMap: ParameterMap = parameters ++ runParameters /** Provides a solution for the given optimization problem * @@ -40,6 +44,32 @@ abstract class Solver extends Serializable with WithParameters { data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] + /** Creates initial weights vector, creating a DataSet with a WeightVector element + * + * @param initialWeights An Option that may contain an initial set of weights + * @param data The data for which we optimize the weights + * @return A DataSet containing a single WeightVector element + */ + def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]], + data: DataSet[LabeledVector]): DataSet[WeightVector] = { + // TODO: Faster way to do this? + val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) + + initialWeights match { + // Ensure provided weight vector is a DenseVector + case Some(wvDS) => + wvDS.map { wv => { + val denseWeights = wv.weights match { + case dv: DenseVector => dv + case sv: SparseVector => sv.toDenseVector + } + WeightVector(denseWeights, wv.intercept) + } + } + case None => createInitialWeightVector(dimensionsDS) + } + } + /** Creates a DataSet with one zero vector. 
The zero vector has dimension d, which is given * by the dimensionDS. * @@ -56,6 +86,7 @@ abstract class Solver extends Serializable with WithParameters { } //Setters for parameters + // TODO(tvas): Provide an option to fit an intercept or not def setLossFunction(lossFunction: LossFunction): Solver = { parameters.add(LossFunction, lossFunction) this @@ -106,7 +137,7 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver extends Solver { +abstract class IterativeSolver(runParameters: ParameterMap) extends Solver(runParameters) { //Setters for parameters def setIterations(iterations: Int): IterativeSolver = { @@ -118,6 +149,51 @@ abstract class IterativeSolver extends Solver { parameters.add(Stepsize, stepsize) this } + + def setConvergenceThreshold(convergenceThreshold: Double): IterativeSolver = { + parameters.add(ConvergenceThreshold, convergenceThreshold) + this + } + + /** Mapping function that calculates the weight gradients from the data. + * + */ + protected class GradientCalculation extends + RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { + + var weightVector: WeightVector = null + + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val list = this.getRuntimeContext. + getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) + + weightVector = list.get(0) + } + + override def map(example: LabeledVector): (WeightVector, Double, Int) = { + + val lossFunction = parameterMap(LossFunction) + val regType = parameterMap(RegularizationType) + val regParameter = parameterMap(RegularizationParameter) + val predictionFunction = parameterMap(PredictionFunctionParameter) + val dimensions = example.vector.size + // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement? + // The idea in spark is to avoid object creation, but here we have to do it anyway + val weightGradient = new DenseVector(new Array[Double](dimensions)) + + // TODO(tvas): Indentation here? + val (loss, lossDeriv) = lossFunction.lossAndGradient( + example, + weightVector, + weightGradient, + regType, + regParameter, + predictionFunction) + + (new WeightVector(weightGradient, lossDeriv), loss, 1) + } + } } object IterativeSolver { @@ -132,4 +208,8 @@ object IterativeSolver { case object Iterations extends Parameter[Int] { val defaultValue = Some(10) } + + case object ConvergenceThreshold extends Parameter[Double] { + val defaultValue = None + } } From c224cf9ecb35147175f3de4b188c03168f03fa1f Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Fri, 15 May 2015 22:11:48 +0200 Subject: [PATCH 02/10] Optimization refactoring Removed runParameters, to avoid having to maintain two parameter maps. Added apply methods. Some renaming to avoid confusion between parameters and actual types. 
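Usage sketch (illustrative only, not part of the diff below): after this change a solver is no longer constructed with a ParameterMap; it is obtained through apply() and configured with the fluent setters, as the updated tests in this patch do:

    // assumes the flink-ml optimization imports, e.g.
    // import org.apache.flink.ml.optimization._
    val sgd = GradientDescent()
      .setStepsize(0.01)
      .setIterations(2000)
      .setLossFunction(SquaredLoss())
      .setRegularizationType(L1Regularization())
      .setRegularizationParameter(0.3)
    // val weightsDS = sgd.optimize(inputDS, None)
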
--- .../ml/optimization/GradientDescent.scala | 24 +++--- .../flink/ml/optimization/LossFunction.scala | 6 ++ .../ml/optimization/Regularization.scala | 19 +++++ .../apache/flink/ml/optimization/Solver.scala | 31 ++++---- .../optimization/GradientDescentITSuite.scala | 75 ++++++++----------- 5 files changed, 80 insertions(+), 75 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index a7c2b2e9a6ddd..0ffd44eb428e3 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -36,14 +36,14 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. */ -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver(runParameters) { +class GradientDescent() extends IterativeSolver() { import Solver.WEIGHTVECTOR_BROADCAST @@ -83,7 +83,7 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver(runPa override def optimize( data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { - val numberOfIterations: Int = parameterMap(Iterations) + val numberOfIterations: Int = parameters(Iterations) // Initialize weights val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) @@ -114,9 +114,9 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver(runPa } override def map(gradientLossAndCount: (WeightVector, Double, Int)): WeightVector = { - val regType = parameterMap(RegularizationType) - val regParameter = parameterMap(RegularizationParameter) - val stepsize = parameterMap(Stepsize) + val regType = parameters(RegularizationTypeParameter) + val regParameter = parameters(RegularizationValueParameter) + val stepsize = parameters(Stepsize) val weightGradients = gradientLossAndCount._1 val lossSum = gradientLossAndCount._2 val count = gradientLossAndCount._3 @@ -172,11 +172,7 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver(runPa object GradientDescent { def apply(): GradientDescent = { - new GradientDescent(new ParameterMap()) - } - - def apply(parameterMap: ParameterMap): GradientDescent = { - new GradientDescent(parameterMap) + new GradientDescent() } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala index 1bb6152e51f82..2f25444b39164 100644 --- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala @@ -117,3 +117,9 @@ class SquaredLoss extends RegressionLoss { } } + +object SquaredLoss { + def apply(): SquaredLoss = { + new SquaredLoss + } +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala index 4ec2452e5176e..8b2e5896ebf10 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala @@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } +// TODO(tvas): I think NoRegularization should extend DiffRegularization /** Performs no regularization, equivalent to $R(w) = 0$ **/ class NoRegularization extends Regularization { /** Adds the regularization term to the loss value @@ -110,6 +111,12 @@ class NoRegularization extends Regularization { regParameter: Double): Double = {loss} } +object NoRegularization { + def apply(): NoRegularization = { + new NoRegularization + } +} + /** $L_2$ regularization penalty. * * Penalizes large weights, favoring solutions with more small weights rather than few large ones. @@ -143,6 +150,12 @@ class L2Regularization extends DiffRegularization { } } +object L2Regularization { + def apply(): L2Regularization = { + new L2Regularization + } +} + /** $L_1$ regularization penalty. * * The $L_1$ penalty can be used to drive a number of the solution coefficients to 0, thereby @@ -196,3 +209,9 @@ class L1Regularization extends Regularization { vector.valueIterator.fold(0.0){(a,b) => math.abs(a) + math.abs(b)} } } + +object L1Regularization { + def apply(): L1Regularization = { + new L1Regularization + } +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index a974fb9939136..5f673dbd4b6e8 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -25,14 +25,14 @@ import org.apache.flink.ml.common._ import org.apache.flink.ml.math.{Vector => FlinkVector, SparseVector, BLAS, DenseVector} import org.apache.flink.api.scala._ import org.apache.flink.ml.optimization.IterativeSolver._ +// TODO(tvas): Kind of ugly that we have to do this. Why not define the parameters inside the class? import org.apache.flink.ml.optimization.Solver._ /** Base class for optimization algorithms * */ -abstract class Solver(runParameters: ParameterMap) extends Serializable with WithParameters { +abstract class Solver() extends Serializable with WithParameters { - var parameterMap: ParameterMap = parameters ++ runParameters /** Provides a solution for the given optimization problem * @@ -88,17 +88,19 @@ abstract class Solver(runParameters: ParameterMap) extends Serializable with Wit //Setters for parameters // TODO(tvas): Provide an option to fit an intercept or not def setLossFunction(lossFunction: LossFunction): Solver = { - parameters.add(LossFunction, lossFunction) + parameters.add(LossFunctionParameter, lossFunction) this } + // TODO(tvas): Sanitize the input, i.e. 
depending on Solver type allow only certain types of + // regularization to be set. def setRegularizationType(regularization: Regularization): Solver = { - parameters.add(RegularizationType, regularization) + parameters.add(RegularizationTypeParameter, regularization) this } def setRegularizationParameter(regularizationParameter: Double): Solver = { - parameters.add(RegularizationParameter, regularizationParameter) + parameters.add(RegularizationValueParameter, regularizationParameter) this } @@ -113,17 +115,17 @@ object Solver { val WEIGHTVECTOR_BROADCAST = "weights_broadcast" // Define parameters for Solver - case object LossFunction extends Parameter[LossFunction] { + case object LossFunctionParameter extends Parameter[LossFunction] { // TODO(tvas): Should depend on problem, here is where differentiating between classification // and regression could become useful val defaultValue = Some(new SquaredLoss) } - case object RegularizationType extends Parameter[Regularization] { + case object RegularizationTypeParameter extends Parameter[Regularization] { val defaultValue = Some(new NoRegularization) } - case object RegularizationParameter extends Parameter[Double] { + case object RegularizationValueParameter extends Parameter[Double] { val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0! } @@ -137,7 +139,7 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver(runParameters: ParameterMap) extends Solver(runParameters) { +abstract class IterativeSolver() extends Solver() { //Setters for parameters def setIterations(iterations: Int): IterativeSolver = { @@ -173,16 +175,13 @@ abstract class IterativeSolver(runParameters: ParameterMap) extends Solver(runPa override def map(example: LabeledVector): (WeightVector, Double, Int) = { - val lossFunction = parameterMap(LossFunction) - val regType = parameterMap(RegularizationType) - val regParameter = parameterMap(RegularizationParameter) - val predictionFunction = parameterMap(PredictionFunctionParameter) + val lossFunction = parameters(LossFunctionParameter) + val regType = parameters(RegularizationTypeParameter) + val regParameter = parameters(RegularizationValueParameter) + val predictionFunction = parameters(PredictionFunctionParameter) val dimensions = example.vector.size - // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement? - // The idea in spark is to avoid object creation, but here we have to do it anyway val weightGradient = new DenseVector(new Array[Double](dimensions)) - // TODO(tvas): Indentation here? 
val (loss, lossDeriv) = lossFunction.lossAndGradient( example, weightVector, diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala index 2734419e573ce..a01dd3ec8af85 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala @@ -38,15 +38,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.01) - parameters.add(IterativeSolver.Iterations, 2000) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new L1Regularization) - parameters.add(Solver.RegularizationParameter, 0.3) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.01) + .setIterations(2000) + .setLossFunction(SquaredLoss()) + .setRegularizationType(L1Regularization()) + .setRegularizationParameter(0.3) val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData) @@ -72,15 +69,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.1) - parameters.add(IterativeSolver.Iterations, 1) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new L2Regularization) - parameters.add(Solver.RegularizationParameter, 1.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.1) + .setIterations(1) + .setLossFunction(SquaredLoss()) + .setRegularizationType(L2Regularization()) + .setRegularizationParameter(1.0) val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0))) val currentWeights = new WeightVector(DenseVector(1.0), 1.0) @@ -106,15 +100,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 1.0) - parameters.add(IterativeSolver.Iterations, 800) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS = env.fromCollection(data) val weightDS = sgd.optimize(inputDS, None) @@ -141,15 +132,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.0001) - parameters.add(IterativeSolver.Iterations, 100) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.0001) + .setIterations(100) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS = 
env.fromCollection(noInterceptData) val weightDS = sgd.optimize(inputDS, None) @@ -175,15 +163,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.1) - parameters.add(IterativeSolver.Iterations, 1) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.1) + .setIterations(1) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0))) val currentWeights = new WeightVector(DenseVector(1.0), 1.0) From 25ae46bf7e0789d9f13479ef984c15d75526223a Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 18 May 2015 17:27:16 +0200 Subject: [PATCH 03/10] Removed redundant arguments from LossFunction.lossAndGradient --- .../org/apache/flink/ml/optimization/LossFunction.scala | 4 ++-- .../scala/org/apache/flink/ml/optimization/Solver.scala | 4 ---- .../apache/flink/ml/optimization/LossFunctionITSuite.scala | 7 +++++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala index 2f25444b39164..1b632dda5236e 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala @@ -51,6 +51,8 @@ abstract class LossFunction extends Serializable{ * @param example The features and the label associated with the example * @param weights The current weight vector * @param cumGradient The vector to which the gradient will be added to, in place. + * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate + * a prediction and its gradient from the features and weights * @return A tuple containing the computed loss as its first element and a the loss derivative as * its second element. The gradient is updated in-place. 
*/ @@ -58,8 +60,6 @@ abstract class LossFunction extends Serializable{ example: LabeledVector, weights: WeightVector, cumGradient: FlinkVector, - regType: Regularization, - regParameter: Double, predictionFunction: PredictionFunction): (Double, Double) = { val features = example.vector diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 5f673dbd4b6e8..4eedb097a0ea7 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -176,8 +176,6 @@ abstract class IterativeSolver() extends Solver() { override def map(example: LabeledVector): (WeightVector, Double, Int) = { val lossFunction = parameters(LossFunctionParameter) - val regType = parameters(RegularizationTypeParameter) - val regParameter = parameters(RegularizationValueParameter) val predictionFunction = parameters(PredictionFunctionParameter) val dimensions = example.vector.size val weightGradient = new DenseVector(new Array[Double](dimensions)) @@ -186,8 +184,6 @@ abstract class IterativeSolver() extends Solver() { example, weightVector, weightGradient, - regType, - regParameter, predictionFunction) (new WeightVector(weightGradient, lossDeriv), loss, 1) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala index e5509a39fbc98..7b102e0559adf 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala @@ -41,8 +41,11 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase { val weightVector = new WeightVector(DenseVector(1.0), 1.0) val gradient = DenseVector(0.0) - val (loss, lossDerivative) = squaredLoss.lossAndGradient(example, weightVector, gradient, new - NoRegularization, 0.0, new LinearPrediction) + val (loss, lossDerivative) = squaredLoss.lossAndGradient( + example, + weightVector, + gradient, + new LinearPrediction) loss should be (2.0 +- 0.001) From 1b81d8f16c1778c8950e72d8de0026eed6e085fd Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 18 May 2015 18:30:54 +0200 Subject: [PATCH 04/10] Frist draft version of convergence --- .../ml/optimization/GradientDescent.scala | 107 +++++++++++++++++- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index 0ffd44eb428e3..d186d1345ecdd 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -19,12 +19,13 @@ package org.apache.flink.ml.optimization +import com.github.fommil.netlib.BLAS._ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common._ import org.apache.flink.ml.math._ -import org.apache.flink.ml.optimization.IterativeSolver.{Iterations, Stepsize} +import 
org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, Iterations, Stepsize} import org.apache.flink.ml.optimization.Solver._ /** This [[Solver]] performs Stochastic Gradient Descent optimization using mini batches @@ -74,6 +75,8 @@ class GradientDescent() extends IterativeSolver() { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs @@ -84,16 +87,108 @@ class GradientDescent() extends IterativeSolver() { data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { val numberOfIterations: Int = parameters(Iterations) + // TODO(tvas): This looks out of place, why don't we get back an Option from + // parameters(ConvergenceThreshold)? + val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) // Perform the iterations - // TODO: Enable convergence stopping criterion, as in Multiple Linear regression - initialWeightsDS.iterate(numberOfIterations) { - weightVectorDS => { - SGDStep(data, weightVectorDS) - } + val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None => + initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS => { + SGDStep(data, weightVectorDS) + } + } + case Some(convergence) => + // we have to calculate for each weight vector the sum of squared residuals + val initialLossSumDS = data.map { + new LossCalculation + }.withBroadcastSet(initialWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { + _ + _ + } + // TODO(tvas): Apply regularization to loss value + + // combine weight vector with current sum of squared residuals + val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + + val resultWithLoss = initialWeightsWithLossSum. + iterateWithTermination(numberOfIterations) { + weightsWithLossSum => + + // extract weight vector and squared residual sum + val currentWeightsDS = weightsWithLossSum.map{_._1} + val currentLossSumDS = weightsWithLossSum.map{_._2} + + val updatedWeightsDS = SGDStep(data, currentWeightsDS) + + val updatedLossSumDS = data.map { + new LossCalculation + }.withBroadcastSet(updatedWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { + _ + _ + } + // TODO(tvas): Apply regularization to loss value + + // Check if the relative change in the squared residual sum is smaller than the + // convergence threshold. If yes, then terminate => return empty termination data set + val termination = currentLossSumDS.crossWithTiny(updatedLossSumDS).setParallelism(1). + filter{ + pair => { + val (loss, newLoss) = pair + + if (loss <= 0) { + false + } else { + math.abs((loss - newLoss)/loss) >= convergence + } + } + } + + // result for new iteration + (updatedWeightsDS cross updatedLossSumDS, termination) + } + resultWithLoss.map{_._1} + } + optimizedWeights + } + + /** Calculates the loss value, given a labeled vector and the current weight vector + * + * The weight vector is received as a broadcast variable. + */ + private class LossCalculation extends RichMapFunction[LabeledVector, Double] { + + var weightVector: WeightVector = null + + + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val list = this.getRuntimeContext. 
+ getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) + + weightVector = list.get(0) + + } + + override def map(example: LabeledVector): Double = { + val lossFunction = parameters(LossFunctionParameter) + val predictionFunction = parameters(PredictionFunctionParameter) + val dimensions = example.vector.size + // TODO(tvas): Avoid needless creation of WeightGradient object + // Create a lossValue function in LossFunction? + val weightGradient = new DenseVector(new Array[Double](dimensions)) + + val (loss, _) = lossFunction.lossAndGradient( + example, + weightVector, + weightGradient, + predictionFunction) + + loss } } From 99307b2bb8573d3f1366e6675f2feb91a91d3b5c Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 19 May 2015 11:19:58 +0200 Subject: [PATCH 05/10] Added test case for convergence --- .../ml/optimization/GradientDescent.scala | 20 +++---- .../optimization/GradientDescentITSuite.scala | 57 ++++++++++++++++++- 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index d186d1345ecdd..a6423ddc001da 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -121,35 +121,35 @@ class GradientDescent() extends IterativeSolver() { weightsWithLossSum => // extract weight vector and squared residual sum - val currentWeightsDS = weightsWithLossSum.map{_._1} - val currentLossSumDS = weightsWithLossSum.map{_._2} + val previousWeightsDS = weightsWithLossSum.map{_._1} + val previousLossSumDS = weightsWithLossSum.map{_._2} - val updatedWeightsDS = SGDStep(data, currentWeightsDS) + val currentWeightsDS = SGDStep(data, previousWeightsDS) - val updatedLossSumDS = data.map { + val currentLossSumDS = data.map { new LossCalculation - }.withBroadcastSet(updatedWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { + }.withBroadcastSet(currentWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { _ + _ } // TODO(tvas): Apply regularization to loss value // Check if the relative change in the squared residual sum is smaller than the // convergence threshold. If yes, then terminate => return empty termination data set - val termination = currentLossSumDS.crossWithTiny(updatedLossSumDS).setParallelism(1). + val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). 
filter{ pair => { - val (loss, newLoss) = pair + val (previousLoss, currentLoss) = pair - if (loss <= 0) { + if (previousLoss <= 0) { false } else { - math.abs((loss - newLoss)/loss) >= convergence + math.abs((previousLoss - currentLoss)/previousLoss) >= convergence } } } // result for new iteration - (updatedWeightsDS cross updatedLossSumDS, termination) + (currentWeightsDS cross currentLossSumDS, termination) } resultWithLoss.map{_._1} } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala index a01dd3ec8af85..52180313fe8f6 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala @@ -190,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { } - // TODO: Need more corner cases + it should "terminate early if the convergence criterion is reached" in { + // TODO(tvas): We need a better way to check the convergence of the weights. + // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that + // can tell us whether we have converged and at which iteration + + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val sgdEarlyTerminate = GradientDescent() + .setConvergenceThreshold(1e2) + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) + + val inputDS = env.fromCollection(data) + + val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None) + + val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect() + + weightListEarly.size should equal(1) + + val weightVectorEarly: WeightVector = weightListEarly.head + val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data + val weight0Early = weightVectorEarly.intercept + + val sgdNoConvergence = GradientDescent() + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) + + + val weightDSNoConvergence = sgdNoConvergence.optimize(inputDS, None) + + val weightListNoConvergence: Seq[WeightVector] = weightDSNoConvergence.collect() + + weightListNoConvergence.size should equal(1) + + val weightVectorNoConvergence: WeightVector = weightListNoConvergence.head + val weightsNoConvergence = weightVectorNoConvergence.weights.asInstanceOf[DenseVector].data + val weight0NoConvergence = weightVectorNoConvergence.intercept + + // Since the first optimizer was set to terminate early, its weights should be different + weightsEarly zip weightsNoConvergence foreach { + case (earlyWeight, weightNoConvergence) => + weightNoConvergence should not be (earlyWeight +- 0.1) + } + weight0NoConvergence should not be (weight0Early +- 0.1) + } + + // TODO: Need more corner cases, see sklearn tests for SGD linear model } From b0a462046a2db7b7d524c5bbaf55ca5f79cc6a7f Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 19 May 2015 13:52:26 +0200 Subject: [PATCH 06/10] Added regularization to calculated loss. 
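
Sketch of the added computation (simplified, not part of the diff below): the value used for the convergence check is now the mean data loss plus the regularization penalty, evaluated against the broadcast weight vector:

    // lossSum and count are the summed per-example losses and the example count,
    // weightVector is received via the WEIGHTVECTOR_BROADCAST broadcast variable
    val regType      = parameters(RegularizationTypeParameter)
    val regParameter = parameters(RegularizationValueParameter)
    val regularizedLoss =
      regType.regLoss(lossSum / count, weightVector.weights, regParameter)
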
--- .../ml/optimization/GradientDescent.scala | 84 ++++++++++++++----- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index a6423ddc001da..2ba1b8e1d76ea 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -19,7 +19,6 @@ package org.apache.flink.ml.optimization -import com.github.fommil.netlib.BLAS._ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration @@ -43,6 +42,9 @@ import org.apache.flink.ml.optimization.Solver._ * [[Solver.RegularizationValueParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. + * [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will + * stop the iterations if the change in the value of the objective + * function between successive iterations is is smaller than this value. */ class GradientDescent() extends IterativeSolver() { @@ -104,15 +106,26 @@ class GradientDescent() extends IterativeSolver() { } } case Some(convergence) => - // we have to calculate for each weight vector the sum of squared residuals - val initialLossSumDS = data.map { - new LossCalculation - }.withBroadcastSet(initialWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { - _ + _ + /** Calculates the regularized loss, from the data and given weights **/ + def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): + DataSet[Double] = { + data.map { + new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) + .reduce { + (left, right) => + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } + .map{new RegularizedLossCalculation} + .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - // TODO(tvas): Apply regularization to loss value + // We have to calculate for each weight vector the sum of squared residuals, + // and then sum them and apply regularization + val initialLossSumDS = lossCalculation(data, initialWeightsDS) - // combine weight vector with current sum of squared residuals + // Combine weight vector with the current loss val initialWeightsWithLossSum = initialWeightsDS. crossWithTiny(initialLossSumDS).setParallelism(1) @@ -120,21 +133,16 @@ class GradientDescent() extends IterativeSolver() { iterateWithTermination(numberOfIterations) { weightsWithLossSum => - // extract weight vector and squared residual sum + // Extract weight vector and loss val previousWeightsDS = weightsWithLossSum.map{_._1} val previousLossSumDS = weightsWithLossSum.map{_._2} val currentWeightsDS = SGDStep(data, previousWeightsDS) - val currentLossSumDS = data.map { - new LossCalculation - }.withBroadcastSet(currentWeightsDS, WEIGHTVECTOR_BROADCAST).reduce { - _ + _ - } - // TODO(tvas): Apply regularization to loss value + val currentLossSumDS = lossCalculation(data, currentWeightsDS) - // Check if the relative change in the squared residual sum is smaller than the - // convergence threshold. 
If yes, then terminate => return empty termination data set + // Check if the relative change in the loss is smaller than the + // convergence threshold. If yes, then terminate i.e. return empty termination data set val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). filter{ pair => { @@ -148,9 +156,10 @@ class GradientDescent() extends IterativeSolver() { } } - // result for new iteration + // Result for new iteration (currentWeightsDS cross currentLossSumDS, termination) } + // Return just the weights resultWithLoss.map{_._1} } optimizedWeights @@ -160,7 +169,7 @@ class GradientDescent() extends IterativeSolver() { * * The weight vector is received as a broadcast variable. */ - private class LossCalculation extends RichMapFunction[LabeledVector, Double] { + private class LossCalculation extends RichMapFunction[LabeledVector, (Double, Int)] { var weightVector: WeightVector = null @@ -174,7 +183,7 @@ class GradientDescent() extends IterativeSolver() { } - override def map(example: LabeledVector): Double = { + override def map(example: LabeledVector): (Double, Int) = { val lossFunction = parameters(LossFunctionParameter) val predictionFunction = parameters(PredictionFunctionParameter) val dimensions = example.vector.size @@ -188,9 +197,42 @@ class GradientDescent() extends IterativeSolver() { weightGradient, predictionFunction) - loss + (loss, 1) + } + } + +/** Calculates the regularized loss value, given the loss and the current weight vector + * + * The weight vector is received as a broadcast variable. + */ +private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), Double] { + + var weightVector: WeightVector = null + + + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val list = this.getRuntimeContext. + getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) + + weightVector = list.get(0) + + } + + override def map(lossAndCount: (Double, Int)): Double = { + val (lossSum, count) = lossAndCount + val regType = parameters(RegularizationTypeParameter) + val regParameter = parameters(RegularizationValueParameter) + + val regularizedLoss = { + regType.regLoss( + lossSum/count, + weightVector.weights, + regParameter) } + regularizedLoss } +} /** Performs the update of the weights, according to the given gradients and regularization type. * From 1ab94d7aac45f16392944f379b59c0dfeadcb0a4 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 19 May 2015 16:24:02 +0200 Subject: [PATCH 07/10] Changed parameter setting functions to return this.type Also fixed wording of docstring. --- .../flink/ml/optimization/GradientDescent.scala | 2 +- .../apache/flink/ml/optimization/Solver.scala | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index 2ba1b8e1d76ea..079926ac4fa01 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -43,7 +43,7 @@ import org.apache.flink.ml.optimization.Solver._ * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. 
* [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will - * stop the iterations if the change in the value of the objective + * stop the iterations if the relative change in the value of the objective * function between successive iterations is is smaller than this value. */ class GradientDescent() extends IterativeSolver() { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 4eedb097a0ea7..b3a1486e3cd5c 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.DataSet import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common._ -import org.apache.flink.ml.math.{Vector => FlinkVector, SparseVector, BLAS, DenseVector} +import org.apache.flink.ml.math.{SparseVector, DenseVector} import org.apache.flink.api.scala._ import org.apache.flink.ml.optimization.IterativeSolver._ // TODO(tvas): Kind of ugly that we have to do this. Why not define the parameters inside the class? @@ -87,24 +87,24 @@ abstract class Solver() extends Serializable with WithParameters { //Setters for parameters // TODO(tvas): Provide an option to fit an intercept or not - def setLossFunction(lossFunction: LossFunction): Solver = { + def setLossFunction(lossFunction: LossFunction): this.type = { parameters.add(LossFunctionParameter, lossFunction) this } // TODO(tvas): Sanitize the input, i.e. depending on Solver type allow only certain types of // regularization to be set. 
- def setRegularizationType(regularization: Regularization): Solver = { + def setRegularizationType(regularization: Regularization): this.type = { parameters.add(RegularizationTypeParameter, regularization) this } - def setRegularizationParameter(regularizationParameter: Double): Solver = { + def setRegularizationParameter(regularizationParameter: Double): this.type = { parameters.add(RegularizationValueParameter, regularizationParameter) this } - def setPredictionFunction(predictionFunction: PredictionFunction): Solver = { + def setPredictionFunction(predictionFunction: PredictionFunction): this.type = { parameters.add(PredictionFunctionParameter, predictionFunction) this } @@ -142,17 +142,17 @@ object Solver { abstract class IterativeSolver() extends Solver() { //Setters for parameters - def setIterations(iterations: Int): IterativeSolver = { + def setIterations(iterations: Int): this.type = { parameters.add(Iterations, iterations) this } - def setStepsize(stepsize: Double): IterativeSolver = { + def setStepsize(stepsize: Double): this.type = { parameters.add(Stepsize, stepsize) this } - def setConvergenceThreshold(convergenceThreshold: Double): IterativeSolver = { + def setConvergenceThreshold(convergenceThreshold: Double): this.type = { parameters.add(ConvergenceThreshold, convergenceThreshold) this } From 3b075ef5ce56a454817e28457deeaaab58b4de9e Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 21 May 2015 11:28:48 +0200 Subject: [PATCH 08/10] Added lossValue function to LossFunction, and some style fixes --- .../ml/optimization/GradientDescent.scala | 32 +++++++------------ .../flink/ml/optimization/LossFunction.scala | 22 +++++++++++++ .../apache/flink/ml/optimization/Solver.scala | 6 ++-- .../optimization/GradientDescentITSuite.scala | 2 -- 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index 079926ac4fa01..bc7852a7f078d 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -46,7 +46,7 @@ import org.apache.flink.ml.optimization.Solver._ * stop the iterations if the relative change in the value of the objective * function between successive iterations is is smaller than this value. */ -class GradientDescent() extends IterativeSolver() { +class GradientDescent() extends IterativeSolver { import Solver.WEIGHTVECTOR_BROADCAST @@ -89,9 +89,7 @@ class GradientDescent() extends IterativeSolver() { data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { val numberOfIterations: Int = parameters(Iterations) - // TODO(tvas): This looks out of place, why don't we get back an Option from - // parameters(ConvergenceThreshold)? 
- val convergenceThresholdOption = parameters.get(ConvergenceThreshold) + val convergenceThresholdOption: Option[Double] = parameters.get(ConvergenceThreshold) // Initialize weights val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) @@ -109,17 +107,15 @@ class GradientDescent() extends IterativeSolver() { /** Calculates the regularized loss, from the data and given weights **/ def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): DataSet[Double] = { - data.map { - new LossCalculation - }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) + data + .map {new LossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) .reduce { - (left, right) => - val (leftLoss, leftCount) = left - val (rightLoss, rightCount) = right - (leftLoss + rightLoss, rightCount + leftCount) - } - .map{new RegularizedLossCalculation} - .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) + (left, right) => + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } + .map{new RegularizedLossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } // We have to calculate for each weight vector the sum of squared residuals, // and then sum them and apply regularization @@ -187,14 +183,10 @@ class GradientDescent() extends IterativeSolver() { val lossFunction = parameters(LossFunctionParameter) val predictionFunction = parameters(PredictionFunctionParameter) val dimensions = example.vector.size - // TODO(tvas): Avoid needless creation of WeightGradient object - // Create a lossValue function in LossFunction? - val weightGradient = new DenseVector(new Array[Double](dimensions)) - val (loss, _) = lossFunction.lossAndGradient( + val loss = lossFunction.lossValue( example, weightVector, - weightGradient, predictionFunction) (loss, 1) @@ -209,14 +201,12 @@ private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), var weightVector: WeightVector = null - @throws(classOf[Exception]) override def open(configuration: Configuration): Unit = { val list = this.getRuntimeContext. getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) weightVector = list.get(0) - } override def map(lossAndCount: (Double, Int)): Double = { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala index 1b632dda5236e..d612b90196b37 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala @@ -87,6 +87,28 @@ abstract class LossFunction extends Serializable{ BLAS.axpy(restrictedLossDeriv, predictionGradient, cumGradient) (lossValue, lossDeriv) } + + /** Compute the loss for the given data. + * + * @param example The features and the label associated with the example + * @param weights The current weight vector + * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate + * a prediction and its gradient from the features and weights + * @return The calculated loss value + */ + def lossValue( + example: LabeledVector, + weights: WeightVector, + predictionFunction: PredictionFunction): Double = { + val features = example.vector + val label = example.label + // TODO(tvas): We could also provide for the case where we don't want an intercept value + // i.e. 
data already centered + val prediction = predictionFunction.predict(features, weights) + val lossValue: Double = loss(prediction, label) + lossValue + } + } trait ClassificationLoss extends LossFunction diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index b3a1486e3cd5c..b9ac1c2885186 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -139,7 +139,7 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver() extends Solver() { +abstract class IterativeSolver() extends Solver { //Setters for parameters def setIterations(iterations: Int): this.type = { @@ -160,8 +160,8 @@ abstract class IterativeSolver() extends Solver() { /** Mapping function that calculates the weight gradients from the data. * */ - protected class GradientCalculation extends - RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { + protected class GradientCalculation + extends RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { var weightVector: WeightVector = null diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala index 52180313fe8f6..bae0288bf6466 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala @@ -119,7 +119,6 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { val weights = weightVector.weights.asInstanceOf[DenseVector].data val weight0 = weightVector.intercept - expectedWeights zip weights foreach { case (expectedWeight, weight) => weight should be (expectedWeight +- 0.1) @@ -226,7 +225,6 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { .setRegularizationType(NoRegularization()) .setRegularizationParameter(0.0) - val weightDSNoConvergence = sgdNoConvergence.optimize(inputDS, None) val weightListNoConvergence: Seq[WeightVector] = weightDSNoConvergence.collect() From 6b6b506ca114d0c0b417cb9e03770b4c9e8cd2c0 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 21 May 2015 14:01:53 +0200 Subject: [PATCH 09/10] Style fixes --- .../ml/optimization/GradientDescent.scala | 23 +++++++-------- .../ml/optimization/Regularization.scala | 14 ++++++++- .../apache/flink/ml/optimization/Solver.scala | 29 ++++++++++--------- .../ml/optimization/LossFunctionITSuite.scala | 6 +++- .../optimization/RegularizationITSuite.scala | 12 +++++--- 5 files changed, 51 insertions(+), 33 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index bc7852a7f078d..ef171f538d493 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -37,9 +37,9 @@ import org.apache.flink.ml.optimization.Solver._ * descent. 
Once a sampling operator has been introduced, the algorithm can be optimized * * The parameters to tune the algorithm are: - * [[Solver.LossFunctionParameter]] for the loss function to be used, - * [[Solver.RegularizationTypeParameter]] for the type of regularization, - * [[Solver.RegularizationValueParameter]] for the regularization parameter, + * [[Solver.LossFunction]] for the loss function to be used, + * [[Solver.RegularizationType]] for the type of regularization, + * [[Solver.RegularizationParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. * [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will @@ -104,7 +104,7 @@ class GradientDescent() extends IterativeSolver { } } case Some(convergence) => - /** Calculates the regularized loss, from the data and given weights **/ + // Calculates the regularized loss, from the data and given weights def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): DataSet[Double] = { data @@ -169,20 +169,17 @@ class GradientDescent() extends IterativeSolver { var weightVector: WeightVector = null - @throws(classOf[Exception]) override def open(configuration: Configuration): Unit = { val list = this.getRuntimeContext. getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) weightVector = list.get(0) - } override def map(example: LabeledVector): (Double, Int) = { - val lossFunction = parameters(LossFunctionParameter) - val predictionFunction = parameters(PredictionFunctionParameter) - val dimensions = example.vector.size + val lossFunction = parameters(LossFunction) + val predictionFunction = parameters(PredictionFunction) val loss = lossFunction.lossValue( example, @@ -211,8 +208,8 @@ private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), override def map(lossAndCount: (Double, Int)): Double = { val (lossSum, count) = lossAndCount - val regType = parameters(RegularizationTypeParameter) - val regParameter = parameters(RegularizationValueParameter) + val regType = parameters(RegularizationType) + val regParameter = parameters(RegularizationParameter) val regularizedLoss = { regType.regLoss( @@ -241,8 +238,8 @@ private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), } override def map(gradientLossAndCount: (WeightVector, Double, Int)): WeightVector = { - val regType = parameters(RegularizationTypeParameter) - val regParameter = parameters(RegularizationValueParameter) + val regType = parameters(RegularizationType) + val regParameter = parameters(RegularizationParameter) val stepsize = parameters(Stepsize) val weightGradients = gradientLossAndCount._1 val lossSum = gradientLossAndCount._2 diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala index 8b2e5896ebf10..e2bc5f41ef0e9 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala @@ -97,7 +97,7 @@ abstract class DiffRegularization extends Regularization { // TODO(tvas): I think NoRegularization should extend DiffRegularization /** Performs no regularization, equivalent to $R(w) = 0$ **/ -class NoRegularization extends Regularization { +class NoRegularization extends DiffRegularization { /** Adds the 
regularization term to the loss value * * @param loss The loss value, before applying regularization @@ -109,6 +109,18 @@ class NoRegularization extends Regularization { loss: Double, weightVector: FlinkVector, regParameter: Double): Double = {loss} + + /** Adds the regularization gradient term to the loss gradient. The gradient is updated in place. + * + * Since we don't apply any regularization, the gradient will stay the same. + * @param weightVector The current vector of weights + * @param lossGradient The loss gradient, without regularization. Updated in-place. + * @param regParameter The regularization parameter, $\lambda$. + */ + override def regGradient( + weightVector: FlinkVector, + lossGradient: FlinkVector, + regParameter: Double) = {} } object NoRegularization { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index b9ac1c2885186..f2cbce34c4898 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -58,10 +58,11 @@ abstract class Solver() extends Serializable with WithParameters { initialWeights match { // Ensure provided weight vector is a DenseVector case Some(wvDS) => - wvDS.map { wv => { - val denseWeights = wv.weights match { - case dv: DenseVector => dv - case sv: SparseVector => sv.toDenseVector + wvDS.map { + wv => { + val denseWeights = wv.weights match { + case dv: DenseVector => dv + case sv: SparseVector => sv.toDenseVector } WeightVector(denseWeights, wv.intercept) } @@ -88,24 +89,24 @@ abstract class Solver() extends Serializable with WithParameters { //Setters for parameters // TODO(tvas): Provide an option to fit an intercept or not def setLossFunction(lossFunction: LossFunction): this.type = { - parameters.add(LossFunctionParameter, lossFunction) + parameters.add(LossFunction, lossFunction) this } // TODO(tvas): Sanitize the input, i.e. depending on Solver type allow only certain types of // regularization to be set. 
def setRegularizationType(regularization: Regularization): this.type = { - parameters.add(RegularizationTypeParameter, regularization) + parameters.add(RegularizationType, regularization) this } def setRegularizationParameter(regularizationParameter: Double): this.type = { - parameters.add(RegularizationValueParameter, regularizationParameter) + parameters.add(RegularizationParameter, regularizationParameter) this } def setPredictionFunction(predictionFunction: PredictionFunction): this.type = { - parameters.add(PredictionFunctionParameter, predictionFunction) + parameters.add(PredictionFunction, predictionFunction) this } } @@ -115,21 +116,21 @@ object Solver { val WEIGHTVECTOR_BROADCAST = "weights_broadcast" // Define parameters for Solver - case object LossFunctionParameter extends Parameter[LossFunction] { + case object LossFunction extends Parameter[LossFunction] { // TODO(tvas): Should depend on problem, here is where differentiating between classification // and regression could become useful val defaultValue = Some(new SquaredLoss) } - case object RegularizationTypeParameter extends Parameter[Regularization] { + case object RegularizationType extends Parameter[Regularization] { val defaultValue = Some(new NoRegularization) } - case object RegularizationValueParameter extends Parameter[Double] { + case object RegularizationParameter extends Parameter[Double] { val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0! } - case object PredictionFunctionParameter extends Parameter[PredictionFunction] { + case object PredictionFunction extends Parameter[PredictionFunction] { val defaultValue = Some(new LinearPrediction) } } @@ -175,8 +176,8 @@ abstract class IterativeSolver() extends Solver { override def map(example: LabeledVector): (WeightVector, Double, Int) = { - val lossFunction = parameters(LossFunctionParameter) - val predictionFunction = parameters(PredictionFunctionParameter) + val lossFunction = parameters(LossFunction) + val predictionFunction = parameters(PredictionFunction) val dimensions = example.vector.size val weightGradient = new DenseVector(new Array[Double](dimensions)) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala index 7b102e0559adf..a0921e5ea3250 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala @@ -30,7 +30,7 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "The optimization Loss Function implementations" - it should "calculate squared loss correctly" in { + it should "calculate squared loss and gradient correctly" in { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) @@ -47,8 +47,12 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase { gradient, new LinearPrediction) + val onlyLoss = squaredLoss.lossValue(example, weightVector, new LinearPrediction) + loss should be (2.0 +- 0.001) + onlyLoss should be (2.0 +- 0.001) + lossDerivative should be (2.0 +- 0.001) gradient.data(0) should be (4.0 +- 0.001) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala 
index ad3ea8940297d..89c77f2fb1f24 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala @@ -32,7 +32,7 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "The regularization type implementations" - it should "not change the loss when no regularization is used" in { + it should "not change the loss or gradient when no regularization is used" in { val env = ExecutionEnvironment.getExecutionEnvironment @@ -41,14 +41,18 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase { val regularization = new NoRegularization val weightVector = new WeightVector(DenseVector(1.0), 1.0) - val effectiveStepsize = 1.0 - val regParameter = 0.0 + val regParameter = 1.0 val gradient = DenseVector(0.0) val originalLoss = 1.0 - val adjustedLoss = regularization.regLoss(originalLoss, weightVector.weights, regParameter) + val adjustedLoss = regularization.regularizedLossAndGradient( + originalLoss, + weightVector.weights, + gradient, + regParameter) adjustedLoss should be (originalLoss +- 0.0001) + gradient shouldEqual DenseVector(0.0) } it should "correctly apply L1 regularization" in { From 8d7f783a59dece0f17e2d53edce87921974e1f20 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 21 May 2015 14:13:43 +0200 Subject: [PATCH 10/10] Hotfix --- .../scala/org/apache/flink/ml/optimization/Regularization.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala index e2bc5f41ef0e9..9e6df4a2a43cf 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala @@ -95,7 +95,6 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } -// TODO(tvas): I think NoRegularization should extend DiffRegularization /** Performs no regularization, equivalent to $R(w) = 0$ **/ class NoRegularization extends DiffRegularization { /** Adds the regularization term to the loss value
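For context, a minimal sketch of how a solver is driven through the renamed parameter API after this series. The training DataSet, the `fit` helper, and the concrete parameter values below are illustrative assumptions, not part of the patches themselves:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.{LabeledVector, WeightVector}
    import org.apache.flink.ml.optimization._

    // Illustrative helper: trainingDS is assumed to be built by the caller.
    def fit(trainingDS: DataSet[LabeledVector]): DataSet[WeightVector] = {
      val sgd = new GradientDescent()
        .setLossFunction(new SquaredLoss)             // Solver.LossFunction
        .setRegularizationType(new NoRegularization)  // Solver.RegularizationType
        .setRegularizationParameter(0.0)              // Solver.RegularizationParameter
        .setPredictionFunction(new LinearPrediction)  // Solver.PredictionFunction
        .setIterations(100)                           // IterativeSolver.Iterations

      // Passing None for the initial weights lets createInitialWeightsDS
      // build them from the dimensionality of the data.
      sgd.optimize(trainingDS, None)
    }

The setters all return this.type, so the configuration chains on the concrete GradientDescent instance; parameters left unset fall back to the defaults defined in the Solver companion object.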