From bbf452e7cac7fcafcc494c79062d5139fe49117c Mon Sep 17 00:00:00 2001
From: rawkintrevo
Date: Mon, 5 Oct 2015 11:27:58 -0500
Subject: [PATCH] FLINK-1994: Added 3 new effective learning rates

---
 flink-staging/flink-hbase/pom.xml             | 12 ++++++
 .../ml/optimization/GradientDescent.scala     | 43 ++++++++++++++-----
 .../apache/flink/ml/optimization/Solver.scala |  9 ++++
 .../optimization/GradientDescentITSuite.scala |  1 +
 4 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
index 9638c58e91c67..5dc80d92e3c3b 100644
--- a/flink-staging/flink-hbase/pom.xml
+++ b/flink-staging/flink-hbase/pom.xml
@@ -171,6 +171,18 @@ under the License.
 			<id>hadoop-2</id>
+			<repositories>
+				<repository>
+					<id>hadoop-2-repo2</id>
+					<url>https://repo.maven.apache.org/maven2</url>
+					<releases>
+						<enabled>true</enabled>
+					</releases>
+					<snapshots>
+						<enabled>false</enabled>
+					</snapshots>
+				</repository>
+			</repositories>
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
index 78bad708ef3f0..3bcee7899b2f7 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
@@ -22,7 +22,7 @@ package org.apache.flink.ml.optimization
 import org.apache.flink.api.scala._
 import org.apache.flink.ml.common._
 import org.apache.flink.ml.math._
-import org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, Iterations, LearningRate}
+import org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, Iterations, LearningRate, OptimizationMethod}
 import org.apache.flink.ml.optimization.Solver._
 import org.apache.flink.ml._
@@ -43,6 +43,13 @@ import org.apache.flink.ml._
  * [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will
  * stop the iterations if the relative change in the value of the objective
  * function between successive iterations is is smaller than this value.
+ * [[OptimizationMethod]] for the optimization method to be used. Available options are:
+ * The original optimization method:
+ * 0: "default" => learningRate/Math.sqrt(iteration)
+ * New methods based on sklearn naming conventions:
+ * 1: "constant" => learningRate
+ * 2: "optimal" => 1 / (regularizationConstant * iteration)
+ * 3: "inverseScaling" => learningRate / Math.pow(iteration, 0.5)
  */
 abstract class GradientDescent extends IterativeSolver {
@@ -54,14 +61,15 @@ abstract class GradientDescent extends IterativeSolver {
    */
   override def optimize(
     data: DataSet[LabeledVector],
-    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    initialWeights: Option[DataSet[WeightVector]]
+  ): DataSet[WeightVector] = {
 
     val numberOfIterations: Int = parameters(Iterations)
     val convergenceThresholdOption: Option[Double] = parameters.get(ConvergenceThreshold)
     val lossFunction = parameters(LossFunction)
     val learningRate = parameters(LearningRate)
     val regularizationConstant = parameters(RegularizationConstant)
-
+    val optimizationMethod = parameters(OptimizationMethod)
     // Initialize weights
     val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
@@ -75,7 +83,8 @@ abstract class GradientDescent extends IterativeSolver {
           numberOfIterations,
           regularizationConstant,
           learningRate,
-          lossFunction)
+          lossFunction,
+          optimizationMethod)
       case Some(convergence) =>
         optimizeWithConvergenceCriterion(
           data,
@@ -84,7 +93,8 @@ abstract class GradientDescent extends IterativeSolver {
           regularizationConstant,
           learningRate,
           convergence,
-          lossFunction
+          lossFunction,
+          optimizationMethod
         )
     }
   }
@@ -96,7 +106,8 @@ abstract class GradientDescent extends IterativeSolver {
       regularizationConstant: Double,
       learningRate: Double,
       convergenceThreshold: Double,
-      lossFunction: LossFunction)
+      lossFunction: LossFunction,
+      optimizationMethod: Int)
     : DataSet[WeightVector] = {
     // We have to calculate for each weight vector the sum of squared residuals,
     // and then sum them and apply regularization
@@ -119,7 +130,8 @@ abstract class GradientDescent extends IterativeSolver {
             previousWeightsDS,
             lossFunction,
             regularizationConstant,
-            learningRate)
+            learningRate,
+            optimizationMethod)
 
           val currentLossSumDS = calculateLoss(dataPoints, currentWeightsDS, lossFunction)
@@ -148,11 +160,12 @@ abstract class GradientDescent extends IterativeSolver {
       numberOfIterations: Int,
       regularizationConstant: Double,
       learningRate: Double,
-      lossFunction: LossFunction)
+      lossFunction: LossFunction,
+      optimizationMethod: Int)
     : DataSet[WeightVector] = {
     initialWeightsDS.iterate(numberOfIterations) {
       weightVectorDS => {
-        SGDStep(data, weightVectorDS, lossFunction, regularizationConstant, learningRate)
+        SGDStep(data, weightVectorDS, lossFunction, regularizationConstant, learningRate, optimizationMethod)
       }
     }
   }
@@ -168,7 +181,8 @@ abstract class GradientDescent extends IterativeSolver {
       currentWeights: DataSet[WeightVector],
       lossFunction: LossFunction,
       regularizationConstant: Double,
-      learningRate: Double)
+      learningRate: Double,
+      optimizationMethod: Int)
     : DataSet[WeightVector] = {
 
     data.mapWithBcVariable(currentWeights){
@@ -190,8 +204,15 @@ abstract class GradientDescent extends IterativeSolver {
           BLAS.scal(1.0/count, weights)
           val gradient = WeightVector(weights, intercept/count)
+          val effectiveLearningRate = optimizationMethod match {
+            // original effective learning rate method for backward compatibility
+            case 0 => learningRate/Math.sqrt(iteration)
+            // These come straight from sklearn
+            case 1 => learningRate
+            case 2 => 1 / (regularizationConstant * iteration)
+            case 3 => learningRate / Math.pow(iteration, 0.5)
+          }
 
-          val effectiveLearningRate = learningRate/Math.sqrt(iteration)
 
           val newWeights = takeStep(
             weightVector.weights,
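
The match block above is the substantive change. As a standalone sketch (not part of the patch), the four schedules can be written as a small helper; here, iteration is assumed to be the 1-based superstep number that SGDStep reads from the iteration runtime context:

    // Sketch of the four effective-learning-rate schedules selected by
    // optimizationMethod in SGDStep above. `iteration` is assumed to be the
    // 1-based superstep number of the current Flink iteration.
    def effectiveLearningRate(
        optimizationMethod: Int,
        learningRate: Double,
        regularizationConstant: Double,
        iteration: Int): Double = optimizationMethod match {
      case 0 => learningRate / Math.sqrt(iteration)         // "default"
      case 1 => learningRate                                // "constant"
      case 2 => 1.0 / (regularizationConstant * iteration)  // "optimal"
      case 3 => learningRate / Math.pow(iteration, 0.5)     // "inverseScaling"
    }

For example, effectiveLearningRate(2, 0.1, 0.3, 10) yields 1.0 / 3.0. Note that, as written, cases 0 and 3 compute the same value, since Math.sqrt(iteration) equals Math.pow(iteration, 0.5); sklearn's inverse scaling only differs once the exponent becomes configurable.
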
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
index 39a031f8a5806..957087fd9812b 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
@@ -131,6 +131,11 @@ abstract class IterativeSolver() extends Solver {
     parameters.add(ConvergenceThreshold, convergenceThreshold)
     this
   }
+
+  def setOptimizationMethod(optimizationMethod: Int): this.type = {
+    parameters.add(OptimizationMethod, optimizationMethod)
+    this
+  }
 }
 
 object IterativeSolver {
@@ -149,4 +154,8 @@ object IterativeSolver {
   case object ConvergenceThreshold extends Parameter[Double] {
     val defaultValue = None
   }
+
+  case object OptimizationMethod extends Parameter[Int] {
+    val defaultValue = Some(0)
+  }
 }
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
index d84d017a720a2..52cb89a659f66 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
@@ -45,6 +45,7 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
       .setIterations(2000)
       .setLossFunction(lossFunction)
       .setRegularizationConstant(0.3)
+      .setOptimizationMethod(0)
 
     val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData)
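
End to end, the new option is set through the same fluent interface as the other IterativeSolver parameters. A minimal usage sketch (not from the patch; GradientDescentL2 and GenericLossFunction(SquaredLoss, LinearPrediction) are assumed to be the concrete solver and loss function this package provides, mirroring the test suite above):

    // Usage sketch under the assumptions noted above: select the sklearn-style
    // "optimal" schedule, i.e. 1 / (regularizationConstant * iteration).
    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = GradientDescentL2()
      .setIterations(1000)
      .setLossFunction(lossFunction)
      .setRegularizationConstant(0.3)
      .setOptimizationMethod(2)

    // trainingDS: DataSet[LabeledVector]; None requests default initial weights.
    val weights: DataSet[WeightVector] = sgd.optimize(trainingDS, None)

Because OptimizationMethod defaults to Some(0), callers that never invoke setOptimizationMethod keep the original learningRate / Math.sqrt(iteration) behavior; the test above pins method 0 explicitly to exercise that backward-compatible path.
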