From af72ca398d1cd865f07be7de10ecea6b426d8187 Mon Sep 17 00:00:00 2001 From: Peter Bailis Date: Wed, 9 Jul 2014 14:22:04 -0700 Subject: [PATCH 1/7] Initial BSP ADMM via Pegasos SGD --- .../spark/mllib/admm/ADMMLocalSolver.scala | 47 ++++++++++++++++++ .../org/apache/spark/mllib/admm/BSPADMM.scala | 48 +++++++++++++++++++ .../apache/spark/mllib/admm/BVGradient.scala | 19 ++++++++ .../spark/mllib/admm/LocalPegasosSGD.scala | 32 +++++++++++++ .../org/apache/spark/mllib/admm/package.scala | 7 +++ 5 files changed, 153 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/package.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala new file mode 100644 index 0000000000000..679e6a6702244 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala @@ -0,0 +1,47 @@ +package org.apache.spark.mllib.admm + +import scala.util.Random +import breeze.linalg.axpy + +abstract class ADMMLocalSolver(val points: Array[BV], val labels: Array[Double]) { + var lagrangian: BV = points(0)*0 + var w_prev: BV = lagrangian.copy + + def solveLocal(theta_avg: BV, rho: Double, epsilon: Double = 0.01, initial_w: BV = null): BV +} + +class ADMMSGDLocalSolver(points: Array[BV], + labels: Array[Double], + val gradient: BVGradient, + val eta_0: Double, + val maxIterations: Int = Integer.MAX_VALUE) extends ADMMLocalSolver(points, labels) { + def solveLocal(w_avg: BV, rho: Double, epsilon: Double = 0.01, initial_w: BV = null): BV = { + lagrangian += (w_avg - w_prev) * rho + + var i = 0 + var residual = Double.MaxValue + + var w = if(initial_w != null) { + initial_w.copy + } else { + w_avg.copy + } + + // TODO: fix residual + while(i < maxIterations && residual > epsilon) { + val pointIndex = Random.nextInt(points.length) + val x = points(pointIndex) + val y = labels(pointIndex) + + val point_gradient = gradient(x, y, w) + + point_gradient += lagrangian + (w - w_avg)*rho + + val eta_t = eta_0/(i+1) + axpy(eta_t, point_gradient, w) + } + + w_prev = w + w + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala new file mode 100644 index 0000000000000..bda1c7980566f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala @@ -0,0 +1,48 @@ +package org.apache.spark.mllib.admm + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint + +case class SubProblem(val points: Array[BV], val labels: Array[Double]) + +class BSPADMMwithSGD { + def train(input: RDD[LabeledPoint], + numADMMIterations: Int, + rho: Double, + gradient: BVGradient) = { + val subProblems: RDD[SubProblem] = input.mapPartitions{ iter => + val localData = iter.toArray + val points = localData.map { pt => pt.features.toBreeze } + val labels = localData.map { pt => pt.label } + Iterator(SubProblem(points, labels)) + } + + val solvers = subProblems.map { s => + val regularizer = 0.1 + val gradient = new PegasosBVGradient(regularizer) + new ADMMSGDLocalSolver(s.points, s.labels, gradient, eta_0 = regularizer, 
maxIterations = 5) + }.cache() + + val numSolvers = solvers.partitions.length + + var primalResidual = Double.MaxValue + var dualResidual = Double.MaxValue + val epsilon = 0.01 + var iter = 0 + + var w_avg: BV = input.take(1)(0).features.toBreeze*0 + + while (iter < numADMMIterations && primalResidual < epsilon && dualResidual < epsilon) { + val local_w = solvers.map { s => s.solveLocal(w_avg, rho, epsilon) } . collect() + val new_w_avg: BV = local_w.sum / numSolvers + + primalResidual = Math.pow(local_w.foldLeft(0.0){ (sum , w) => (w - new_w_avg).norm() }, 2) + dualResidual = rho * Math.pow((new_w_avg - w_avg).norm(), 2) + + w_avg = new_w_avg + iter += 1 + } + + w_avg + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala new file mode 100644 index 0000000000000..b7a0f0bcaa8c5 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala @@ -0,0 +1,19 @@ +package org.apache.spark.mllib.admm + + +abstract class BVGradient { + def apply(data: BV, label: Double, weights: BV): BV +} + +class PegasosBVGradient(val lambda: Double) extends BVGradient { + def apply(data: BV, label: Double, weights: BV): BV = { + val prod = label * data.dot(weights) + + val ret: BV = weights * label + if(prod < 1) { + ret -= data * label + } + + ret + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala new file mode 100644 index 0000000000000..7824efad4213f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala @@ -0,0 +1,32 @@ +package org.apache.spark.mllib.admm + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.optimization.Gradient +import org.apache.spark.mllib.linalg.Vector + +import breeze.linalg.{axpy => brzAxpy} + + +object LocalPegasosSGD { + def solve(input: RDD[LabeledPoint], + lambda: Double, + dim: Int, + iterations: Int, + initial_t: Int, + w: Vector, + grad: Gradient) { + for (t <- initial_t until initial_t+iterations) { + val point = input.takeSample(false, 1).apply(0) + val x = point.features + val y = point.label + + val (point_gradient, loss) = grad.compute(x, y, w) + + val eta = 1./(lambda * (t+1)) + brzAxpy(eta, point_gradient.toBreeze, w.toBreeze) + } + w + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/package.scala new file mode 100644 index 0000000000000..583ac97483bcb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/package.scala @@ -0,0 +1,7 @@ +package org.apache.spark.mllib + +import breeze.linalg.Vector + +package object admm { + type BV = Vector[Double] +} From d771bec3788ed421b4d387b631ca1d07ce2315ad Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 9 Jul 2014 19:46:21 -0700 Subject: [PATCH 2/7] Adding test runner. 
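This patch wires the new ADMM/Pegasos path into the example runner: BinaryClassification
gains a Pegasos algorithm option, PegasosSVM wraps BSPADMMwithSGD behind the usual
SVMWithSGD-style interface, BSPADMMwithSGD becomes an object over RDD[(Double, Vector)],
and GeneralizedLinearAlgorithm.prependOne is made non-private so the wrapper can add an
intercept column. A minimal usage sketch, assuming a live SparkContext (sc) and an
illustrative LIBSVM input path, neither of which is part of this patch:

    import org.apache.spark.mllib.admm.PegasosSVM
    import org.apache.spark.mllib.util.MLUtils

    val training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache()
    val model    = new PegasosSVM().run(training)            // defaults: iterations = 10, rho = 0.1, lambda = 0.1
    val labels   = model.predict(training.map(_.features))   // 0.0 / 1.0 predictions at the default threshold
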
--- .../examples/mllib/BinaryClassification.scala | 6 ++- .../spark/mllib/admm/ADMMLocalSolver.scala | 4 +- .../org/apache/spark/mllib/admm/BSPADMM.scala | 20 +++++--- .../spark/mllib/admm/LocalPegasosSGD.scala | 32 ------------ .../apache/spark/mllib/admm/PegasosSVM.scala | 51 +++++++++++++++++++ .../GeneralizedLinearAlgorithm.scala | 2 +- 6 files changed, 73 insertions(+), 42 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala index 56b02b65d8724..496dd4b7df474 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala @@ -18,6 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} +import org.apache.spark.mllib.admm.PegasosSVM import scopt.OptionParser import org.apache.spark.{SparkConf, SparkContext} @@ -38,7 +39,7 @@ object BinaryClassification { object Algorithm extends Enumeration { type Algorithm = Value - val SVM, LR = Value + val SVM, LR, Pegasos = Value } object RegType extends Enumeration { @@ -140,6 +141,9 @@ object BinaryClassification { .setUpdater(updater) .setRegParam(params.regParam) algorithm.run(training).clearThreshold() + case Pegasos => + val algorithm = new PegasosSVM() + algorithm.run(training) } val prediction = model.predict(test.map(_.features)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala index 679e6a6702244..a94dbdfd6a7df 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala @@ -2,9 +2,11 @@ package org.apache.spark.mllib.admm import scala.util.Random import breeze.linalg.axpy +import breeze.util.Implicits._ + abstract class ADMMLocalSolver(val points: Array[BV], val labels: Array[Double]) { - var lagrangian: BV = points(0)*0 + var lagrangian: BV = points(0) * 0.0 var w_prev: BV = lagrangian.copy def solveLocal(theta_avg: BV, rho: Double, epsilon: Double = 0.01, initial_w: BV = null): BV diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala index bda1c7980566f..912802c640383 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala @@ -2,18 +2,23 @@ package org.apache.spark.mllib.admm import org.apache.spark.rdd.RDD import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.Vector -case class SubProblem(val points: Array[BV], val labels: Array[Double]) +import breeze.util.Implicits._ -class BSPADMMwithSGD { - def train(input: RDD[LabeledPoint], + +case class SubProblem(points: Array[BV], labels: Array[Double]) + +object BSPADMMwithSGD { + def train(input: RDD[(Double, Vector)], numADMMIterations: Int, rho: Double, - gradient: BVGradient) = { + gradient: BVGradient, + initialWeights: Vector) = { val subProblems: RDD[SubProblem] = input.mapPartitions{ iter => val localData = iter.toArray - val points = localData.map { pt => pt.features.toBreeze } - val labels = localData.map { 
pt => pt.label } + val points = localData.map { case (y, x) => x.toBreeze } + val labels = localData.map { case (y, x) => y } Iterator(SubProblem(points, labels)) } @@ -30,7 +35,8 @@ class BSPADMMwithSGD { val epsilon = 0.01 var iter = 0 - var w_avg: BV = input.take(1)(0).features.toBreeze*0 + // Make a zero vector + var w_avg: BV = input.first()._2.toBreeze*0 while (iter < numADMMIterations && primalResidual < epsilon && dualResidual < epsilon) { val local_w = solvers.map { s => s.solveLocal(w_avg, rho, epsilon) } . collect() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala deleted file mode 100644 index 7824efad4213f..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/LocalPegasosSGD.scala +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.spark.mllib.admm - -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.optimization.Gradient -import org.apache.spark.mllib.linalg.Vector - -import breeze.linalg.{axpy => brzAxpy} - - -object LocalPegasosSGD { - def solve(input: RDD[LabeledPoint], - lambda: Double, - dim: Int, - iterations: Int, - initial_t: Int, - w: Vector, - grad: Gradient) { - for (t <- initial_t until initial_t+iterations) { - val point = input.takeSample(false, 1).apply(0) - val x = point.features - val y = point.label - - val (point_gradient, loss) = grad.compute(x, y, w) - - val eta = 1./(lambda * (t+1)) - brzAxpy(eta, point_gradient.toBreeze, w.toBreeze) - } - w - } - -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala new file mode 100644 index 0000000000000..d403eb7eef395 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala @@ -0,0 +1,51 @@ +package org.apache.spark.mllib.admm + +import org.apache.spark.SparkException +import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.optimization.Optimizer +import org.apache.spark.mllib.regression.{LabeledPoint, GeneralizedLinearAlgorithm} +import org.apache.spark.rdd.RDD + +class PegasosSVM(val iterations: Integer = 10, + val rho: Double = .1, + val lambda: Double = 0.1) extends SVMWithSGD { + + override def run(input: RDD[LabeledPoint]): SVMModel = { + // Check the data properties before running the optimizer + if (validateData && !validators.forall(func => func(input))) { + throw new SparkException("Input validation failed.") + } + // Prepend an extra variable consisting of all 1.0's for the intercept. 
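+    // prependOne comes from GeneralizedLinearAlgorithm (made non-private below so subclasses can reuse it).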
+ val data = if (addIntercept) { + input.map(labeledPoint => (labeledPoint.label, prependOne(labeledPoint.features))) + } else { + input.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) + } + val numFeatures: Int = input.first().features.size + val initialWeights = Vectors.dense(new Array[Double](numFeatures)) + val initialWeightsWithIntercept = if (addIntercept) { + prependOne(initialWeights) + } else { + initialWeights + } + + val weightsWithIntercept = + Vectors.fromBreeze(BSPADMMwithSGD.train(data, iterations, rho, new PegasosBVGradient(lambda), initialWeights)) + + val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0 + val weights = + if (addIntercept) { + Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size)) + } else { + weightsWithIntercept + } + + createModel(weights, intercept) + + } + + override protected def createModel(weights: Vector, intercept: Double) = { + new SVMModel(weights, intercept) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 8cca926f1c92e..1e0142d9d1d2d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -125,7 +125,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] } /** Prepends one to the input vector. */ - private def prependOne(vector: Vector): Vector = { + def prependOne(vector: Vector): Vector = { val vector1 = vector.toBreeze match { case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv) case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv) From d3202e50dae540ef406094d8d469f3ea208a8990 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 9 Jul 2014 20:26:02 -0700 Subject: [PATCH 3/7] Builds and runs. --- .../org/apache/spark/mllib/admm/BSPADMM.scala | 28 +++++++++++++------ .../apache/spark/mllib/admm/PegasosSVM.scala | 7 ++--- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala index 912802c640383..b358f8a25fbd1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala @@ -1,10 +1,8 @@ package org.apache.spark.mllib.admm -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.regression.LabeledPoint +import breeze.linalg.norm import org.apache.spark.mllib.linalg.Vector - -import breeze.util.Implicits._ +import org.apache.spark.rdd.RDD case class SubProblem(points: Array[BV], labels: Array[Double]) @@ -12,7 +10,6 @@ case class SubProblem(points: Array[BV], labels: Array[Double]) object BSPADMMwithSGD { def train(input: RDD[(Double, Vector)], numADMMIterations: Int, - rho: Double, gradient: BVGradient, initialWeights: Vector) = { val subProblems: RDD[SubProblem] = input.mapPartitions{ iter => @@ -34,18 +31,31 @@ object BSPADMMwithSGD { var dualResidual = Double.MaxValue val epsilon = 0.01 var iter = 0 + var rho = 0.0 // Make a zero vector - var w_avg: BV = input.first()._2.toBreeze*0 + var w_avg: BV = input.first()._2.toBreeze * 0.0 while (iter < numADMMIterations && primalResidual < epsilon && dualResidual < epsilon) { val local_w = solvers.map { s => s.solveLocal(w_avg, rho, epsilon) } . 
collect() - val new_w_avg: BV = local_w.sum / numSolvers + val new_w_avg: BV = local_w.reduce(_ + _) / numSolvers.toDouble - primalResidual = Math.pow(local_w.foldLeft(0.0){ (sum , w) => (w - new_w_avg).norm() }, 2) - dualResidual = rho * Math.pow((new_w_avg - w_avg).norm(), 2) + // Update the residuals + // primalResidual = sum( ||w_i - w_avg||_2^2 ) + primalResidual = Math.pow(local_w.foldLeft(0.0){ (sum , w) => norm(w - new_w_avg, 2.0) }, 2) + dualResidual = rho * Math.pow(norm(new_w_avg - w_avg, 2.0), 2) + + // Rho upate from Boyd text + if (rho == 0) { + rho = epsilon + } else if (primalResidual > 10.0 * dualResidual) { + rho = 2.0 * rho + } else if (10.0 * dualResidual > primalResidual) { + rho = rho / 2.0 + } w_avg = new_w_avg + iter += 1 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala index d403eb7eef395..60d9b246be8df 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala @@ -1,10 +1,9 @@ package org.apache.spark.mllib.admm import org.apache.spark.SparkException -import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel} +import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD} import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.optimization.Optimizer -import org.apache.spark.mllib.regression.{LabeledPoint, GeneralizedLinearAlgorithm} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD class PegasosSVM(val iterations: Integer = 10, @@ -31,7 +30,7 @@ class PegasosSVM(val iterations: Integer = 10, } val weightsWithIntercept = - Vectors.fromBreeze(BSPADMMwithSGD.train(data, iterations, rho, new PegasosBVGradient(lambda), initialWeights)) + Vectors.fromBreeze(BSPADMMwithSGD.train(data, iterations, new PegasosBVGradient(lambda), initialWeights)) val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0 val weights = From fc73fbef1891a5eca6c0cd0fafd4489aef34b621 Mon Sep 17 00:00:00 2001 From: "Joseph E. 
Gonzalez" Date: Thu, 10 Jul 2014 13:41:45 -0700 Subject: [PATCH 4/7] fixed ADMM code --- .../spark/mllib/admm/ADMMLocalSolver.scala | 51 ++++++++++--------- .../org/apache/spark/mllib/admm/BSPADMM.scala | 16 ++++-- .../apache/spark/mllib/admm/BVGradient.scala | 12 +++-- .../apache/spark/mllib/admm/PegasosSVM.scala | 2 + 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala index a94dbdfd6a7df..d86918fc7709b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/ADMMLocalSolver.scala @@ -1,13 +1,14 @@ package org.apache.spark.mllib.admm +import breeze.linalg.{norm, axpy} + import scala.util.Random -import breeze.linalg.axpy -import breeze.util.Implicits._ abstract class ADMMLocalSolver(val points: Array[BV], val labels: Array[Double]) { - var lagrangian: BV = points(0) * 0.0 - var w_prev: BV = lagrangian.copy + // Initialize the lagrangian multiplier and initial weight vector at [0.0, ..., 0.0] + var lagrangianMultiplier: BV = points(0) * 0.0 + var w: BV = points(0) * 0.0 def solveLocal(theta_avg: BV, rho: Double, epsilon: Double = 0.01, initial_w: BV = null): BV } @@ -18,32 +19,36 @@ class ADMMSGDLocalSolver(points: Array[BV], val eta_0: Double, val maxIterations: Int = Integer.MAX_VALUE) extends ADMMLocalSolver(points, labels) { def solveLocal(w_avg: BV, rho: Double, epsilon: Double = 0.01, initial_w: BV = null): BV = { - lagrangian += (w_avg - w_prev) * rho + // Update the lagrangian Multiplier by taking a gradient step + lagrangianMultiplier += (w - w_avg) * rho - var i = 0 + var t = 0 var residual = Double.MaxValue - var w = if(initial_w != null) { - initial_w.copy - } else { - w_avg.copy - } - - // TODO: fix residual - while(i < maxIterations && residual > epsilon) { + // var w: BV = if(initial_w != null) initial_w.copy else w_avg.copy + while(t < maxIterations && residual > epsilon) { val pointIndex = Random.nextInt(points.length) val x = points(pointIndex) val y = labels(pointIndex) - - val point_gradient = gradient(x, y, w) - - point_gradient += lagrangian + (w - w_avg)*rho - - val eta_t = eta_0/(i+1) - axpy(eta_t, point_gradient, w) + // Compute the gradient of the full L + val point_gradient = gradient(x, y, w) + lagrangianMultiplier + (w - w_avg) * rho + // Set the learning rate + val eta_t = eta_0 / (t + 1) + // w = w + eta_t * point_gradient + axpy(-eta_t, point_gradient, w) + // Compute residual + residual = eta_t * norm(point_gradient, 2.0) + t += 1 } + // Check the local prediction error: + val propCorrect = + points.zip(labels).map { case (x,y) => if (x.dot(w) * (y * 2.0 - 1.0) > 0.0) 1 else 0 } + .reduce(_ + _).toDouble / points.length + println(s"Local prop correct: $propCorrect") + + println(s"Local iterations: ${t}") - w_prev = w - w + // Return the final weight vector + w.copy } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala index b358f8a25fbd1..1795c71643981 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala @@ -12,6 +12,7 @@ object BSPADMMwithSGD { numADMMIterations: Int, gradient: BVGradient, initialWeights: Vector) = { + val subProblems: RDD[SubProblem] = input.mapPartitions{ iter => val localData = iter.toArray val points = localData.map { 
case (y, x) => x.toBreeze } @@ -22,8 +23,10 @@ object BSPADMMwithSGD { val solvers = subProblems.map { s => val regularizer = 0.1 val gradient = new PegasosBVGradient(regularizer) + println("Building Solver") new ADMMSGDLocalSolver(s.points, s.labels, gradient, eta_0 = regularizer, maxIterations = 5) }.cache() + println(s"number of solver ${solvers.count()}") val numSolvers = solvers.partitions.length @@ -35,8 +38,8 @@ object BSPADMMwithSGD { // Make a zero vector var w_avg: BV = input.first()._2.toBreeze * 0.0 - - while (iter < numADMMIterations && primalResidual < epsilon && dualResidual < epsilon) { + while (iter < numADMMIterations || primalResidual > epsilon || dualResidual > epsilon) { + println(s"Starting iteration ${iter}.") val local_w = solvers.map { s => s.solveLocal(w_avg, rho, epsilon) } . collect() val new_w_avg: BV = local_w.reduce(_ + _) / numSolvers.toDouble @@ -46,16 +49,21 @@ object BSPADMMwithSGD { dualResidual = rho * Math.pow(norm(new_w_avg - w_avg, 2.0), 2) // Rho upate from Boyd text - if (rho == 0) { + if (rho == 0.0) { rho = epsilon } else if (primalResidual > 10.0 * dualResidual) { rho = 2.0 * rho - } else if (10.0 * dualResidual > primalResidual) { + println("Increasing rho") + } else if (dualResidual > 10.0 * primalResidual) { rho = rho / 2.0 + println("Decreasing rho") } w_avg = new_w_avg + println(s"Iteration: ${iter}") + println(s"(Primal Resid, Dual Resid, Rho): ${primalResidual}, \t ${dualResidual}, \t ${rho}") + iter += 1 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala index b7a0f0bcaa8c5..9e2f993edc7f9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BVGradient.scala @@ -6,12 +6,14 @@ abstract class BVGradient { } class PegasosBVGradient(val lambda: Double) extends BVGradient { - def apply(data: BV, label: Double, weights: BV): BV = { - val prod = label * data.dot(weights) + def apply(x: BV, label: Double, weights: BV): BV = { + val y: Double = if (label <= 0.0) -1.0 else 1.0 - val ret: BV = weights * label - if(prod < 1) { - ret -= data * label + val prod = y * x.dot(weights) + + val ret: BV = weights * y + if(prod < 1.0) { + ret -= x * y } ret diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala index 60d9b246be8df..29603a6ce278c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala @@ -11,10 +11,12 @@ class PegasosSVM(val iterations: Integer = 10, val lambda: Double = 0.1) extends SVMWithSGD { override def run(input: RDD[LabeledPoint]): SVMModel = { + println("Running pegasos svm") // Check the data properties before running the optimizer if (validateData && !validators.forall(func => func(input))) { throw new SparkException("Input validation failed.") } + // Prepend an extra variable consisting of all 1.0's for the intercept. val data = if (addIntercept) { input.map(labeledPoint => (labeledPoint.label, prependOne(labeledPoint.features))) From 21e656f932591736bf7e94af441793438d0212f4 Mon Sep 17 00:00:00 2001 From: Peter Bailis Date: Thu, 10 Jul 2014 14:21:16 -0700 Subject: [PATCH 5/7] Doing extreme violence to all things Spark and Akka; initial cut at async map-only job. 
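Instead of the synchronous collect-and-average loop in BSPADMM, this cut stands up one
Akka actor per partition: mapPartitions registers a WorkerCommunication actor on the
worker's actor system (reached through the HACKakkaHost / HACKworkerActorSystem hooks
added to Worker, TestClient and AkkaUtils), the driver gathers every actor's address and
pushes the list back out so each partition can resolve its peers, and PingPong messages
verify all-to-all connectivity. The optimizer itself is not wired into the actors yet.
The driver-side handshake, as a sketch that mirrors AsyncADMMwithSGD.train below:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.admm.WorkerCommunication

    def connectAll(workers: RDD[WorkerCommunication]): Unit = {
      val addresses = workers.map(_.address).collect()   // every partition's Akka URL, gathered on the driver
      workers.foreach(_.connectToOthers(addresses))      // each actor resolves ActorSelections to its peers
      workers.foreach(_.sendPingPongs())                 // all-to-all connectivity check
    }
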
--- .../spark/deploy/client/TestClient.scala | 5 + .../apache/spark/deploy/worker/Worker.scala | 10 ++ .../org/apache/spark/util/AkkaUtils.scala | 6 + .../examples/mllib/BinaryClassification.scala | 11 +- .../apache/spark/mllib/admm/AsyncADMM.scala | 129 ++++++++++++++++++ .../org/apache/spark/mllib/admm/BSPADMM.scala | 6 +- .../apache/spark/mllib/admm/PegasosSVM.scala | 7 +- 7 files changed, 168 insertions(+), 6 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/admm/AsyncADMM.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index e15a87bd38fda..4b6a876a8e3da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.client import org.apache.spark.{SecurityManager, SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.deploy.worker.Worker private[spark] object TestClient { @@ -48,6 +49,10 @@ private[spark] object TestClient { val conf = new SparkConf val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) + + Worker.HACKakkaHost = "akka.tcp://%s@%s:%s/user/".format("spark", port, 0) + Worker.HACKworkerActorSystem = actorSystem + val desc = new ApplicationDescription( "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq()), Some("dummy-spark-home"), "ignored") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ce425443051b0..5ccd578a02d51 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -366,11 +366,15 @@ private[spark] class Worker( } private[spark] object Worker extends Logging { + var HACKworkerActorSystem: ActorSystem = null + var HACKakkaHost: String = null + def main(argStrings: Array[String]) { SignalLogger.register(log) val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) + actorSystem.awaitTermination() } @@ -387,11 +391,17 @@ private[spark] object Worker extends Logging { val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val actorName = "Worker" + val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) + actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) + + Worker.HACKakkaHost = "akka.tcp://%s@%s:%s/user/".format(systemName, host, boundPort) + Worker.HACKworkerActorSystem = actorSystem + (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 9930c717492f2..ef722ed3cd1ce 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -25,6 +25,7 @@ import com.typesafe.config.ConfigFactory import 
org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.worker.Worker /** * Various utility classes for working with Akka. @@ -104,6 +105,11 @@ private[spark] object AkkaUtils extends Logging { val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get + + Worker.HACKakkaHost = "akka.tcp://%s@%s:%s/user/".format(name, host, boundPort) + Worker.HACKworkerActorSystem = actorSystem + + (actorSystem, boundPort) } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala index 496dd4b7df474..b0c8a02ebc793 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala @@ -39,7 +39,7 @@ object BinaryClassification { object Algorithm extends Enumeration { type Algorithm = Value - val SVM, LR, Pegasos = Value + val SVM, LR, Pegasos, PegasosAsync = Value } object RegType extends Enumeration { @@ -107,7 +107,7 @@ object BinaryClassification { Logger.getRootLogger.setLevel(Level.WARN) - val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + val examples = MLUtils.loadLibSVMFile(sc, params.input).repartition(4).cache() val splits = examples.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() @@ -115,6 +115,9 @@ object BinaryClassification { val numTraining = training.count() val numTest = test.count() + + println(s"defaultparallelism: ${sc.defaultParallelism} minpart: ${sc.defaultMinPartitions}") + println(s"Training: $numTraining, test: $numTest.") examples.unpersist(blocking = false) @@ -144,6 +147,10 @@ object BinaryClassification { case Pegasos => val algorithm = new PegasosSVM() algorithm.run(training) + case PegasosAsync => + val algorithm = new PegasosSVM(async = true) + algorithm.run(training) + } val prediction = model.predict(test.map(_.features)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/AsyncADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/AsyncADMM.scala new file mode 100644 index 0000000000000..4a1fe41df73dd --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/AsyncADMM.scala @@ -0,0 +1,129 @@ +package org.apache.spark.mllib.admm + +import breeze.linalg.norm +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector +import akka.actor._ +import org.apache.spark.{SparkEnv, SparkContext} +import org.apache.spark.deploy.worker.Worker +import scala.language.postfixOps +import java.util.UUID +import java.util + +import akka.pattern.ask +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.Await +import org.apache.spark.mllib.admm.InternalMessages.PingPong +import scala.collection.mutable + +case class AsyncSubProblem(points: Array[BV], labels: Array[Double], comm: WorkerCommunication) + +// fuck actors +class WorkerCommunicationHack { + var ref: WorkerCommunication = null +} + +object InternalMessages { + class WakeupMsg + class PingPong +} + +class WorkerCommunication(val address: String, val hack: WorkerCommunicationHack) extends Actor { + hack.ref = this + var others = new mutable.HashMap[Int, ActorSelection] + + def receive = { + case ppm: PingPong => { + println("new message from "+sender) + sender ! "gotit!" 
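+      // PingPong is only a connectivity check between the partition-local actors;
+      // the string reply lets the sender confirm the round trip.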
+ } + case m: InternalMessages.WakeupMsg => { println("activated local!"); sender ! "yo" } + case s: String => println(s) + case _ => println("hello, world!") + } + + def shuttingDown: Receive = { + case _ => println("GOT SHUTDOWN!") + } + + def connectToOthers(allHosts: Array[String]) { + var i = 0 + for(host <- allHosts) { + if(!host.equals(address)) { + others.put(i, context.actorSelection(allHosts(i))) + /* + others(i) ! new PingPong + implicit val timeout = Timeout(15 seconds) + + val f = others(i).resolveOne() + Await.ready(f, Duration.Inf) + println(f.value.get.get) + */ + } + i += 1 + } + } + + def sendPingPongs() { + for(other <- others.values) { + other ! new PingPong + } + } +} + +object AsyncADMMwithSGD { + def train(input: RDD[(Double, Vector)], + numADMMIterations: Int, + gradient: BVGradient, + initialWeights: Vector) = { + val subProblems: RDD[BSPSubProblem] = input.mapPartitions { + iter => + val localData = iter.toArray + val points = localData.map { + case (y, x) => x.toBreeze + } + val labels = localData.map { + case (y, x) => y + } + Iterator(BSPSubProblem(points, labels)) + } + + val workers: RDD[(WorkerCommunication)] = input.mapPartitions { + iter => + val workerName = UUID.randomUUID().toString + val address = Worker.HACKakkaHost+workerName + val hack = new WorkerCommunicationHack() + println(address) + val aref= Worker.HACKworkerActorSystem.actorOf(Props(new WorkerCommunication(address, hack)), workerName) + implicit val timeout = Timeout(15 seconds) + + val f = aref ? new InternalMessages.WakeupMsg + Await.result(f, timeout.duration).asInstanceOf[String] + + Iterator(hack.ref) + } + + val addresses = workers.map { w => w.address }.collect() + + workers.foreach { + w => w.connectToOthers(addresses) + } + + workers.foreach { + w => w.sendPingPongs() + } + + val asyncProblems: RDD[AsyncSubProblem] = workers.zipPartitions(subProblems) { + (wit, pit) => { + val w = wit.next() + val p = pit.next() + Iterator(new AsyncSubProblem(p.points, p.labels, w)) + } + } + + Thread.sleep(1000) + + null + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala index 1795c71643981..282c784a386ba 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/BSPADMM.scala @@ -5,7 +5,7 @@ import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD -case class SubProblem(points: Array[BV], labels: Array[Double]) +case class BSPSubProblem(points: Array[BV], labels: Array[Double]) object BSPADMMwithSGD { def train(input: RDD[(Double, Vector)], @@ -13,11 +13,11 @@ object BSPADMMwithSGD { gradient: BVGradient, initialWeights: Vector) = { - val subProblems: RDD[SubProblem] = input.mapPartitions{ iter => + val subProblems: RDD[BSPSubProblem] = input.mapPartitions{ iter => val localData = iter.toArray val points = localData.map { case (y, x) => x.toBreeze } val labels = localData.map { case (y, x) => y } - Iterator(SubProblem(points, labels)) + Iterator(BSPSubProblem(points, labels)) } val solvers = subProblems.map { s => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala index 29603a6ce278c..df1a58213875a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/admm/PegasosSVM.scala @@ -8,7 +8,8 @@ import org.apache.spark.rdd.RDD class PegasosSVM(val 
iterations: Integer = 10, val rho: Double = .1, - val lambda: Double = 0.1) extends SVMWithSGD { + val lambda: Double = 0.1, + val async: Boolean = false) extends SVMWithSGD { override def run(input: RDD[LabeledPoint]): SVMModel = { println("Running pegasos svm") @@ -32,7 +33,11 @@ class PegasosSVM(val iterations: Integer = 10, } val weightsWithIntercept = + if(!async) Vectors.fromBreeze(BSPADMMwithSGD.train(data, iterations, new PegasosBVGradient(lambda), initialWeights)) + else + Vectors.fromBreeze(AsyncADMMwithSGD.train(data, iterations, new PegasosBVGradient(lambda), initialWeights)) + val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0 val weights = From 09caa96306ed5cc7eab6bf4af79e8fe80b8eae31 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Fri, 11 Jul 2014 17:30:55 -0700 Subject: [PATCH 6/7] Adding Basic ADMM optimization library --- .../spark/mllib/optimization/ADMM.scala | 212 ++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala new file mode 100644 index 0000000000000..7a5f0f9085c8a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala @@ -0,0 +1,212 @@ +package org.apache.spark.mllib.optimization + +import org.apache.spark.Logging +import breeze.linalg._ +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.rdd.RDD +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm} +import breeze.util.DoubleImplicits + + +import scala.util.Random + +/** + * AN ADMM Local Solver is used to solve the optimization problem within each partition. 
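+ *
+ * Each call minimizes this partition's piece of the augmented Lagrangian,
+ *
+ *   L_i(w) = f_i(w) + lambda^T (w - w_avg) + (rho / 2) * ||w - w_avg||^2,
+ *
+ * whose gradient, grad f_i(w) + lambda + rho * (w - w_avg), is the gradL term
+ * assembled by the optimizers below.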
+ */ +@DeveloperApi +trait LocalOptimizer extends Serializable { + def apply(data: Array[(Double, Vector)], w: BDV[Double], w_avg: BDV[Double], lambda: BDV[Double], + rho: Double): BDV[Double] +} + + +@DeveloperApi +class GradientDescentLocalOptimizer(val gradient: Gradient, + val eta_0: Double = 1.0, + val maxIterations: Int = Integer.MAX_VALUE, + val epsilon: Double = 0.001) extends LocalOptimizer { + def apply(data: Array[(Double, Vector)], w0: BDV[Double], w_avg: BDV[Double], lambda: BDV[Double], + rho: Double): BDV[Double] = { + var t = 0 + var residual = Double.MaxValue + val w = w0.copy + val nExamples = data.length + while(t < maxIterations && residual > epsilon) { + // Compute the total gradient (and loss) + val (gradientSum, lossSum) = + data.foldLeft((BDV.zeros[Double](w.size), 0.0)) { (c, v) => + val (gradSum, lossSum) = c + val (label, features) = v + val loss = gradient.compute(features, label, Vectors.fromBreeze(w), Vectors.fromBreeze(gradSum)) + (gradSum, lossSum + loss) + } + // compute the gradient of the full lagrangian + val gradL: BDV[Double] = (gradientSum / nExamples.toDouble) + lambda + (w - w_avg) * rho + val w2: BDV[Double] = w + // Set the learning rate + val eta_t = eta_0 / (t + 1) + // w = w + eta_t * point_gradient + axpy(-eta_t, gradL, w) + // Compute residual + residual = eta_t * norm(gradL, 2.0) + t += 1 + } + // Check the local prediction error: + val propCorrect = + data.map { case (y,x) => if (x.toBreeze.dot(w) * (y * 2.0 - 1.0) > 0.0) 1 else 0 } + .reduce(_ + _).toDouble / nExamples.toDouble + println(s"Local prop correct: $propCorrect") + println(s"Local iterations: ${t}") + // Return the final weight vector + w + } +} + +@DeveloperApi +class SGDLocalOptimizer(val gradient: Gradient, + val eta_0: Double = 1.0, + val maxIterations: Int = Integer.MAX_VALUE, + val epsilon: Double = 0.001) extends LocalOptimizer { + def apply(data: Array[(Double, Vector)], w0: BDV[Double], w_avg: BDV[Double], lambda: BDV[Double], + rho: Double): BDV[Double] = { + var t = 0 + var residual = Double.MaxValue + val w: BDV[Double] = w0.copy + val nExamples = data.length + while(t < maxIterations && residual > epsilon) { + val (label, features) = data(Random.nextInt(nExamples)) + val (gradLoss, loss) = gradient.compute(features, label, Vectors.fromBreeze(w)) + // compute the gradient of the full lagrangian + val gradL = gradLoss.toBreeze.asInstanceOf[BDV[Double]] + lambda + (w - w_avg) * rho + // Set the learning rate + val eta_t = eta_0 / (t + 1) + // w = w + eta_t * point_gradient + axpy(-eta_t, gradL, w) + // Compute residual + residual = eta_t * norm(gradL, 2.0) + t += 1 + } + // Check the local prediction error: + val propCorrect = + data.map { case (y,x) => if (x.toBreeze.dot(w) * (y * 2.0 - 1.0) > 0.0) 1 else 0 } + .reduce(_ + _).toDouble / nExamples.toDouble + println(s"Local prop correct: $propCorrect") + println(s"Local iterations: ${t}") + // Return the final weight vector + w + } +} + + +class ADMM private[mllib] extends Optimizer with Logging { + + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + private var epsilon: Double = 0.0 + private var localOptimizer: LocalOptimizer = null + + /** + * Set the number of iterations for ADMM. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter. Default 0.0. 
+ */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } + + /** + * Set the local optimizer to use for subproblems. + */ + def setLocalOptimizer(opt: LocalOptimizer): this.type = { + this.localOptimizer = opt + this + } + + /** + * Set the local optimizer to use for subproblems. + */ + def setEpsilon(epsilon: Double): this.type = { + this.epsilon = epsilon + this + } + + + /** + * Solve the provided convex optimization problem. + */ + override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { + + val blockData: RDD[Array[(Double, Vector)]] = data.mapPartitions(iter => Iterator(iter.toArray)).cache() + val dim = blockData.map(block => block(0)._2.size).first() + val nExamples = blockData.map(block => block.length).reduce(_+_) + val numPartitions = blockData.partitions.length + println(s"nExamples: $nExamples") + println(s"dim: $dim") + println(s"number of solver ${numPartitions}") + + + var primalResidual = Double.MaxValue + var dualResidual = Double.MaxValue + var iter = 0 + var rho = 0.0 + + // Make a zero vector + var wAndLambda = blockData.map{ block => + val dim = block(0)._2.size + (BDV.zeros[Double](dim), BDV.zeros[Double](dim)) + } + var w_avg: BDV[Double] = BDV.zeros[Double](dim) + + val optimizer = localOptimizer + while (iter < numIterations || primalResidual > epsilon || dualResidual > epsilon) { + println(s"Starting iteration ${iter}.") + // Compute w and new lambda + wAndLambda = blockData.zipPartitions(wAndLambda) { (dataIter, modelIter) => + dataIter.zip(modelIter).map { case (data, (w_old, lambda_old)) => + // Update the lagrangian Multiplier by taking a gradient step + val lambda: BDV[Double] = lambda_old + (w_old - w_avg) * rho + val w = optimizer(data, w_old, w_avg, lambda, rho) + (w, lambda) + } + }.cache() + // Compute new w_avg + val new_w_avg = blockData.zipPartitions(wAndLambda) { (dataIter, modelIter) => + dataIter.zip(modelIter).map { case (data, (w, _)) => w * data.length.toDouble } + }.reduce(_ + _) / nExamples.toDouble + + // Update the residuals + // primalResidual = sum( ||w_i - w_avg||_2^2 ) + primalResidual = Math.pow( wAndLambda.map { case (w, _) => norm(w - new_w_avg, 2.0) }.reduce(_ + _), 2) + dualResidual = rho * Math.pow(norm(new_w_avg - w_avg, 2.0), 2) + + // Rho upate from Boyd text + if (rho == 0.0) { + rho = epsilon + } else if (primalResidual > 10.0 * dualResidual) { + rho = 2.0 * rho + println("Increasing rho") + } else if (dualResidual > 10.0 * primalResidual) { + rho = rho / 2.0 + println("Decreasing rho") + } + + w_avg = new_w_avg + + println(s"Iteration: ${iter}") + println(s"(Primal Resid, Dual Resid, Rho): ${primalResidual}, \t ${dualResidual}, \t ${rho}") + + iter += 1 + } + + Vectors.fromBreeze(w_avg) + } + +} From 38cfd2f12ad0016969909f894f38d83b1c6f1b86 Mon Sep 17 00:00:00 2001 From: Peter Bailis Date: Wed, 16 Jul 2014 16:29:42 -0700 Subject: [PATCH 7/7] checkpoint --- .../spark/mllib/optimization/ADMM.scala | 1 - .../spark/mllib/optimization/AsyncADMM.scala | 281 ++++++++++++++++++ 2 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/AsyncADMM.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala index 7a5f0f9085c8a..1e2a11e652a42 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMM.scala @@ -98,7 +98,6 
@@ class SGDLocalOptimizer(val gradient: Gradient, } } - class ADMM private[mllib] extends Optimizer with Logging { private var numIterations: Int = 100 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/AsyncADMM.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/AsyncADMM.scala new file mode 100644 index 0000000000000..9e5eedcc42a7f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/AsyncADMM.scala @@ -0,0 +1,281 @@ +package org.apache.spark.mllib.optimization + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.mllib.linalg.{Vectors, Vector} + +import breeze.linalg._ +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm} +import breeze.util.DoubleImplicits +import akka.actor.{Props, ActorSelection, Actor} +import scala.collection.mutable +import org.apache.spark.rdd.RDD +import java.util.UUID +import org.apache.spark.deploy.worker.Worker +import akka.util.Timeout +import scala.concurrent.Await + +import akka.pattern.ask +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.collection.mutable +import scala.util.Random +import org.apache.spark.Logging + +import scala.language.postfixOps + + +// fuck actors +class WorkerCommunicationHack { + var ref: WorkerCommunication = null +} + +object InternalMessages { + class WakeupMsg + class PingPong + case class CurrentVector(v: BV[Double]) +} + +class WorkerCommunication(val address: String, val hack: WorkerCommunicationHack) extends Actor { + hack.ref = this + var others = new mutable.HashMap[Int, ActorSelection] + + var currentW: BV[Double] = null + var primalResidual: Double = -1 + var dualResidual: Double = -1 + var currentWAvg: BV[Double] = null + var rho: Double = -1 + var epsilon: Double = -1 + + def receive = { + // Someone sent us a vector update + case m: InternalMessages.CurrentVector => { + + + // calculate new average; this is broken! + var new_w_avg = (currentW + m.v) / 2 + + // Update the residuals + // primalResidual = sum( ||w_i - w_avg||_2^2 ) + primalResidual = Math.pow( norm(currentW - currentWAvg, 2.0), 2) + dualResidual = rho * Math.pow(norm(new_w_avg - currentWAvg, 2.0), 2) + + // Rho upate from Boyd text + if (rho == 0.0) { + rho = epsilon + } else if (primalResidual > 10.0 * dualResidual) { + rho = 2.0 * rho + println("Increasing rho") + } else if (dualResidual > 10.0 * primalResidual) { + rho = rho / 2.0 + println("Decreasing rho") + } + + currentWAvg = new_w_avg + } + + case ppm: InternalMessages.PingPong => { + println("new message from "+sender) + sender ! "gotit!" + } + case m: InternalMessages.WakeupMsg => { println("activated local!"); sender ! "yo" } + case s: String => println(s) + case _ => println("hello, world!") + } + + def shuttingDown: Receive = { + case _ => println("GOT SHUTDOWN!") + } + + def connectToOthers(allHosts: Array[String]) { + var i = 0 + for(host <- allHosts) { + if(!host.equals(address)) { + others.put(i, context.actorSelection(allHosts(i))) + } + i += 1 + } + } + + def sendPingPongs() { + for(other <- others.values) { + other ! new InternalMessages.PingPong + } + } + + def broadcastWeightVector(v: BV[Double]) { + for(other <- others.values) { + other ! 
new InternalMessages.CurrentVector(v) + } + } +} + +// Set up per-partition communication network between workers +object CommSetup { + def setup(input: RDD[(Double, Vector)]): RDD[WorkerCommunication] = { + val workers: RDD[(WorkerCommunication)] = input.mapPartitions { + iter => + val workerName = UUID.randomUUID().toString + val address = Worker.HACKakkaHost+workerName + val hack = new WorkerCommunicationHack() + println(address) + val aref= Worker.HACKworkerActorSystem.actorOf(Props(new WorkerCommunication(address, hack)), workerName) + implicit val timeout = Timeout(15 seconds) + + val f = aref ? new InternalMessages.WakeupMsg + Await.result(f, timeout.duration).asInstanceOf[String] + + Iterator(hack.ref) + } + + val addresses = workers.map { w => w.address }.collect() + + workers.foreach { + w => w.connectToOthers(addresses) + } + + workers.foreach { + w => w.sendPingPongs() + } + + workers + } +} + +@DeveloperApi +class AsyncSGDLocalOptimizer(val gradient: Gradient, + val eta_0: Double = 1.0, + val maxIterations: Int = Integer.MAX_VALUE, + val epsilon: Double = 0.001) { + def apply(data: Array[(Double, Vector)], w0: BDV[Double], w_avg: BDV[Double], lambda: BDV[Double], + rho: Double, comm: WorkerCommunication): BDV[Double] = { + var t = 0 + var residual = Double.MaxValue + val w: BDV[Double] = w0.copy + + comm.currentW = w + comm.currentWAvg = w_avg + comm.rho = rho + comm.epsilon = epsilon + + val nExamples = data.length + while (t < maxIterations && residual > epsilon) { + val (label, features) = data(Random.nextInt(nExamples)) + val (gradLoss, loss) = gradient.compute(features, label, Vectors.fromBreeze(w)) + // compute the gradient of the full lagrangian + val gradL = gradLoss.toBreeze.asInstanceOf[BDV[Double]] + lambda + (w - w_avg) * rho + // Set the learning rate + val eta_t = eta_0 / (t + 1) + // w = w + eta_t * point_gradient + axpy(-eta_t, gradL, w) + // Compute residual + residual = eta_t * norm(gradL, 2.0) + t += 1 + } + // Check the local prediction error: + val propCorrect = + data.map { + case (y, x) => if (x.toBreeze.dot(w) * (y * 2.0 - 1.0) > 0.0) 1 else 0 + } + .reduce(_ + _).toDouble / nExamples.toDouble + println(s"Local prop correct: $propCorrect") + println(s"Local iterations: ${t}") + // Return the final weight vector + w + } +} + + +class AsyncADMM private[mllib] extends Optimizer with Logging { + + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + private var epsilon: Double = 0.0 + private var localOptimizer: AsyncSGDLocalOptimizer = null + + /** + * Set the number of iterations for ADMM. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter. Default 0.0. + */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } + + /** + * Set the local optimizer to use for subproblems. + */ + def setLocalOptimizer(opt: AsyncSGDLocalOptimizer): this.type = { + this.localOptimizer = opt + this + } + + /** + * Set the local optimizer to use for subproblems. + */ + def setEpsilon(epsilon: Double): this.type = { + this.epsilon = epsilon + this + } + + + /** + * Solve the provided convex optimization problem. 
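+   *
+   * At this checkpoint a single round is performed: the data is blocked by partition,
+   * the per-partition actor network is set up through CommSetup.setup, one pass of the
+   * local SGD solvers runs (each registering its state with its WorkerCommunication
+   * actor), and the example-weighted average of the resulting weight vectors is
+   * returned. The asynchronous CurrentVector exchange does not yet drive the solve.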
+ */ + override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { + + val blockData: RDD[Array[(Double, Vector)]] = data.mapPartitions(iter => Iterator(iter.toArray)).cache() + val dim = blockData.map(block => block(0)._2.size).first() + val nExamples = blockData.map(block => block.length).reduce(_+_) + val numPartitions = blockData.partitions.length + println(s"nExamples: $nExamples") + println(s"dim: $dim") + println(s"number of solver ${numPartitions}") + + var primalResidual = Double.MaxValue + var dualResidual = Double.MaxValue + var iter = 0 + var rho = 0.0 + + val commSystems = CommSetup.setup(data) + + // Make a zero vector + var wAndLambda = blockData.map{ block => + val dim = block(0)._2.size + (BDV.zeros[Double](dim), BDV.zeros[Double](dim)) + } + var w_avg: BDV[Double] = BDV.zeros[Double](dim) + + val optimizer = localOptimizer + + // Compute w and new lambda + wAndLambda = blockData.zipPartitions(wAndLambda, commSystems) { + (dataIter, modelIter, commSystemIter) => + dataIter.zip(modelIter).zip(commSystemIter).map { case ((data, (w_old, lambda_old)), commSystem) => + // Update the lagrangian Multiplier by taking a gradient step + val lambda: BDV[Double] = lambda_old + (w_old - w_avg) * rho + val w = optimizer(data, w_old, w_avg, lambda, rho, commSystem) + (w, lambda) + } + }.cache() + // Compute new w_avg + val new_w_avg = blockData.zipPartitions(wAndLambda) { (dataIter, modelIter) => + dataIter.zip(modelIter).map { case (data, (w, _)) => w * data.length.toDouble } + }.reduce(_ + _) / nExamples.toDouble + + + w_avg = new_w_avg + + println(s"Iteration: ${iter}") + println(s"(Primal Resid, Dual Resid, Rho): ${primalResidual}, \t ${dualResidual}, \t ${rho}") + + Vectors.fromBreeze(w_avg) + } +} \ No newline at end of file