/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.optimization

import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.axpy
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD

/**
 * :: DeveloperApi ::
 * Class used to solve an optimization problem using Limited-memory BFGS.
 * Reference: [[http://en.wikipedia.org/wiki/Limited-memory_BFGS]]
 *
 * @param gradient Gradient function to be used.
 * @param updater Updater to be used to update weights after every iteration.
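 *
 * A minimal usage sketch (illustrative only; assumes a `training: RDD[(Double, Vector)]`
 * and a feature count `numFeatures` are in scope, and picks `LogisticGradient` with
 * `SquaredL2Updater` as one possible gradient/updater pair):
 * {{{
 *   val lbfgs = new LBFGS(new LogisticGradient(), new SquaredL2Updater())
 *     .setNumCorrections(10)
 *     .setConvergenceTol(1e-4)
 *     .setNumIterations(100)
 *     .setRegParam(0.1)
 *   val weights = lbfgs.optimize(training, Vectors.zeros(numFeatures))
 * }}}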
 */
@DeveloperApi
class LBFGS(private var gradient: Gradient, private var updater: Updater)
  extends Optimizer with Logging {

  private var numCorrections = 10
  private var convergenceTol = 1E-4
  private var maxNumIterations = 100
  private var regParam = 0.0

  /**
   * Set the number of corrections used in the LBFGS update. Default 10.
   * Values of numCorrections less than 3 are not recommended; large values
   * of numCorrections will result in excessive computing time.
   * 3 < numCorrections < 10 is recommended.
   * Restriction: numCorrections > 0
   */
  def setNumCorrections(corrections: Int): this.type = {
    assert(corrections > 0)
    this.numCorrections = corrections
    this
  }

  /**
   * Set the convergence tolerance of iterations for L-BFGS. Default 1E-4.
   * A smaller value leads to higher accuracy at the cost of more iterations.
   */
  def setConvergenceTol(tolerance: Double): this.type = {
    this.convergenceTol = tolerance
    this
  }

  /**
   * Set the maximum number of iterations for L-BFGS. Default 100.
   * @deprecated use [[LBFGS#setNumIterations]] instead
   */
  @deprecated("use setNumIterations instead", "1.1.0")
  def setMaxNumIterations(iters: Int): this.type = {
    this.setNumIterations(iters)
  }

  /**
   * Set the maximum number of iterations for L-BFGS. Default 100.
   */
  def setNumIterations(iters: Int): this.type = {
    this.maxNumIterations = iters
    this
  }

  /**
   * Set the regularization parameter. Default 0.0.
   */
  def setRegParam(regParam: Double): this.type = {
    this.regParam = regParam
    this
  }

  /**
   * Set the gradient function (of the loss function of a single data example)
   * to be used for L-BFGS.
   */
  def setGradient(gradient: Gradient): this.type = {
    this.gradient = gradient
    this
  }

  /**
   * Set the updater function, which actually performs a gradient step in a given direction.
   * The updater is responsible for performing the update from the regularization term as
   * well, and therefore determines what kind of regularization is used, if any.
   */
  def setUpdater(updater: Updater): this.type = {
    this.updater = updater
    this
  }

  override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    val (weights, _) = LBFGS.runLBFGS(
      data,
      gradient,
      updater,
      numCorrections,
      convergenceTol,
      maxNumIterations,
      regParam,
      initialWeights)
    weights
  }
}

/**
 * :: DeveloperApi ::
 * Top-level method to run L-BFGS.
 */
@DeveloperApi
object LBFGS extends Logging {
  /**
   * Run Limited-memory BFGS (L-BFGS) in parallel.
   * Averaging the subgradients over different partitions is performed using one standard
   * Spark map-reduce in each iteration.
   *
   * @param data - Input data for L-BFGS. RDD of the set of data examples, each of
   *               the form (label, [feature values]).
   * @param gradient - Gradient object (used to compute the gradient of the loss function of
   *                   a single data example)
   * @param updater - Updater function to actually perform a gradient step in a given direction.
   * @param numCorrections - The number of corrections used in the L-BFGS update.
   * @param convergenceTol - The convergence tolerance of iterations for L-BFGS.
   * @param maxNumIterations - Maximum number of iterations that L-BFGS can run.
   * @param regParam - Regularization parameter.
   * @param initialWeights - Initial weights from which to start the optimization.
   *
   * @return A tuple containing two elements. The first element is a vector containing the
   *         weights for every feature, and the second element is an array containing the loss
   *         computed for every iteration.
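   *
   * A sketch of a direct invocation (illustrative; `data` and `numFeatures` are assumed to
   * be in scope, and the gradient/updater choices are just one option):
   * {{{
   *   val (weights, lossHistory) = LBFGS.runLBFGS(
   *     data,
   *     new LogisticGradient(),
   *     new SquaredL2Updater(),
   *     numCorrections = 10,
   *     convergenceTol = 1e-4,
   *     maxNumIterations = 100,
   *     regParam = 0.1,
   *     initialWeights = Vectors.zeros(numFeatures))
   * }}}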
   */
  def runLBFGS(
      data: RDD[(Double, Vector)],
      gradient: Gradient,
      updater: Updater,
      numCorrections: Int,
      convergenceTol: Double,
      maxNumIterations: Int,
      regParam: Double,
      initialWeights: Vector): (Vector, Array[Double]) = {

    val lossHistory = new ArrayBuffer[Double](maxNumIterations)

    val numExamples = data.count()

    val costFun =
      new CostFun(data, gradient, updater, regParam, numExamples)

    val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
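
    // CachedDiffFunction memoizes the most recent (point, loss, gradient) evaluation, so
    // Breeze's line search does not trigger a second distributed pass when it re-queries
    // the same point.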
    val states =
      lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)

    /**
     * NOTE: lossSum and loss are computed using the weights from the previous iteration,
     * and regVal is the regularization value also computed in the previous iteration.
     */
    var state = states.next()
    while (states.hasNext) {
      lossHistory.append(state.value)
      state = states.next()
    }
    lossHistory.append(state.value)

    val weights = Vectors.fromBreeze(state.x)

    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
      lossHistory.takeRight(10).mkString(", ")))

    (weights, lossHistory.toArray)
  }

  /**
   * CostFun implements Breeze's DiffFunction[T], which returns the loss and gradient
   * at a particular point (weights). It's used in Breeze's convex optimization routines.
   */
  private class CostFun(
      data: RDD[(Double, Vector)],
      gradient: Gradient,
      updater: Updater,
      regParam: Double,
      numExamples: Long) extends DiffFunction[BDV[Double]] {

    override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
      // Take local copies so the closures below capture them rather than this CostFun
      // instance, which is not serializable.
      val w = Vectors.fromBreeze(weights)
      val n = w.size
      val bcW = data.context.broadcast(w)
      val localGradient = gradient
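
      // treeAggregate folds every example into a per-partition (gradient, loss) accumulator:
      // seqOp lets Gradient.compute add each example's gradient into `grad` in place and
      // returns its loss; combOp merges partition results via axpy (grad1 += grad2).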
      val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
        seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
          val l = localGradient.compute(
            features, label, bcW.value, grad)
          (grad, loss + l)
        },
        combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
          axpy(1.0, grad2, grad1)
          (grad1, loss1 + loss2)
        })

      /**
       * regVal is the sum of squared weights if the updater is an L2 updater;
       * other updaters follow the same logic for their own regularization term.
       */
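      // Calling compute with a zero gradient, stepSize = 0, and iter = 1 does not move the
      // weights, so the second element of the returned tuple is the regularization value at w.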
      val regVal = updater.compute(w, Vectors.zeros(n), 0, 1, regParam)._2

      val loss = lossSum / numExamples + regVal

      /**
       * The following computes the gradient of the regularization term using the updater.
       *
       * Given the input parameters, the updater basically does the following:
       *
       *   w' = w - thisIterStepSize * (gradient + regGradient(w))
       *
       * Note that regGradient is a function of w.
       *
       * If we set gradient = 0 and thisIterStepSize = 1, then
       *
       *   regGradient(w) = w - w'
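       *
       * For example, with this package's SquaredL2Updater these settings give
       * w' = (1 - regParam) * w, so regGradient(w) = w - w' = regParam * w, which is the
       * gradient of (regParam / 2) * ||w||^2.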
       *
       * TODO: We need to clean it up by separating the logic of regularization out
       * from updater to regularizer.
       */
      // The following gradientTotal is actually the regularization part of the gradient.
      // The gradientSum computed from the data is added in, scaled by 1 / numExamples, below.
      val gradientTotal = w.copy
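      // updater.compute(w, 0, stepSize = 1, iter = 1, regParam)._1 is w' from the identity
      // above, so after this axpy, gradientTotal = w - w' = regGradient(w).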
      axpy(-1.0, updater.compute(w, Vectors.zeros(n), 1, 1, regParam)._1, gradientTotal)

      // gradientTotal = gradientSum / numExamples + gradientTotal
      axpy(1.0 / numExamples, gradientSum, gradientTotal)

      (loss, gradientTotal.toBreeze.asInstanceOf[BDV[Double]])
    }
  }
}