Closed
Changes from all commits (49 commits)
a529c01
document plans
holdenk May 22, 2015
f9e2635
Some progress.
holdenk May 22, 2015
7ebbd56
Keep track of the number of requested classes so that if its more tha…
holdenk May 22, 2015
ef2a9b0
Expose a train on instances method within Spark, use numOfLinearPredi…
holdenk May 22, 2015
407491e
tests are fun
holdenk May 24, 2015
e02bf3a
Start updating the tests to run with different updaters.
holdenk May 24, 2015
8517539
get the tests compiling
holdenk May 24, 2015
a619d42
style fixed
holdenk May 24, 2015
4febcc3
make the test method private
holdenk May 24, 2015
e8e03a1
CR feedback, pass RDD of Labeled points to ml implemetnation. Also fr…
holdenk May 24, 2015
38a024b
Convert it to a df and use set for the inital params
holdenk May 25, 2015
478b8c5
Handle non-dense weights
holdenk May 25, 2015
08589f5
CR feedback: make the setInitialWeights function private, don't mess …
holdenk May 26, 2015
f40c401
style fix up
holdenk May 26, 2015
f35a16a
Copy the number of iterations, convergence tolerance, and if we are f…
holdenk Jun 2, 2015
4d431a3
scala style check issue
holdenk Jun 3, 2015
7e41928
Only the weights if we need to.
holdenk Jun 3, 2015
ed351ff
Use appendBias for adding intercept to initial weights , fix generate…
holdenk Jun 3, 2015
3ac02d7
Merge in master
holdenk Jun 8, 2015
d1ce12b
Merge in master
holdenk Jul 9, 2015
8ca0fa9
attempt to merge in master
holdenk Aug 28, 2015
6f66f2c
Merge in master (again)
holdenk Oct 1, 2015
0cedd50
Fix compile error after simple merge
holdenk Oct 2, 2015
2bf289b
Merge branch 'master' into SPARK-7780-intercept-in-logisticregression…
holdenk Dec 30, 2015
d7a2631
Merge in master
holdenk Jan 16, 2016
b0fe1e6
scala style import order fix
holdenk Jan 16, 2016
827dcde
Import ordering
holdenk Jan 16, 2016
ba99ce9
Remove some extra blank lines from the merge
holdenk Jan 16, 2016
4b62629
Merge branch 'master' into SPARK-7780-intercept-in-logisticregression…
holdenk Jan 18, 2016
58ad58a
Comment formatting, and s/optInitialWeights/optInitialCoefficients/
holdenk Jan 18, 2016
4caab8c
s/userSuppliedWeights/userSuppliedCoefficients/
holdenk Jan 18, 2016
daf17a5
CR feedback - fix some s/Weights/Coefficients/ and string formatting …
holdenk Jan 18, 2016
72692f8
Switch it from setting the initial weights to setting the initial model
holdenk Jan 18, 2016
cec610a
CR feedback, pass useFeatureScaling as standardization to ML model, b…
holdenk Jan 18, 2016
e8b3ce8
Remove the validation, since the user is now setting an intial model …
holdenk Jan 18, 2016
0e2ea49
whoops
holdenk Jan 18, 2016
6fc1c23
Revert "whoops"
holdenk Jan 18, 2016
05205b3
Revert "Remove the validation, since the user is now setting an intia…
holdenk Jan 18, 2016
1c7a687
Instead keep the model when we set the initial one
holdenk Jan 18, 2016
699997f
Move the validate in-line so make it easier to copy the intercept over
holdenk Jan 18, 2016
e4d6d14
The standardization disabled test producing different values no longe…
holdenk Jan 18, 2016
43a3a32
Switch the test around instead of removing it
holdenk Jan 18, 2016
e1b0389
CR feedback clarify javdocs for the run functions, fix indentation, a…
holdenk Jan 19, 2016
a0bc58a
Merge in master - lots of indentation changes as well
holdenk Jan 19, 2016
d695ec0
Fix auto indent of multi-line comment (works fine normally but the pr…
holdenk Jan 19, 2016
7501b4b
Make the multi line comments match the general style
holdenk Jan 19, 2016
46ae406
CR feedback: avoid double caching, revert to previous comment style f…
holdenk Jan 21, 2016
e6b797a
Merge branch 'master' into SPARK-7780-intercept-in-logisticregression…
holdenk Jan 25, 2016
8016ad8
Style fixes
holdenk Jan 27, 2016
@@ -247,15 +247,27 @@ class LogisticRegression @Since("1.2.0") (
@Since("1.5.0")
override def getThresholds: Array[Double] = super.getThresholds

override protected def train(dataset: DataFrame): LogisticRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
private var optInitialModel: Option[LogisticRegressionModel] = None

/** @group setParam */
private[spark] def setInitialModel(model: LogisticRegressionModel): this.type = {
this.optInitialModel = Some(model)
this
}

Member:
How about we follow #8972, and have the following code? We can create another separate JIRA for moving setInitialModel to public with a sharedParam.

  private var initialModel: Option[LogisticRegressionModel] = None

  private def setInitialModel(model: LogisticRegressionModel): this.type = {
    ...
    ...
    this
  }

Contributor Author:
So we already have setInitialWeights on StreamingLogisticRegressionWithSGD; would it be better to have this match StreamingLogisticRegressionWithSGD?
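
For reference, a minimal sketch of the existing streaming API being compared against (not part of this diff; the feature count is an assumed example value):

  import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD
  import org.apache.spark.mllib.linalg.Vectors

  // The streaming estimator takes a raw weight vector rather than a full model.
  val numFeatures = 3 // example value
  val streamingLR = new StreamingLogisticRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))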


override protected[spark] def train(dataset: DataFrame): LogisticRegressionModel = {
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}

protected[spark] def train(dataset: DataFrame, handlePersistence: Boolean):
LogisticRegressionModel = {
val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}

val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val (summarizer, labelSummarizer) = {
@@ -343,7 +355,21 @@ class LogisticRegression @Since("1.2.0") (
val initialCoefficientsWithIntercept =
Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)

if ($(fitIntercept)) {
if (optInitialModel.isDefined && optInitialModel.get.coefficients.size != numFeatures) {
val vec = optInitialModel.get.coefficients

Member:
vec is not used.

Contributor Author:
It's used on L348 in the log warning.

logWarning(
s"Initial coefficients provided ${vec} did not match the expected size ${numFeatures}")
}

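// If the caller supplied an initial model with matching dimension, seed the optimizer
// with its coefficients (and intercept, when one is being fit).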
if (optInitialModel.isDefined && optInitialModel.get.coefficients.size == numFeatures) {
val initialCoefficientsWithInterceptArray = initialCoefficientsWithIntercept.toArray
optInitialModel.get.coefficients.foreachActive { case (index, value) =>
initialCoefficientsWithInterceptArray(index) = value
}
if ($(fitIntercept)) {
initialCoefficientsWithInterceptArray(numFeatures) = optInitialModel.get.intercept
}
} else if ($(fitIntercept)) {
/*
For binary logistic regression, when we initialize the coefficients as zeros,
it will converge faster if we initialize the intercept such that
@@ -434,7 +460,7 @@ object LogisticRegression extends DefaultParamsReadable[LogisticRegression] {
*/
@Since("1.4.0")
@Experimental
class LogisticRegressionModel private[ml] (
class LogisticRegressionModel private[spark] (
@Since("1.4.0") override val uid: String,
@Since("1.6.0") val coefficients: Vector,
@Since("1.3.0") val intercept: Double)
@@ -19,15 +19,18 @@ package org.apache.spark.mllib.classification

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.classification.impl.GLMClassificationModel
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.dot
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.pmml.PMMLExportable
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{DataValidators, Loader, Saveable}
import org.apache.spark.mllib.util.MLUtils.appendBias
import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

/**
* Classification model trained using Multinomial/Binary Logistic Regression.
@@ -332,6 +335,13 @@ object LogisticRegressionWithSGD {
* Limited-memory BFGS. Standard feature scaling and L2 regularization are used by default.
* NOTE: Labels used in Logistic Regression should be {0, 1, ..., k - 1}
* for k classes multi-label classification problem.
*
* Earlier implementations of LogisticRegressionWithLBFGS applied a regularization
* penalty to all elements, including the intercept. If this is called with one of the
* standard updaters (L1Updater or SquaredL2Updater), it is translated
* into a call to ml.LogisticRegression; otherwise the existing mllib
* GeneralizedLinearAlgorithm trainer is used, resulting in a regularization penalty
* applied to the intercept.
*/
@Since("1.1.0")
class LogisticRegressionWithLBFGS
@@ -374,4 +384,72 @@ class LogisticRegressionWithLBFGS
new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
}
}

/**
* Run Logistic Regression with the configured parameters on an input RDD
* of LabeledPoint entries.
*
* If a known updater is used, this calls the ml implementation to avoid
* applying a regularization penalty to the intercept; otherwise it
* defaults to the mllib implementation. If there are more than two classes
* or feature scaling is disabled, the mllib implementation is always used.
* When the ml implementation is used, ml code generates the initial weights.
*/
override def run(input: RDD[LabeledPoint]): LogisticRegressionModel = {
run(input, generateInitialWeights(input), userSuppliedWeights = false)
}

/**
* Run Logistic Regression with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.

Member:
Add an extra blank line before "If a known updater is...".

*
* If a known updater is used, this calls the ml implementation to avoid
* applying a regularization penalty to the intercept; otherwise it
* defaults to the mllib implementation. If there are more than two classes
* or feature scaling is disabled, the mllib implementation is always used.
* Uses the user-provided weights.
*/
override def run(input: RDD[LabeledPoint], initialWeights: Vector): LogisticRegressionModel = {
run(input, initialWeights, userSuppliedWeights = true)
}

private def run(input: RDD[LabeledPoint], initialWeights: Vector, userSuppliedWeights: Boolean):
LogisticRegressionModel = {
// ml's logistic regression only supports binary classification currently.
if (numOfLinearPredictor == 1) {
def runWithMlLogisticRegression(elasticNetParam: Double) = {
// Prepare the ml LogisticRegression based on our settings
val lr = new org.apache.spark.ml.classification.LogisticRegression()
lr.setRegParam(optimizer.getRegParam())
lr.setElasticNetParam(elasticNetParam)
lr.setStandardization(useFeatureScaling)
if (userSuppliedWeights) {
val uid = Identifiable.randomUID("logreg-static")
lr.setInitialModel(new org.apache.spark.ml.classification.LogisticRegressionModel(
uid, initialWeights, 1.0))
}
lr.setFitIntercept(addIntercept)
lr.setMaxIter(optimizer.getNumIterations())
lr.setTol(optimizer.getConvergenceTol())
// Convert our input into a DataFrame
val sqlContext = new SQLContext(input.context)
import sqlContext.implicits._
val df = input.toDF()
// Determine if we should cache the DF
val handlePersistence = input.getStorageLevel == StorageLevel.NONE

Member:
Will this cause double caching? Let's say the input RDD is already cached, so handlePersistence will be false. As a result, the dataset.rdd.getStorageLevel == StorageLevel.NONE check will be true in ml's LOR code, and this will cause caching twice.

Contributor Author:
Good point; in a previous version of the code we passed handlePersistence down to avoid this. I've updated it to do the same here.
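
A compressed sketch of the hazard being discussed (hypothetical caller code, not part of this diff; assumes a SparkContext sc):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.storage.StorageLevel

  val input = sc.parallelize(Seq(LabeledPoint(1.0, Vectors.dense(0.5))))
  input.cache()                                                        // caller caches the RDD
  val handlePersistence = input.getStorageLevel == StorageLevel.NONE   // false here
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  val df = input.toDF()                              // new plan; its storage level is NONE
  // If train(df) re-derived the flag from df, it would persist the same data again;
  // passing handlePersistence into train(df, handlePersistence) avoids double caching.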

// Train our model
val mlLogisticRegressionModel = lr.train(df, handlePersistence)
// Convert the ml model back into an mllib model
val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray)
createModel(weights, mlLogisticRegressionModel.intercept)
}
optimizer.getUpdater() match {

Member:
when optimizer.getRegParam() == 0.0, run the old version.

Member:
Okay, this will make the test harder to write. I don't care about this one for now.

case x: SquaredL2Updater => runWithMlLogisticRegression(0.0)
case x: L1Updater => runWithMlLogisticRegression(1.0)
case _ => super.run(input, initialWeights)
}
} else {
super.run(input, initialWeights)
}
}
}
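
A minimal end-to-end usage sketch of the behavior described above (hypothetical driver code, not part of this diff; assumes a SparkContext sc and binary labels):

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  val training = sc.parallelize(Seq(
    LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
    LabeledPoint(1.0, Vectors.dense(1.0, 0.0))))

  val lr = new LogisticRegressionWithLBFGS().setIntercept(true)
  lr.optimizer.setRegParam(0.1)   // default SquaredL2Updater, so this now routes through ml
  val model = lr.run(training)    // intercept is fit but not regularized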
@@ -69,6 +69,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Get the convergence tolerance of iterations.
*/
private[mllib] def getConvergenceTol(): Double = {
this.convergenceTol
}

/**
* Set the maximal number of iterations for L-BFGS. Default 100.
* @deprecated use [[LBFGS#setNumIterations]] instead
@@ -86,6 +93,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Get the maximum number of iterations for L-BFGS. Defaults to 100.
*/
private[mllib] def getNumIterations(): Int = {
this.maxNumIterations
}

/**
* Set the regularization parameter. Default 0.0.
*/
@@ -94,6 +108,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Get the regularization parameter.
*/
private[mllib] def getRegParam(): Double = {
this.regParam
}

/**
* Set the gradient function (of the loss function of one single data example)
* to be used for L-BFGS.
@@ -113,6 +134,13 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
this
}

/**
* Returns the updater, limited to internal use.
*/
private[mllib] def getUpdater(): Updater = {
updater
}
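
A small sketch of how these package-private accessors are meant to be read back by the mllib-to-ml bridge (illustrative only; the accessors are private[mllib], so this compiles only inside that package):

  val lbfgs = new LBFGS(new LogisticGradient, new SquaredL2Updater)
  lbfgs.setNumIterations(50)
  lbfgs.setConvergenceTol(1e-6)
  lbfgs.setRegParam(0.01)
  // The bridge copies the optimizer's configuration onto ml.LogisticRegression:
  assert(lbfgs.getNumIterations() == 50)
  assert(lbfgs.getConvergenceTol() == 1e-6)
  assert(lbfgs.getRegParam() == 0.01)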

override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = LBFGS.runLBFGS(
data,
@@ -140,7 +140,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
* translated back to resulting model weights, so it's transparent to users.
* Note: This technique is used in both libsvm and glmnet packages. Default false.
*/
private var useFeatureScaling = false
private[mllib] var useFeatureScaling = false

/**
* The dimension of training features.
@@ -196,12 +196,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
}

/**
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
*
* Generate the initial weights when the user does not supply them
*/
@Since("0.8.0")
def run(input: RDD[LabeledPoint]): M = {
protected def generateInitialWeights(input: RDD[LabeledPoint]): Vector = {
if (numFeatures < 0) {
numFeatures = input.map(_.features.size).first()
}
@@ -217,16 +214,23 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
* TODO: See if we can deprecate `intercept` in `GeneralizedLinearModel`, and always
* have the intercept as part of weights to have consistent design.
*/
val initialWeights = {
if (numOfLinearPredictor == 1) {
Vectors.zeros(numFeatures)
} else if (addIntercept) {
Vectors.zeros((numFeatures + 1) * numOfLinearPredictor)
} else {
Vectors.zeros(numFeatures * numOfLinearPredictor)
}
if (numOfLinearPredictor == 1) {
Vectors.zeros(numFeatures)
} else if (addIntercept) {
Vectors.zeros((numFeatures + 1) * numOfLinearPredictor)
} else {
Vectors.zeros(numFeatures * numOfLinearPredictor)
}
run(input, initialWeights)
}

/**
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
*/
@Since("0.8.0")
def run(input: RDD[LabeledPoint]): M = {
run(input, generateInitialWeights(input))
}

/**
@@ -168,7 +168,7 @@ private class MockLogisticRegression(uid: String) extends LogisticRegression(uid

setMaxIter(1)

override protected def train(dataset: DataFrame): LogisticRegressionModel = {
override protected[spark] def train(dataset: DataFrame): LogisticRegressionModel = {
val labelSchema = dataset.schema($(labelCol))
// check for label attribute propagation.
assert(MetadataUtils.getNumClasses(labelSchema).forall(_ == 2))
@@ -25,6 +25,7 @@ import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.TestingUtils._
@@ -215,6 +216,11 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w

// Test if we can correctly learn A, B where Y = logistic(A + B*X)
test("logistic regression with LBFGS") {
val updaters: List[Updater] = List(new SquaredL2Updater(), new L1Updater())
updaters.foreach(testLBFGS)
}

private def testLBFGS(myUpdater: Updater): Unit = {
val nPoints = 10000
val A = 2.0
val B = -1.5
@@ -223,7 +229,15 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w

val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
val lr = new LogisticRegressionWithLBFGS().setIntercept(true)

// Override the updater
class LogisticRegressionWithLBFGSCustomUpdater
extends LogisticRegressionWithLBFGS {
override val optimizer =
new LBFGS(new LogisticGradient, myUpdater)
}

val lr = new LogisticRegressionWithLBFGSCustomUpdater().setIntercept(true)

val model = lr.run(testRDD)

@@ -396,10 +410,11 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
assert(modelA1.weights(0) ~== modelA3.weights(0) * 1.0E6 absTol 0.01)

// Training data with different scales without feature standardization
// will not yield the same result in the scaled space due to poor
// convergence rate.
assert(modelB1.weights(0) !~== modelB2.weights(0) * 1.0E3 absTol 0.1)
assert(modelB1.weights(0) !~== modelB3.weights(0) * 1.0E6 absTol 0.1)
// should still converge quickly since the model still uses standardization but
// simply modifies the regularization function. See regParamL1Fun and related
// code inside of LogisticRegression.
assert(modelB1.weights(0) ~== modelB2.weights(0) * 1.0E3 absTol 0.1)
assert(modelB1.weights(0) ~== modelB3.weights(0) * 1.0E6 absTol 0.1)
}

test("multinomial logistic regression with LBFGS") {