From 92d6d8ceb4fdf4bd0c43b3e8b58683f12e3e2fd1 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 26 Aug 2015 19:12:20 +0800 Subject: [PATCH 1/2] SGD algorithms need convergenceTol parameter in Python --- .../mllib/api/python/PythonMLLibAPI.scala | 19 ++++++++--- python/pyspark/mllib/classification.py | 17 +++++++--- python/pyspark/mllib/regression.py | 32 ++++++++++++------- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f585aacd452e0..35be0b8c99c4c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -132,7 +132,8 @@ private[python] class PythonMLLibAPI extends Serializable { regParam: Double, regType: String, intercept: Boolean, - validateData: Boolean): JList[Object] = { + validateData: Boolean, + convergenceTol: Double): JList[Object] = { val lrAlg = new LinearRegressionWithSGD() lrAlg.setIntercept(intercept) .setValidateData(validateData) @@ -141,6 +142,7 @@ private[python] class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + .setConvergenceTol(convergenceTol) lrAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( lrAlg, @@ -159,7 +161,8 @@ private[python] class PythonMLLibAPI extends Serializable { miniBatchFraction: Double, initialWeights: Vector, intercept: Boolean, - validateData: Boolean): JList[Object] = { + validateData: Boolean, + convergenceTol: Double): JList[Object] = { val lassoAlg = new LassoWithSGD() lassoAlg.setIntercept(intercept) .setValidateData(validateData) @@ -168,6 +171,7 @@ private[python] class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + .setConvergenceTol(convergenceTol) trainRegressionModel( lassoAlg, data, @@ -185,7 +189,8 @@ private[python] class PythonMLLibAPI extends Serializable { miniBatchFraction: Double, initialWeights: Vector, intercept: Boolean, - validateData: Boolean): JList[Object] = { + validateData: Boolean, + convergenceTol: Double): JList[Object] = { val ridgeAlg = new RidgeRegressionWithSGD() ridgeAlg.setIntercept(intercept) .setValidateData(validateData) @@ -194,6 +199,7 @@ private[python] class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + .setConvergenceTol(convergenceTol) trainRegressionModel( ridgeAlg, data, @@ -212,7 +218,7 @@ private[python] class PythonMLLibAPI extends Serializable { initialWeights: Vector, regType: String, intercept: Boolean, - validateData: Boolean): JList[Object] = { + validateData: Boolean, convergenceTol: Double): JList[Object] = { val SVMAlg = new SVMWithSGD() SVMAlg.setIntercept(intercept) .setValidateData(validateData) @@ -221,6 +227,7 @@ private[python] class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + .setConvergenceTol(convergenceTol) SVMAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( SVMAlg, @@ -240,7 +247,8 @@ private[python] class PythonMLLibAPI extends Serializable { regParam: Double, regType: String, intercept: Boolean, - validateData: Boolean): JList[Object] = { + validateData: Boolean, + convergenceTol: Double): JList[Object] = { val LogRegAlg = new LogisticRegressionWithSGD() LogRegAlg.setIntercept(intercept) .setValidateData(validateData) @@ -249,6 +257,7 @@ private[python] class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + .setConvergenceTol(convergenceTol) LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( LogRegAlg, diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 8f27c446a66e8..cb4ee83678081 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -241,7 +241,7 @@ class LogisticRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.01, regType="l2", intercept=False, - validateData=True): + validateData=True, convergenceTol=0.001): """ Train a logistic regression model on the given data. @@ -274,11 +274,13 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, :param validateData: Boolean parameter which indicates if the algorithm should validate data before training. (default: True) + :param convergenceTol: A condition which decides iteration termination. + (default: 0.001) """ def train(rdd, i): return callMLlibFunc("trainLogisticRegressionModelWithSGD", rdd, int(iterations), float(step), float(miniBatchFraction), i, float(regParam), regType, - bool(intercept), bool(validateData)) + bool(intercept), bool(validateData), float(convergenceTol)) return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights) @@ -439,7 +441,7 @@ class SVMWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, regType="l2", - intercept=False, validateData=True): + intercept=False, validateData=True, convergenceTol=0.001): """ Train a support vector machine on the given data. @@ -472,11 +474,13 @@ def train(cls, data, iterations=100, step=1.0, regParam=0.01, :param validateData: Boolean parameter which indicates if the algorithm should validate data before training. (default: True) + :param convergenceTol: A condition which decides iteration termination. + (default: 0.001) """ def train(rdd, i): return callMLlibFunc("trainSVMModelWithSGD", rdd, int(iterations), float(step), float(regParam), float(miniBatchFraction), i, regType, - bool(intercept), bool(validateData)) + bool(intercept), bool(validateData), float(convergenceTol)) return _regression_train_wrapper(train, SVMModel, data, initialWeights) @@ -600,12 +604,15 @@ class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm): :param miniBatchFraction: Fraction of data on which SGD is run for each iteration. :param regParam: L2 Regularization parameter. + :param convergenceTol: A condition which decides iteration termination. """ - def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01): + def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01, + convergenceTol=0.001): self.stepSize = stepSize self.numIterations = numIterations self.regParam = regParam self.miniBatchFraction = miniBatchFraction + self.convergenceTol = convergenceTol self._model = None super(StreamingLogisticRegressionWithSGD, self).__init__( model=self._model) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 41946e3674fbe..256b7537fef6b 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -28,7 +28,8 @@ 'LinearRegressionModel', 'LinearRegressionWithSGD', 'RidgeRegressionModel', 'RidgeRegressionWithSGD', 'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel', - 'IsotonicRegression'] + 'IsotonicRegression', 'StreamingLinearAlgorithm', + 'StreamingLinearRegressionWithSGD'] class LabeledPoint(object): @@ -202,7 +203,7 @@ class LinearRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.0, regType=None, intercept=False, - validateData=True): + validateData=True, convergenceTol=0.001): """ Train a linear regression model using Stochastic Gradient Descent (SGD). @@ -244,11 +245,14 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, :param validateData: Boolean parameter which indicates if the algorithm should validate data before training. (default: True) + :param convergenceTol: A condition which decides iteration termination. + (default: 0.001) """ def train(rdd, i): return callMLlibFunc("trainLinearRegressionModelWithSGD", rdd, int(iterations), float(step), float(miniBatchFraction), i, float(regParam), - regType, bool(intercept), bool(validateData)) + regType, bool(intercept), bool(validateData), + float(convergenceTol)) return _regression_train_wrapper(train, LinearRegressionModel, data, initialWeights) @@ -330,7 +334,7 @@ class LassoWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, intercept=False, - validateData=True): + validateData=True, convergenceTol=0.001): """ Train a regression model with L1-regularization using Stochastic Gradient Descent. @@ -362,11 +366,13 @@ def train(cls, data, iterations=100, step=1.0, regParam=0.01, :param validateData: Boolean parameter which indicates if the algorithm should validate data before training. (default: True) + :param convergenceTol: A condition which decides iteration termination. + (default: 0.001) """ def train(rdd, i): return callMLlibFunc("trainLassoModelWithSGD", rdd, int(iterations), float(step), float(regParam), float(miniBatchFraction), i, bool(intercept), - bool(validateData)) + bool(validateData), float(convergenceTol)) return _regression_train_wrapper(train, LassoModel, data, initialWeights) @@ -449,7 +455,7 @@ class RidgeRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, intercept=False, - validateData=True): + validateData=True, convergenceTol=0.001): """ Train a regression model with L2-regularization using Stochastic Gradient Descent. @@ -481,11 +487,13 @@ def train(cls, data, iterations=100, step=1.0, regParam=0.01, :param validateData: Boolean parameter which indicates if the algorithm should validate data before training. (default: True) + :param convergenceTol: A condition which decides iteration termination. + (default: 0.001) """ def train(rdd, i): return callMLlibFunc("trainRidgeModelWithSGD", rdd, int(iterations), float(step), float(regParam), float(miniBatchFraction), i, bool(intercept), - bool(validateData)) + bool(validateData), float(convergenceTol)) return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights) @@ -636,15 +644,17 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm): After training on a batch of data, the weights obtained at the end of training are used as initial weights for the next batch. - :param: stepSize Step size for each iteration of gradient descent. - :param: numIterations Total number of iterations run. - :param: miniBatchFraction Fraction of data on which SGD is run for each + :param stepSize: Step size for each iteration of gradient descent. + :param numIterations: Total number of iterations run. + :param miniBatchFraction: Fraction of data on which SGD is run for each iteration. + :param convergenceTol: A condition which decides iteration termination. """ - def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0): + def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001): self.stepSize = stepSize self.numIterations = numIterations self.miniBatchFraction = miniBatchFraction + self.convergenceTol = convergenceTol self._model = None super(StreamingLinearRegressionWithSGD, self).__init__( model=self._model) From c2f3112c597f148b481e52bdd7bcef14a0bb3a7b Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Fri, 11 Sep 2015 15:41:54 +0800 Subject: [PATCH 2/2] fix typo --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 35be0b8c99c4c..69ce7f50709a1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -218,7 +218,8 @@ private[python] class PythonMLLibAPI extends Serializable { initialWeights: Vector, regType: String, intercept: Boolean, - validateData: Boolean, convergenceTol: Double): JList[Object] = { + validateData: Boolean, + convergenceTol: Double): JList[Object] = { val SVMAlg = new SVMWithSGD() SVMAlg.setIntercept(intercept) .setValidateData(validateData)