From 8576054f7f34361f96a0e815c2cb4f16ab9b688b Mon Sep 17 00:00:00 2001 From: Trevor Grant Date: Fri, 8 Apr 2016 10:06:28 -0500 Subject: [PATCH 1/3] [FLINK][3720] Added Warm Starts [FLINK][3720] Added Tests and Documentation --- docs/apis/batch/libs/ml/optimization.md | 35 ++++++++++++++++-- .../ml/optimization/GradientDescent.scala | 32 +++++++++++----- .../apache/flink/ml/optimization/Solver.scala | 13 +++++++ .../regression/MultipleLinearRegression.scala | 16 ++++++-- .../optimization/GradientDescentITSuite.scala | 37 +++++++++++++++++++ 5 files changed, 118 insertions(+), 15 deletions(-) diff --git a/docs/apis/batch/libs/ml/optimization.md b/docs/apis/batch/libs/ml/optimization.md index ccb7e451cdcf0..8b07867e0b944 100644 --- a/docs/apis/batch/libs/ml/optimization.md +++ b/docs/apis/batch/libs/ml/optimization.md @@ -83,6 +83,12 @@ and is usually determined through model cross-validation. A good comparison of regularization types can be found in [this](http://www.robotics.stanford.edu/~ang/papers/icml04-l1l2.pdf) paper by Andrew Ng. Which regularization type is supported depends on the actually used optimization algorithm. +### Warm Starts + +For any optimization, the parameter weights must be initialized to some value. In Flink's implementation of Stochastic Gradient Descent for example, the weights are initialized as a zero vector. By default, every time a model is fit the weights are reinitialized to zero. When warm starts are allowed however, successive fits initialize the weight vector with the solution of the previous fit, and the iteration number $j$ is maintained, which may be important for calculating effective learning rate (depending on the strategy, see below). + +Suppose for example, you have fit a particularly complex model over 100,000 iterations but have not reached convergence. By allowing warm starts and fitting for an additional 10,000 iterations, you may be able to achieve convergence with out having to start the fit process over again. Another example, suppose you wish to measure the convergence rate among different step-size calculation strategies for a particular problem. By setting a lower iteration number, allowing warm starts, fitting the algorithm multiple times, and measuring the error after each fit this may be achieved (see example). + ## Stochastic Gradient Descent In order to find a (local) minimum of a function, Gradient Descent methods take steps in the @@ -310,7 +316,7 @@ Where: Constant -

+

The step size is constant throughout the learning task.

@@ -321,10 +327,10 @@ Where: Leon Bottou's Method

- This is the 'optimal' method of sklearn. + This is the 'optimal' method of sklearn. The optimal initial value $t_0$ has to be provided. Sklearn uses the following heuristic: $t_0 = \max(1.0, L^\prime(-\beta, 1.0) / (\alpha \cdot \beta)$ - with $\beta = \sqrt{\frac{1}{\sqrt{\alpha}}}$ and $L^\prime(prediction, truth)$ being the derivative of the loss function. + with $\beta = \sqrt{\frac{1}{\sqrt{\alpha}}}$ and $L^\prime(prediction, truth)$ being the derivative of the loss function.

$\eta_j = 1 / (\lambda \cdot (t_0 + j -1)) $ @@ -383,3 +389,26 @@ val trainingDS: DataSet[LabeledVector] = ... // Optimize the weights, according to the provided data val weightDS = sgd.optimize(trainingDS) {% endhighlight %} + + +// Use Warm Starts to illustrate convergence +val mlr_default = MultipleLinearRegression() +.setIterations(10) +.setConvergenceThreshold(0.001) +.setWarmStart(true) + + +val mlr_xu = MultipleLinearRegression() +.setIterations(10) +.setConvergenceThreshold(0.001) +.setWarmStart(true) +.setLearningRateMethod(LearningRateMethod.Xu(-0.75)) + + +for (i <- 1 to 10){ + mlr_default.fit(trainingDS) + mlr_xu.fit(trainingDS) + val resid_default = mlr_default.squaredResidualSum(trainingDS).collect() + val resid_xu = mlr_xu.squaredResidualSum(trainingDS).collect() + println(s"Iteration: ${(10*i).toString}: Default SSR: ${resid_default.toString} Xu SSR: ${resid_xu.toString}") +} diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index 407c074b7f12c..3d70d4ea2b04e 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -55,6 +55,7 @@ abstract class GradientDescent extends IterativeSolver { * @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ + override def optimize( data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { @@ -65,9 +66,14 @@ abstract class GradientDescent extends IterativeSolver { val learningRate = parameters(LearningRate) val regularizationConstant = parameters(RegularizationConstant) val learningRateMethod = parameters(LearningRateMethodValue) + val warmStart = parameters(WarmStart) // Initialize weights val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + val startingIter = warmStart match { + case true => CURRENT_ITER + case false => 0 + } // Perform the iterations convergenceThresholdOption match { // No convergence criterion @@ -79,7 +85,8 @@ abstract class GradientDescent extends IterativeSolver { regularizationConstant, learningRate, lossFunction, - learningRateMethod) + learningRateMethod, + startingIter) case Some(convergence) => optimizeWithConvergenceCriterion( data, @@ -89,7 +96,8 @@ abstract class GradientDescent extends IterativeSolver { learningRate, convergence, lossFunction, - learningRateMethod) + learningRateMethod, + startingIter) } } @@ -101,7 +109,8 @@ abstract class GradientDescent extends IterativeSolver { learningRate: Double, convergenceThreshold: Double, lossFunction: LossFunction, - learningRateMethod: LearningRateMethodTrait) + learningRateMethod: LearningRateMethodTrait, + startingIter: Int) : DataSet[WeightVector] = { // We have to calculate for each weight vector the sum of squared residuals, // and then sum them and apply regularization @@ -114,7 +123,7 @@ abstract class GradientDescent extends IterativeSolver { val resultWithLoss = initialWeightsWithLossSum.iterateWithTermination(numberOfIterations) { weightsWithPreviousLossSum => - + CURRENT_ITER = CURRENT_ITER + numberOfIterations // Extract weight vector and loss val previousWeightsDS = weightsWithPreviousLossSum.map{_._1} val previousLossSumDS = weightsWithPreviousLossSum.map{_._2} @@ -125,7 +134,8 @@ abstract class GradientDescent extends IterativeSolver { lossFunction, regularizationConstant, learningRate, - learningRateMethod) + learningRateMethod, + startingIter) val currentLossSumDS = calculateLoss(dataPoints, currentWeightsDS, lossFunction) @@ -155,8 +165,10 @@ abstract class GradientDescent extends IterativeSolver { regularizationConstant: Double, learningRate: Double, lossFunction: LossFunction, - optimizationMethod: LearningRateMethodTrait) + optimizationMethod: LearningRateMethodTrait, + startingIter: Int) : DataSet[WeightVector] = { + CURRENT_ITER = CURRENT_ITER + numberOfIterations initialWeightsDS.iterate(numberOfIterations) { weightVectorDS => { SGDStep(data, @@ -164,7 +176,8 @@ abstract class GradientDescent extends IterativeSolver { lossFunction, regularizationConstant, learningRate, - optimizationMethod) + optimizationMethod, + startingIter) } } } @@ -181,7 +194,8 @@ abstract class GradientDescent extends IterativeSolver { lossFunction: LossFunction, regularizationConstant: Double, learningRate: Double, - learningRateMethod: LearningRateMethodTrait) + learningRateMethod: LearningRateMethodTrait, + startingIter: Int) : DataSet[WeightVector] = { data.mapWithBcVariable(currentWeights){ @@ -214,7 +228,7 @@ abstract class GradientDescent extends IterativeSolver { val gradient = WeightVector(weights, intercept/count) val effectiveLearningRate = learningRateMethod.calculateLearningRate( learningRate, - iteration, + iteration + startingIter, regularizationConstant) val newWeights = takeStep( diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 3bad03857eeb1..6e514d458d3a3 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -137,12 +137,20 @@ abstract class IterativeSolver() extends Solver { parameters.add(LearningRateMethodValue, learningRateMethod) this } + + def setWarmStart(warmStart: Boolean): this.type = { + parameters.add(WarmStart, warmStart) + this + } + } object IterativeSolver { val MAX_DLOSS: Double = 1e12 + var CURRENT_ITER: Int = 0 + // Define parameters for IterativeSolver case object LearningRate extends Parameter[Double] { val defaultValue = Some(0.1) @@ -159,6 +167,11 @@ object IterativeSolver { case object LearningRateMethodValue extends Parameter[LearningRateMethodTrait] { val defaultValue = Some(LearningRateMethod.Default) } + + case object WarmStart extends Parameter[Boolean] { + val defaultValue = Some(false) + } + } object LearningRateMethod { diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index ef06033599c09..44d1024bfafc9 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -96,7 +96,7 @@ class MultipleLinearRegression extends Predictor[MultipleLinearRegression] { // Stores the weights of the linear model after the fitting phase var weightsOption: Option[DataSet[WeightVector]] = None - + def setIterations(iterations: Int): MultipleLinearRegression = { parameters.add(Iterations, iterations) this @@ -119,6 +119,11 @@ class MultipleLinearRegression extends Predictor[MultipleLinearRegression] { this } + def setWarmStart(warmStart: Boolean): MultipleLinearRegression = { + parameters.add(WarmStart, warmStart) + this + } + def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = { weightsOption match { case Some(weights) => { @@ -162,6 +167,10 @@ object MultipleLinearRegression { val defaultValue = None } + case object WarmStart extends Parameter[Boolean] { + val defaultValue = Some(false) + } + // ======================================== Factory methods ====================================== def apply(): MultipleLinearRegression = { @@ -187,13 +196,14 @@ object MultipleLinearRegression { val stepsize = map(Stepsize) val convergenceThreshold = map.get(ConvergenceThreshold) val learningRateMethod = map.get(LearningRateMethodValue) - + val warmStart = map.get(WarmStart).get val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction) val optimizer = SimpleGradientDescent() .setIterations(numberOfIterations) .setStepsize(stepsize) .setLossFunction(lossFunction) + .setWarmStart(warmStart) convergenceThreshold match { case Some(threshold) => optimizer.setConvergenceThreshold(threshold) @@ -205,7 +215,7 @@ object MultipleLinearRegression { case None => } - instance.weightsOption = Some(optimizer.optimize(input, None)) + instance.weightsOption = Some(optimizer.optimize(input, instance.weightsOption)) } } diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala index aed3bfd062666..3ecea7e247387 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala @@ -272,5 +272,42 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { weight0 should be (expectedWeight0 +- 0.1) } // TODO: Need more corner cases, see sklearn tests for SGD linear model + it should "estimate a linear function without an intercept in partial fits" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction) + + val sgd = SimpleGradientDescent() + .setStepsize(0.001) + .setIterations(20) + .setLossFunction(lossFunction) + .setWarmStart(true) + val inputDS = env.fromCollection(noInterceptData) + + + var weightsOption: Option[DataSet[WeightVector]] = None + for (i <- 1 to 10) { // will perform 10*20=200 iterations + weightsOption = Some(sgd.optimize(inputDS, weightsOption)) + } + + var weightDS = weightsOption.get + + val weightList: Seq[WeightVector] = weightDS.collect() + + weightList.size should equal(1) + + val weightVector: WeightVector = weightList.head + + val weights = weightVector.weights.asInstanceOf[DenseVector].data + val weight0 = weightVector.intercept + + expectedNoInterceptWeights zip weights foreach { + case (expectedWeight, weight) => + weight should be (expectedWeight +- 0.1) + } + weight0 should be (expectedNoInterceptWeight0 +- 0.1) + } } From 409bafe493c5b0210f1f3496be0134df1da1782e Mon Sep 17 00:00:00 2001 From: Trevor Grant Date: Wed, 13 Apr 2016 07:11:00 -0500 Subject: [PATCH 2/3] [FLINK][3720][ml] Fixed Documentation --- docs/apis/batch/libs/ml/optimization.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/apis/batch/libs/ml/optimization.md b/docs/apis/batch/libs/ml/optimization.md index 8b07867e0b944..28360b802f838 100644 --- a/docs/apis/batch/libs/ml/optimization.md +++ b/docs/apis/batch/libs/ml/optimization.md @@ -388,8 +388,6 @@ val trainingDS: DataSet[LabeledVector] = ... // Optimize the weights, according to the provided data val weightDS = sgd.optimize(trainingDS) -{% endhighlight %} - // Use Warm Starts to illustrate convergence val mlr_default = MultipleLinearRegression() @@ -412,3 +410,4 @@ for (i <- 1 to 10){ val resid_xu = mlr_xu.squaredResidualSum(trainingDS).collect() println(s"Iteration: ${(10*i).toString}: Default SSR: ${resid_default.toString} Xu SSR: ${resid_xu.toString}") } +{% endhighlight %} From 7cd51fe8aae79f09a6924ea514aab19d79d561c5 Mon Sep 17 00:00:00 2001 From: Trevor Grant Date: Thu, 5 May 2016 09:39:56 -0500 Subject: [PATCH 3/3] [FLINK][3720][ml] Add persistent optimizer to MLR --- .../regression/MultipleLinearRegression.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index 44d1024bfafc9..2cefc9ced95fd 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -124,6 +124,11 @@ class MultipleLinearRegression extends Predictor[MultipleLinearRegression] { this } + def setOptimizer(optimizer: IterativeSolver): MultipleLinearRegression = { + parameters.add(Optimizer, optimizer) + this + } + def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = { weightsOption match { case Some(weights) => { @@ -171,6 +176,9 @@ object MultipleLinearRegression { val defaultValue = Some(false) } + case object Optimizer extends Parameter[IterativeSolver] { + val defaultValue = None + } // ======================================== Factory methods ====================================== def apply(): MultipleLinearRegression = { @@ -199,11 +207,14 @@ object MultipleLinearRegression { val warmStart = map.get(WarmStart).get val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction) - val optimizer = SimpleGradientDescent() - .setIterations(numberOfIterations) - .setStepsize(stepsize) - .setLossFunction(lossFunction) - .setWarmStart(warmStart) + val optimizer = map.get(Optimizer) match { + case None => SimpleGradientDescent() + .setIterations(numberOfIterations) + .setStepsize(stepsize) + .setLossFunction(lossFunction) + .setWarmStart(warmStart) + case Some(opt) => opt + } convergenceThreshold match { case Some(threshold) => optimizer.setConvergenceThreshold(threshold)