[SPARK-7780][MLLIB] Intercept in LogisticRegressionWithLBFGS should not be regularized #6386

Closed
Changes from 10 commits
Commits (24)
a529c01
document plans
holdenk May 22, 2015
f9e2635
Some progress.
holdenk May 22, 2015
7ebbd56
Keep track of the number of requested classes so that if its more tha…
holdenk May 22, 2015
ef2a9b0
Expose a train on instances method within Spark, use numOfLinearPredi…
holdenk May 22, 2015
407491e
tests are fun
holdenk May 24, 2015
e02bf3a
Start updating the tests to run with different updaters.
holdenk May 24, 2015
8517539
get the tests compiling
holdenk May 24, 2015
a619d42
style fixed
holdenk May 24, 2015
4febcc3
make the test method private
holdenk May 24, 2015
e8e03a1
CR feedback, pass RDD of Labeled points to ml implemetnation. Also fr…
holdenk May 24, 2015
38a024b
Convert it to a df and use set for the inital params
holdenk May 25, 2015
478b8c5
Handle non-dense weights
holdenk May 25, 2015
08589f5
CR feedback: make the setInitialWeights function private, don't mess …
holdenk May 26, 2015
f40c401
style fix up
holdenk May 26, 2015
f35a16a
Copy the number of iterations, convergence tolerance, and if we are f…
holdenk Jun 2, 2015
4d431a3
scala style check issue
holdenk Jun 3, 2015
7e41928
Only the weights if we need to.
holdenk Jun 3, 2015
ed351ff
Use appendBias for adding intercept to initial weights , fix generate…
holdenk Jun 3, 2015
3ac02d7
Merge in master
holdenk Jun 8, 2015
d1ce12b
Merge in master
holdenk Jul 9, 2015
8ca0fa9
attempt to merge in master
holdenk Aug 28, 2015
6f66f2c
Merge in master (again)
holdenk Oct 1, 2015
0cedd50
Fix compile error after simple merge
holdenk Oct 2, 2015
2bf289b
Merge branch 'master' into SPARK-7780-intercept-in-logisticregression…
holdenk Dec 30, 2015
@@ -101,11 +101,16 @@ class LogisticRegression(override val uid: String)
setDefault(threshold -> 0.5)

override protected def train(dataset: DataFrame): LogisticRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val instances = extractLabeledPoints(dataset).map {
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
Member: revert the code here to the one in master.

train(extractLabeledPoints(dataset), handlePersistence, None)
}
private [spark] def train(dataset: RDD[LabeledPoint], handlePersistence: Boolean,
optInitialWeights: Option[Vector]=None): LogisticRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist instances.
val instances = dataset.map {
case LabeledPoint(label: Double, features: Vector) => (label, features)
}
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE

Member: ditto. revert

if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val (summarizer, labelSummarizer) = instances.treeAggregate(
@@ -160,8 +165,8 @@ class LogisticRegression(override val uid: String)
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}

val initialWeightsWithIntercept =
Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)
val initialWeightsWithIntercept = optInitialWeights.getOrElse(
Member: In LoR, the intercept represents the prior of the class distribution, so it will converge faster if we set the intercept according to that prior when an intercept is included. As a result, we override the intercept in the following couple of lines.

    if ($(fitIntercept)) {
      /**
       * For binary logistic regression, when we initialize the weights as zeros,
       * it will converge faster if we initialize the intercept such that
       * it follows the distribution of the labels.
       *
       * {{{
       * P(0) = 1 / (1 + \exp(b)), and
       * P(1) = \exp(b) / (1 + \exp(b))
       * }}}, hence
       * {{{
       * b = \log{P(1) / P(0)} = \log{count_1 / count_0}
       * }}}
       */
      initialWeightsWithIntercept.toArray(numFeatures)
        = math.log(histogram(1).toDouble / histogram(0).toDouble)
    }

When we specify a custom intercept, we should not override it by executing the above code. Also, we may check the dims of the custom weights, and if they do not agree with the ones generated programmatically, we should use the programmatically generated ones and log a warning.
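A minimal sketch of that check (resolveInitialWeights is a hypothetical helper name; its parameter list mirrors values from the surrounding diff and is an assumption, not code from this PR):

    // Hypothetical sketch: keep user-supplied initial weights only when their
    // dimension matches; otherwise fall back to generated weights and seed the
    // intercept from the label prior (a real implementation would also log this).
    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    def resolveInitialWeights(
        optInitialWeights: Option[Vector],
        numFeatures: Int,
        fitIntercept: Boolean,
        histogram: Array[Long]): Vector = {
      val expectedSize = if (fitIntercept) numFeatures + 1 else numFeatures
      optInitialWeights match {
        case Some(w) if w.size == expectedSize =>
          w // trust the caller's weights, including any custom intercept
        case _ =>
          val weights = new Array[Double](expectedSize)
          if (fitIntercept) {
            // b = log(P(1) / P(0)) = log(count_1 / count_0)
            weights(numFeatures) = math.log(histogram(1).toDouble / histogram(0).toDouble)
          }
          Vectors.dense(weights)
      }
    }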

Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures))

Member: If we call the ml version from the mllib side by creating a DataFrame in LogisticRegressionWithLBFGS, we can handle the model format conversion in mllib. In the new ml package, the multinomial LoR will have its model as a matrix, so if we change too much here, it will be difficult to implement MLoR later in ml.
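A rough sketch of that route, assuming the mllib caller does the DataFrame round trip itself (trainViaMl is a hypothetical name; the weights/intercept extraction is an assumption based on this comment and the fit snippet further down):

    // Hypothetical sketch: train through ml.LogisticRegression and convert back.
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    def trainViaMl(input: RDD[LabeledPoint]) = {
      val sqlContext = new SQLContext(input.context)
      import sqlContext.implicits._
      val lor = new org.apache.spark.ml.classification.LogisticRegression()
      val model = lor.fit(input.toDF())
      // mllib handles the model format conversion from here; the matrix-based
      // multinomial model planned for ml would need extra handling.
      (model.weights, model.intercept)
    }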

if ($(fitIntercept)) {
/**
@@ -21,13 +21,13 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.classification.impl.GLMClassificationModel
import org.apache.spark.mllib.linalg.BLAS.dot
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.pmml.PMMLExportable
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{DataValidators, Saveable, Loader}
import org.apache.spark.rdd.RDD

import org.apache.spark.storage.StorageLevel

/**
* Classification model trained using Multinomial/Binary Logistic Regression.
@@ -322,6 +322,13 @@ object LogisticRegressionWithSGD
* Limited-memory BFGS. Standard feature scaling and L2 regularization are used by default.
* NOTE: Labels used in Logistic Regression should be {0, 1, ..., k - 1}
* for k classes multi-label classification problem.
*
* Earlier implementations of LogisticRegressionWithLBFGS applied a regularization
* penalty to all elements, including the intercept. If this is called with one of the
* standard updaters (L1Updater or SquaredL2Updater), it is translated
* into a call to ml.LogisticRegression; otherwise it uses the existing mllib
* GeneralizedLinearAlgorithm trainer, resulting in a regularization penalty on the
* intercept.
*/
class LogisticRegressionWithLBFGS
extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
@@ -363,4 +370,34 @@ class LogisticRegressionWithLBFGS
new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
}
}

/**
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
* If a known updater is used, this calls the ml implementation to avoid
* applying a regularization penalty to the intercept; otherwise it
* defaults to the mllib implementation. If there are more than two classes,
* or feature scaling is disabled, the mllib implementation is always used.
*/
override def run(input: RDD[LabeledPoint], initialWeights: Vector): LogisticRegressionModel = {
// ml's logistic regression only supports binary classification currently.
if (numOfLinearPredictor == 1 && useFeatureScaling) {
def runWithMlLogisitcRegression(elasticNetParam: Double) = {
Member: How about the following? Then we don't have to change the other side. Thanks.

    val sqlContext = new SQLContext(input.context)
    import sqlContext.implicits._
    val lor = new org.apache.spark.ml.classification.LogisticRegression()
    lor.fit(input.toDF())

Contributor Author: We can do that; the only downside is that on the other side it's ripped back out right away. This would also lose the initial weights, but I could either modify the signature on the other side to take initial weights or require the initial weights to be zero (which do you think is better)?

Member: When people train LoR/LiR with multiple regularization lambdas for cross-validation, the training algorithm will start from the largest lambda and return the model. That model will be used as the initial condition for the second-largest lambda, and the process is repeated until all the lambdas are trained. By using the previous model as the initial weights, convergence will be much faster. http://www.jstatsoft.org/v33/i01/paper

As a result, we need the ability to specify initial weights. Feel free to add a private API to set weights. If the dim of the weights does not match the data, then we can use the default ones as the initial condition.

PS: once this private API is added, we can hook it up with the CrossValidation API to train multiple lambdas efficiently. Currently, with multiple lambdas, we train from scratch without using the information from previous results. There is no JIRA for this yet; you can open one if you are interested.
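A rough sketch of that regularization path / warm-start pattern, using the public run(input, initialWeights) entry point (the lambdas argument and the descending order are illustrative; none of this is code from this PR):

    // Hypothetical sketch: train from the largest lambda down, warm-starting
    // each fit from the previous model's weights (glmnet-style path).
    import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    def regularizationPath(
        input: RDD[LabeledPoint],
        lambdas: Seq[Double],
        numFeatures: Int): Seq[LogisticRegressionModel] = {
      var weights = Vectors.zeros(numFeatures)
      lambdas.sorted.reverse.map { lambda =>
        val lr = new LogisticRegressionWithLBFGS()
        lr.optimizer.setRegParam(lambda)
        val model = lr.run(input, weights)
        weights = model.weights // warm start for the next, smaller lambda
        model
      }
    }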

Member: I don't see where the passed elasticNetParam is used?

Contributor Author: @dbtsai that sounds fun, I've added a JIRA to track doing that. For the first part (i.e. now) I just have it defined on LogisticRegression, but I could move it to params (currently no vector param exists, but one can be added).
@viirya good point, I've added the set call.

Member: I think initial weights can be part of params, but we can do it in the next iteration when we work on the CrossValidation stuff. This regularization path idea will be the same for Linear Regression with elastic net.

val lr = new org.apache.spark.ml.classification.LogisticRegression()
lr.setRegParam(optimizer.getRegParam())
val handlePersistence = input.getStorageLevel == StorageLevel.NONE
val initialWeightsWithIntercept = Vectors.dense(0.0, initialWeights.toArray:_*)
val mlLogisticRegresionModel = lr.train(input, handlePersistence,
Some(initialWeightsWithIntercept)) // TODO swap back to including the initialWeights
createModel(mlLogisticRegresionModel.weights, mlLogisticRegresionModel.intercept)
}
optimizer.getUpdater() match {
case x: SquaredL2Updater => runWithMlLogisitcRegression(1.0)
case x: L1Updater => runWithMlLogisitcRegression(0.0)
case _ => super.run(input, initialWeights)
}
} else {
super.run(input, initialWeights)
}
}
}
@@ -94,6 +94,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Get the regularization parameter.
*/
private[mllib] def getRegParam(): Double = {
this.regParam
}

/**
* Set the gradient function (of the loss function of one single data example)
* to be used for L-BFGS.
@@ -113,6 +120,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Returns the updater, limited to internal use.
*/
private[mllib] def getUpdater(): Updater = {
updater
}

override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = LBFGS.runLBFGS(
data,
@@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
* translated back to resulting model weights, so it's transparent to users.
* Note: This technique is used in both libsvm and glmnet packages. Default false.
*/
private var useFeatureScaling = false
private[mllib] var useFeatureScaling = false

/**
* The dimension of training features.
@@ -28,6 +28,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater, Updater, LBFGS, LogisticGradient}
import org.apache.spark.util.Utils


@@ -214,6 +215,11 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext with M

// Test if we can correctly learn A, B where Y = logistic(A + B*X)
test("logistic regression with LBFGS") {
val updaters: List[Updater] = List(new SquaredL2Updater(), new L1Updater())
updaters.foreach(testLBFGS)
}

private def testLBFGS(myUpdater: Updater): Unit = {
val nPoints = 10000
val A = 2.0
val B = -1.5
@@ -222,7 +228,15 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext with M

val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
val lr = new LogisticRegressionWithLBFGS().setIntercept(true)

// Override the updater
class LogisticRegressionWithLBFGSCustomUpdater
extends LogisticRegressionWithLBFGS {
override val optimizer =
new LBFGS(new LogisticGradient, myUpdater)
}

val lr = new LogisticRegressionWithLBFGSCustomUpdater().setIntercept(true)

val model = lr.run(testRDD)
