From 0282fcf458de4fe002969774fb599512c21a5e3d Mon Sep 17 00:00:00 2001
From: GuoQiang Li
Date: Mon, 15 Dec 2014 11:02:33 +0800
Subject: [PATCH] SGD should support custom sampling.

---
 .../mllib/optimization/GradientDescent.scala |  55 ++++++++-
 .../spark/mllib/optimization/Sampler.scala   | 113 +++++++++++++++++
 2 files changed, 163 insertions(+), 5 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/Sampler.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 0857877951c82..ba234929f513a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -39,6 +39,7 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
   private var numIterations: Int = 100
   private var regParam: Double = 0.0
   private var miniBatchFraction: Double = 1.0
+  private var sampler: Sampler = new SimpleSampler()
 
   /**
    * Set the initial step size of SGD for the first step. Default 1.0.
@@ -50,16 +51,22 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
     this
   }
 
   /**
-   * :: Experimental ::
    * Set fraction of data to be used for each SGD iteration.
    * Default 1.0 (corresponding to deterministic/classical gradient descent)
    */
-  @Experimental
   def setMiniBatchFraction(fraction: Double): this.type = {
     this.miniBatchFraction = fraction
     this
   }
 
+  /**
+   * Set the Sampler object (used for data sampling).
+   */
+  def setSampler(sampler: Sampler): this.type = {
+    this.sampler = sampler
+    this
+  }
+
   /**
    * Set the number of iterations for SGD. Default 100.
    */
@@ -85,7 +92,6 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
     this
   }
 
-
   /**
    * Set the updater function to actually perform a gradient step in a given direction.
    * The updater is responsible to perform the update from the regularization term as well,
@@ -109,6 +115,7 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
       data,
       gradient,
       updater,
+      sampler,
       stepSize,
       numIterations,
       regParam,
@@ -156,6 +163,43 @@
       regParam: Double,
       miniBatchFraction: Double,
       initialWeights: Vector): (Vector, Array[Double]) = {
+    runMiniBatchSGD(data, gradient, updater, new SimpleSampler(), stepSize,
+      numIterations, regParam, miniBatchFraction, initialWeights)
+  }
+
+  /**
+   * Run stochastic gradient descent (SGD) in parallel using mini batches.
+   * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
+   * in order to compute a gradient estimate.
+   * Sampling and averaging the subgradients over this subset are performed using one standard
+   * Spark map-reduce in each iteration.
+   *
+   * @param data - Input data for SGD. RDD of the set of data examples, each of
+   *               the form (label, [feature values]).
+   * @param gradient - Gradient object (used to compute the gradient of the loss function of
+   *                   one single data example)
+   * @param updater - Updater function to actually perform a gradient step in a given direction.
+   * @param sampler - Sampler object (used for data sampling).
+   * @param stepSize - initial step size for the first step
+   * @param numIterations - number of iterations that SGD should be run.
+   * @param regParam - regularization parameter
+   * @param miniBatchFraction - fraction of the input data set that should be used for
+   *                            one iteration of SGD. Default value 1.0.
+   *
+   * @return A tuple containing two elements. The first element is a column matrix containing
+   *         weights for every feature, and the second element is an array containing the
+   *         stochastic loss computed for every iteration.
+   */
+  def runMiniBatchSGD(
+      data: RDD[(Double, Vector)],
+      gradient: Gradient,
+      updater: Updater,
+      sampler: Sampler,
+      stepSize: Double,
+      numIterations: Int,
+      regParam: Double,
+      miniBatchFraction: Double,
+      initialWeights: Vector): (Vector, Array[Double]) = {
 
     val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
@@ -171,6 +215,8 @@
       logWarning("The miniBatchFraction is too small")
     }
 
+    sampler.setMiniBatchFraction(miniBatchFraction)
+    sampler.setData(data)
     // Initialize weights as a column vector
     var weights = Vectors.dense(initialWeights.toArray)
     val n = weights.size
@@ -186,7 +232,7 @@
       val bcWeights = data.context.broadcast(weights)
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
-      val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
+      val (gradientSum, lossSum, miniBatchSize) = sampler.nextBatchSample(i)
         .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
           seqOp = (c, v) => {
             // c: (grad, loss, count), v: (label, features)
@@ -217,6 +263,5 @@
       stochasticLossHistory.takeRight(10).mkString(", ")))
 
     (weights, stochasticLossHistory.toArray)
-
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Sampler.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Sampler.scala
new file mode 100644
index 0000000000000..feb7118aa2b56
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Sampler.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import java.util.Random
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.sql.{SchemaRDD, SQLContext}
+
+/**
+ * :: DeveloperApi ::
+ * Trait for samplers used by SGD.
+ */
+@DeveloperApi
+trait Sampler {
+
+  /**
+   * Set input data for SGD. RDD of the set of data examples, each of
+   * the form (label, [feature values]).
+   */
+  def setData(data: RDD[(Double, Vector)]): Unit
+
+  /**
+   * Set the sampling fraction.
+   */
+  def setMiniBatchFraction(miniBatchFraction: Double): Unit
+
+  /**
+   * Get the sample batch for a given iteration.
+   * @param iter - Iteration number
+   * @return Sampled data
+   */
+  def nextBatchSample(iter: Int): RDD[(Double, Vector)]
+}
+
+class SimpleSampler(var data: RDD[(Double, Vector)] = null,
+    var miniBatchFraction: Double = 1D) extends Sampler {
+
+  def setData(data: RDD[(Double, Vector)]): Unit = {
+    this.data = data
+  }
+
+  def setMiniBatchFraction(fraction: Double): Unit = {
+    this.miniBatchFraction = fraction
+  }
+
+  def nextBatchSample(iterNum: Int): RDD[(Double, Vector)] = {
+    data.sample(withReplacement = false, miniBatchFraction, 42 + iterNum)
+  }
+}
+
+private[mllib] case class SamplerData(rand: Double, label: Double, features: Vector)
+
+class ExternalStorageSampler(val dataDir: String) extends Sampler {
+  require(dataDir != null && dataDir.nonEmpty, "dataDir is empty!")
+
+  var data: SchemaRDD = null
+  var miniBatchFraction: Double = 0.1D
+
+  def setData(data: RDD[(Double, Vector)]): Unit = {
+    val path = new Path(dataDir, "samplerData-" + data.id)
+    val sqlContext = new SQLContext(data.context)
+    import sqlContext.createSchemaRDD
+
+    data.mapPartitionsWithIndex((pid, iter) => {
+      val rand: Random = new Random(pid + 37)
+      iter.map { data =>
+        SamplerData(rand.nextDouble(), data._1, data._2)
+      }
+    }).saveAsParquetFile(path.toString)
+    this.data = sqlContext.parquetFile(path.toString)
+    this.data.registerTempTable("sampler_data")
+  }
+
+  def setMiniBatchFraction(fraction: Double): Unit = {
+    this.miniBatchFraction = fraction
+    require(this.miniBatchFraction > 0 && this.miniBatchFraction < 1,
+      "miniBatchFraction must be greater than 0 and less than 1!")
+  }
+
+  def nextBatchSample(iter: Int): RDD[(Double, Vector)] = {
+    var s = iter * miniBatchFraction
+    s = s - s.floor
+    var e = (iter + 1) * miniBatchFraction
+    e = e - e.floor
+    val w = if (e < s) {
+      s"(rand >= 0 AND rand < $e) OR (rand >= $s AND rand < 1)"
+    } else {
+      s"rand >= $s AND rand < $e"
+    }
+    data.sqlContext.sql(s"SELECT label, features FROM sampler_data WHERE $w").map(r => {
+      (r.getDouble(0), r.getAs[Vector](1))
+    })
+  }
+}
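
Note for reviewers (not part of the patch): a minimal sketch of how a caller
might plug a custom Sampler into the new runMiniBatchSGD overload. The RDD
`examples`, the feature count `numFeatures`, and the Parquet directory are
illustrative assumptions, not names introduced by this patch:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization._

    // Pre-randomizes the data once into Parquet under the given directory,
    // then serves sliding-window batches from it (see Sampler.scala above).
    val sampler = new ExternalStorageSampler("/tmp/sgd-sampler")

    val (weights, lossHistory) = GradientDescent.runMiniBatchSGD(
      examples,                // assumed RDD[(Double, Vector)] of (label, features)
      new LogisticGradient(),
      new SquaredL2Updater(),
      sampler,
      1.0,                     // stepSize
      100,                     // numIterations
      0.01,                    // regParam
      0.1,                     // miniBatchFraction; ExternalStorageSampler
                               //   requires a value strictly between 0 and 1
      Vectors.dense(new Array[Double](numFeatures)))

Callers that never invoke setSampler keep the old behavior: the default
SimpleSampler reproduces data.sample(false, miniBatchFraction, 42 + i).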
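
Also for reviewers, a standalone illustration (again, not part of the patch)
of the window arithmetic in ExternalStorageSampler.nextBatchSample. Each
record is assigned a fixed `rand` value in [0, 1) when setData runs; each
iteration then selects a window of width miniBatchFraction on that column,
sliding by the fraction and wrapping at 1.0, so successive batches are slices
of one fixed random ordering rather than independent Bernoulli samples:

    // Prints the SQL predicate built for each iteration at fraction 0.3.
    // Iteration 3 wraps around: roughly [0.9, 1.0) plus [0.0, 0.2).
    val fraction = 0.3
    for (iter <- 0 until 4) {
      var s = iter * fraction
      s = s - s.floor
      var e = (iter + 1) * fraction
      e = e - e.floor
      val w = if (e < s) {
        s"(rand >= 0 AND rand < $e) OR (rand >= $s AND rand < 1)"
      } else {
        s"rand >= $s AND rand < $e"
      }
      println(s"iter $iter: $w")
    }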