From a72fdc9ce4bfc5f075739140237f10c7150311f8 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Tue, 4 Nov 2014 11:41:57 -0600 Subject: [PATCH 1/6] Expose nonnegative ALS in the python API. --- .../mllib/api/python/PythonMLLibAPI.scala | 11 ++-- .../spark/mllib/recommendation/ALS.scala | 54 +++++++++++++++++++ python/pyspark/mllib/recommendation.py | 17 ++++-- 3 files changed, 74 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 65b98a8ceea55..50ad16bce0756 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -278,8 +278,10 @@ class PythonMLLibAPI extends Serializable { rank: Int, iterations: Int, lambda: Double, - blocks: Int): MatrixFactorizationModel = { - new MatrixFactorizationModelWrapper(ALS.train(ratings.rdd, rank, iterations, lambda, blocks)) + blocks: Int, + nonnegative: Boolean): MatrixFactorizationModel = { + new MatrixFactorizationModelWrapper( + ALS.train(ratings.rdd, rank, iterations, lambda, blocks, nonnegative)) } /** @@ -294,9 +296,10 @@ class PythonMLLibAPI extends Serializable { iterations: Int, lambda: Double, blocks: Int, - alpha: Double): MatrixFactorizationModel = { + alpha: Double, + nonnegative: Boolean): MatrixFactorizationModel = { new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)) + ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, nonnegative)) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 84d192db53e26..271bc454710e5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -696,6 +696,32 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings) } + /** + * Train a matrix factorization model given an RDD of ratings given by users to some products, + * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the + * product of two lower-rank matrices of a given rank (number of features). To solve for these + * features, we run a given number of iterations of ALS. This is done using a level of + * parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param nonnegative whether to enforce nonnegativity + */ + def train( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + nonnegative: Boolean + ): MatrixFactorizationModel = { + (new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0) + .setNonnegative(nonnegative).run(ratings)) + } + /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -781,6 +807,34 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + * @param nonnegative whether to enforce nonnegativity + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double, + nonnegative: Boolean + ): MatrixFactorizationModel = { + (new ALS(blocks, blocks, rank, iterations, lambda, true, alpha) + .setNonnegative(nonnegative).run(ratings)) + } + /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users to * some products, in the form of (userID, productID, preference) pairs. We approximate the diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index e8b998414d319..e65d684959576 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -69,6 +69,14 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> latents = first_product[1] >>> len(latents) == 4 True + + >>> model = ALS.train(ratings, 1, nonnegative=True) + >>> model.predict(2,2) is not None + True + + >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True) + >>> model.predict(2,2) is not None + True """ def predict(self, user, product): return self._java_model.predict(user, product) @@ -101,15 +109,16 @@ def _prepare(cls, ratings): return _to_java_object_rdd(ratings, True) @classmethod - def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False): model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, - lambda_, blocks) + lambda_, blocks, nonnegative) return MatrixFactorizationModel(model) @classmethod - def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, + nonnegative=False): model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, - iterations, lambda_, blocks, alpha) + iterations, lambda_, blocks, alpha, nonnegative) return MatrixFactorizationModel(model) From cedf043abe63b0b91d0c44d8dbf072dc922e96c1 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Wed, 5 Nov 2014 13:37:56 -0600 Subject: [PATCH 2/6] Added in ability to set the seed from python and made that play nice with the nonnegative changes. Also made the python ALS tests more exact. --- .../mllib/api/python/PythonMLLibAPI.scala | 41 ++++++++++++- .../spark/mllib/recommendation/ALS.scala | 58 +++++++++++++++++++ python/pyspark/mllib/recommendation.py | 53 +++++++++++------ 3 files changed, 133 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 50ad16bce0756..3b4a47b56b25f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -273,6 +273,24 @@ class PythonMLLibAPI extends Serializable { * needs to be taken in the Python code to ensure it gets freed on exit; see * the Py4J documentation. */ + def trainALSModel( + ratings: JavaRDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + seed: Long, + nonnegative: Boolean): MatrixFactorizationModel = { + new MatrixFactorizationModelWrapper( + ALS.train(ratings.rdd, rank, iterations, lambda, blocks, seed, nonnegative)) + } + + /** + * Java stub for Python mllib ALS.train(). This stub returns a handle + * to the Java object instead of the content of the Java object. Extra care + * needs to be taken in the Python code to ensure it gets freed on exit; see + * the Py4J documentation. This version does not specify a seed. + */ def trainALSModel( ratings: JavaRDD[Rating], rank: Int, @@ -290,6 +308,26 @@ class PythonMLLibAPI extends Serializable { * Extra care needs to be taken in the Python code to ensure it gets freed on * exit; see the Py4J documentation. */ + def trainImplicitALSModel( + ratingsJRDD: JavaRDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double, + seed: Long, + nonnegative: Boolean): MatrixFactorizationModel = { + new MatrixFactorizationModelWrapper( + ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, + seed, nonnegative)) + } + + /** + * Java stub for Python mllib ALS.trainImplicit(). This stub returns a + * handle to the Java object instead of the content of the Java object. + * Extra care needs to be taken in the Python code to ensure it gets freed on + * exit; see the Py4J documentation. This version does not specify a seed. + */ def trainImplicitALSModel( ratingsJRDD: JavaRDD[Rating], rank: Int, @@ -299,7 +337,8 @@ class PythonMLLibAPI extends Serializable { alpha: Double, nonnegative: Boolean): MatrixFactorizationModel = { new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, nonnegative)) + ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, + nonnegative)) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 271bc454710e5..6b36cad21142e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -673,6 +673,34 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings) } + /** + * Train a matrix factorization model given an RDD of ratings given by users to some products, + * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the + * product of two lower-rank matrices of a given rank (number of features). To solve for these + * features, we run a given number of iterations of ALS. This is done using a level of + * parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param seed random seed + * @param nonnegative whether to enforce nonnegativity + */ + def train( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + seed: Long, + nonnegative: Boolean + ): MatrixFactorizationModel = { + (new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed) + .setNonnegative(nonnegative).run(ratings)) + } + /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -782,6 +810,36 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + * @param seed random seed + * @param nonnegative whether to enforce nonnegativity + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double, + seed: Long, + nonnegative: Boolean + ): MatrixFactorizationModel = { + (new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed) + .setNonnegative(nonnegative).run(ratings)) + } + /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. We approximate the diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index e65d684959576..0e19ebebf5947 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -45,25 +45,25 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> r3 = (2, 1, 2.0) >>> ratings = sc.parallelize([r1, r2, r3]) >>> model = ALS.trainImplicit(ratings, 1) - >>> model.predict(2,2) is not None - True + >>> model.predict(2,2) + 0.4473... >>> testset = sc.parallelize([(1, 2), (1, 1)]) >>> model = ALS.train(ratings, 1) - >>> model.predictAll(testset).count() == 2 - True + >>> model.predictAll(testset).collect() + [Rating(1, 1, 1), Rating(1, 2, 1)] >>> model = ALS.train(ratings, 4) - >>> model.userFeatures().count() == 2 - True + >>> model.userFeatures().collect() + [(2, array('d', [...])), (1, array('d', [...]))] >>> first_user = model.userFeatures().take(1)[0] >>> latents = first_user[1] >>> len(latents) == 4 True - >>> model.productFeatures().count() == 2 - True + >>> model.productFeatures().collect() + [(2, array('d', [...])), (1, array('d', [...]))] >>> first_product = model.productFeatures().take(1)[0] >>> latents = first_product[1] @@ -71,12 +71,20 @@ class MatrixFactorizationModel(JavaModelWrapper): True >>> model = ALS.train(ratings, 1, nonnegative=True) - >>> model.predict(2,2) is not None - True + >>> model.predict(2,2) + 3.735... >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True) - >>> model.predict(2,2) is not None - True + >>> model.predict(2,2) + 0.4473... + + >>> model = ALS.train(ratings, 1, seed=10, nonnegative=True) + >>> model.predict(2,2) + 3.735... + + >>> model = ALS.trainImplicit(ratings, 1, seed=10, nonnegative=True) + >>> model.predict(2,2) + 0.4473... """ def predict(self, user, product): return self._java_model.predict(user, product) @@ -109,16 +117,25 @@ def _prepare(cls, ratings): return _to_java_object_rdd(ratings, True) @classmethod - def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False): - model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, - lambda_, blocks, nonnegative) + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, seed=None, + nonnegative=False): + if seed: + model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, + lambda_, blocks, seed, nonnegative) + else: + model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, + lambda_, blocks, nonnegative) return MatrixFactorizationModel(model) @classmethod def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, - nonnegative=False): - model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, - iterations, lambda_, blocks, alpha, nonnegative) + seed=None, nonnegative=False): + if seed: + model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, + iterations, lambda_, blocks, alpha, seed, nonnegative) + else: + model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, + iterations, lambda_, blocks, alpha, nonnegative) return MatrixFactorizationModel(model) From bdcc1545c08632d47d2f59da72daa85e21d37a6f Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Thu, 6 Nov 2014 10:42:51 -0600 Subject: [PATCH 3/6] Change seed type to java.lang.Long so that it can handle null. --- .../mllib/api/python/PythonMLLibAPI.scala | 65 +++++++------------ python/pyspark/mllib/recommendation.py | 16 ++--- 2 files changed, 26 insertions(+), 55 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 3b4a47b56b25f..8e33192e8c134 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -279,27 +279,17 @@ class PythonMLLibAPI extends Serializable { iterations: Int, lambda: Double, blocks: Int, - seed: Long, + seed: java.lang.Long, nonnegative: Boolean): MatrixFactorizationModel = { - new MatrixFactorizationModelWrapper( - ALS.train(ratings.rdd, rank, iterations, lambda, blocks, seed, nonnegative)) - } - - /** - * Java stub for Python mllib ALS.train(). This stub returns a handle - * to the Java object instead of the content of the Java object. Extra care - * needs to be taken in the Python code to ensure it gets freed on exit; see - * the Py4J documentation. This version does not specify a seed. - */ - def trainALSModel( - ratings: JavaRDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - nonnegative: Boolean): MatrixFactorizationModel = { - new MatrixFactorizationModelWrapper( - ALS.train(ratings.rdd, rank, iterations, lambda, blocks, nonnegative)) + if (seed == null) { + new MatrixFactorizationModelWrapper( + // if the seed coming from python is None/null, let ALS use the + // default, which is to use System.nanoTime + ALS.train(ratings.rdd, rank, iterations, lambda, blocks, nonnegative)) + } else { + new MatrixFactorizationModelWrapper( + ALS.train(ratings.rdd, rank, iterations, lambda, blocks, seed, nonnegative)) + } } /** @@ -315,30 +305,19 @@ class PythonMLLibAPI extends Serializable { lambda: Double, blocks: Int, alpha: Double, - seed: Long, + seed: java.lang.Long, nonnegative: Boolean): MatrixFactorizationModel = { - new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, - seed, nonnegative)) - } - - /** - * Java stub for Python mllib ALS.trainImplicit(). This stub returns a - * handle to the Java object instead of the content of the Java object. - * Extra care needs to be taken in the Python code to ensure it gets freed on - * exit; see the Py4J documentation. This version does not specify a seed. - */ - def trainImplicitALSModel( - ratingsJRDD: JavaRDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - alpha: Double, - nonnegative: Boolean): MatrixFactorizationModel = { - new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, - nonnegative)) + if (seed == null) { + // if the seed coming from python is None/null, let ALS use the + // default, which is to use System.nanoTime + new MatrixFactorizationModelWrapper( + ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, + nonnegative)) + } else { + new MatrixFactorizationModelWrapper( + ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, + seed, nonnegative)) + } } /** diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 0e19ebebf5947..2200a65d3b37c 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -119,23 +119,15 @@ def _prepare(cls, ratings): @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, seed=None, nonnegative=False): - if seed: - model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, - lambda_, blocks, seed, nonnegative) - else: - model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, - lambda_, blocks, nonnegative) + model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, + lambda_, blocks, seed, nonnegative) return MatrixFactorizationModel(model) @classmethod def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, seed=None, nonnegative=False): - if seed: - model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, - iterations, lambda_, blocks, alpha, seed, nonnegative) - else: - model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, - iterations, lambda_, blocks, alpha, nonnegative) + model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, + iterations, lambda_, blocks, alpha, seed, nonnegative) return MatrixFactorizationModel(model) From 3fdc85175ecf14bb958947ee7b93f7b79f8946c9 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Thu, 6 Nov 2014 16:42:38 -0600 Subject: [PATCH 4/6] Moved seed to the end of the python parameter list. --- python/pyspark/mllib/recommendation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 2200a65d3b37c..b282b472cfb92 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -78,11 +78,11 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> model.predict(2,2) 0.4473... - >>> model = ALS.train(ratings, 1, seed=10, nonnegative=True) + >>> model = ALS.train(ratings, 1, nonnegative=True, seed=10) >>> model.predict(2,2) 3.735... - >>> model = ALS.trainImplicit(ratings, 1, seed=10, nonnegative=True) + >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10) >>> model.predict(2,2) 0.4473... """ @@ -117,15 +117,15 @@ def _prepare(cls, ratings): return _to_java_object_rdd(ratings, True) @classmethod - def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, seed=None, - nonnegative=False): + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, + seed=None): model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, lambda_, blocks, seed, nonnegative) return MatrixFactorizationModel(model) @classmethod def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, - seed=None, nonnegative=False): + nonnegative=False, seed=None): model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, iterations, lambda_, blocks, alpha, seed, nonnegative) return MatrixFactorizationModel(model) From 7cffd39efc5af77227082d8c676b0b6eeba1f0f7 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Thu, 6 Nov 2014 18:48:41 -0600 Subject: [PATCH 5/6] Swapped nonnegative and seed in a few more places. --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 8 ++++---- python/pyspark/mllib/recommendation.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8e33192e8c134..d90b7c2c2b26a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -279,8 +279,8 @@ class PythonMLLibAPI extends Serializable { iterations: Int, lambda: Double, blocks: Int, - seed: java.lang.Long, - nonnegative: Boolean): MatrixFactorizationModel = { + nonnegative: Boolean, + seed: java.lang.Long): MatrixFactorizationModel = { if (seed == null) { new MatrixFactorizationModelWrapper( // if the seed coming from python is None/null, let ALS use the @@ -305,8 +305,8 @@ class PythonMLLibAPI extends Serializable { lambda: Double, blocks: Int, alpha: Double, - seed: java.lang.Long, - nonnegative: Boolean): MatrixFactorizationModel = { + nonnegative: Boolean, + seed: java.lang.Long): MatrixFactorizationModel = { if (seed == null) { // if the seed coming from python is None/null, let ALS use the // default, which is to use System.nanoTime diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index b282b472cfb92..fea156c9a4fb4 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -120,14 +120,14 @@ def _prepare(cls, ratings): def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, seed=None): model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations, - lambda_, blocks, seed, nonnegative) + lambda_, blocks, nonnegative, seed) return MatrixFactorizationModel(model) @classmethod def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, nonnegative=False, seed=None): model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank, - iterations, lambda_, blocks, alpha, seed, nonnegative) + iterations, lambda_, blocks, alpha, nonnegative, seed) return MatrixFactorizationModel(model) From a6743ad3a3d5254a2438bbf4d253bf2be9ff1822 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Fri, 7 Nov 2014 09:50:31 -0600 Subject: [PATCH 6/6] Use setters instead of static methods in PythonMLLibAPI. Remove the new static methods I added. Set seed in tests. Change ratings to ratingsRDD in both train and trainImplicit for consistency. --- .../mllib/api/python/PythonMLLibAPI.scala | 48 ++++---- .../spark/mllib/recommendation/ALS.scala | 112 ------------------ python/pyspark/mllib/recommendation.py | 14 +-- 3 files changed, 30 insertions(+), 144 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d90b7c2c2b26a..72536012e7ba2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -274,22 +274,25 @@ class PythonMLLibAPI extends Serializable { * the Py4J documentation. */ def trainALSModel( - ratings: JavaRDD[Rating], + ratingsJRDD: JavaRDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, nonnegative: Boolean, seed: java.lang.Long): MatrixFactorizationModel = { - if (seed == null) { - new MatrixFactorizationModelWrapper( - // if the seed coming from python is None/null, let ALS use the - // default, which is to use System.nanoTime - ALS.train(ratings.rdd, rank, iterations, lambda, blocks, nonnegative)) - } else { - new MatrixFactorizationModelWrapper( - ALS.train(ratings.rdd, rank, iterations, lambda, blocks, seed, nonnegative)) - } + + val als = new ALS() + .setRank(rank) + .setIterations(iterations) + .setLambda(lambda) + .setBlocks(blocks) + .setNonnegative(nonnegative) + + if (seed != null) als.setSeed(seed) + + val model = als.run(ratingsJRDD.rdd) + new MatrixFactorizationModelWrapper(model) } /** @@ -307,17 +310,20 @@ class PythonMLLibAPI extends Serializable { alpha: Double, nonnegative: Boolean, seed: java.lang.Long): MatrixFactorizationModel = { - if (seed == null) { - // if the seed coming from python is None/null, let ALS use the - // default, which is to use System.nanoTime - new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, - nonnegative)) - } else { - new MatrixFactorizationModelWrapper( - ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha, - seed, nonnegative)) - } + + val als = new ALS() + .setImplicitPrefs(true) + .setRank(rank) + .setIterations(iterations) + .setLambda(lambda) + .setBlocks(blocks) + .setAlpha(alpha) + .setNonnegative(nonnegative) + + if (seed != null) als.setSeed(seed) + + val model = als.run(ratingsJRDD.rdd) + new MatrixFactorizationModelWrapper(model) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 6b36cad21142e..84d192db53e26 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -673,34 +673,6 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings) } - /** - * Train a matrix factorization model given an RDD of ratings given by users to some products, - * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the - * product of two lower-rank matrices of a given rank (number of features). To solve for these - * features, we run a given number of iterations of ALS. This is done using a level of - * parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param seed random seed - * @param nonnegative whether to enforce nonnegativity - */ - def train( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - seed: Long, - nonnegative: Boolean - ): MatrixFactorizationModel = { - (new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed) - .setNonnegative(nonnegative).run(ratings)) - } - /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -724,32 +696,6 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings) } - /** - * Train a matrix factorization model given an RDD of ratings given by users to some products, - * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the - * product of two lower-rank matrices of a given rank (number of features). To solve for these - * features, we run a given number of iterations of ALS. This is done using a level of - * parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param nonnegative whether to enforce nonnegativity - */ - def train( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - nonnegative: Boolean - ): MatrixFactorizationModel = { - (new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0) - .setNonnegative(nonnegative).run(ratings)) - } - /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -810,36 +756,6 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings) } - /** - * Train a matrix factorization model given an RDD of 'implicit preferences' given by users - * to some products, in the form of (userID, productID, preference) pairs. We approximate the - * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). - * To solve for these features, we run a given number of iterations of ALS. This is done using - * a level of parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param alpha confidence parameter (only applies when immplicitPrefs = true) - * @param seed random seed - * @param nonnegative whether to enforce nonnegativity - */ - def trainImplicit( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - alpha: Double, - seed: Long, - nonnegative: Boolean - ): MatrixFactorizationModel = { - (new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed) - .setNonnegative(nonnegative).run(ratings)) - } - /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. We approximate the @@ -865,34 +781,6 @@ object ALS { new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings) } - /** - * Train a matrix factorization model given an RDD of 'implicit preferences' given by users - * to some products, in the form of (userID, productID, preference) pairs. We approximate the - * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). - * To solve for these features, we run a given number of iterations of ALS. This is done using - * a level of parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param alpha confidence parameter (only applies when immplicitPrefs = true) - * @param nonnegative whether to enforce nonnegativity - */ - def trainImplicit( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - alpha: Double, - nonnegative: Boolean - ): MatrixFactorizationModel = { - (new ALS(blocks, blocks, rank, iterations, lambda, true, alpha) - .setNonnegative(nonnegative).run(ratings)) - } - /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users to * some products, in the form of (userID, productID, preference) pairs. We approximate the diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index fea156c9a4fb4..e26b152e0cdfd 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -44,16 +44,16 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> ratings = sc.parallelize([r1, r2, r3]) - >>> model = ALS.trainImplicit(ratings, 1) + >>> model = ALS.trainImplicit(ratings, 1, seed=10) >>> model.predict(2,2) 0.4473... >>> testset = sc.parallelize([(1, 2), (1, 1)]) - >>> model = ALS.train(ratings, 1) + >>> model = ALS.train(ratings, 1, seed=10) >>> model.predictAll(testset).collect() [Rating(1, 1, 1), Rating(1, 2, 1)] - >>> model = ALS.train(ratings, 4) + >>> model = ALS.train(ratings, 4, seed=10) >>> model.userFeatures().collect() [(2, array('d', [...])), (1, array('d', [...]))] @@ -70,14 +70,6 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> len(latents) == 4 True - >>> model = ALS.train(ratings, 1, nonnegative=True) - >>> model.predict(2,2) - 3.735... - - >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True) - >>> model.predict(2,2) - 0.4473... - >>> model = ALS.train(ratings, 1, nonnegative=True, seed=10) >>> model.predict(2,2) 3.735...