From 7bc35f9ca6926e968ea9e497f54806eaef4116b8 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 31 Jul 2014 11:31:23 +0100 Subject: [PATCH 1/3] Add recommend methods to MatrixFactorizationModel --- .../MatrixFactorizationModel.scala | 38 +++++++++++- .../mllib/recommendation/JavaALSSuite.java | 58 +++++++++++++++---- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 899286d235a9d..3ac2c3addf737 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -65,6 +65,42 @@ class MatrixFactorizationModel private[mllib] ( } } + /** + * Recommends products to users. + * + * @param user the user to recommend products to + * @param howMany how many products to return. The number returned may be less than this. + * @return product ID and score tuples, sorted descending by score. The first product returned + * is the one predicted to be most strongly recommended to the user. The score is an opaque + * value that indicates how strongly recommended the product is. + */ + def recommendProducts(user: Int, howMany: Int = 10): Array[(Int,Double)] = + recommend(userFeatures.lookup(user).head, productFeatures, howMany) + + /** + * Recommends users to products. That is, this returns users who are most likely to be + * interested in a product. + * + * @param product the product to recommend users to + * @param howMany how many users to return. The number returned may be less than this. + * @return user ID and score tuples, sorted descending by score. The first user returned + * is the one predicted to be most strongly interested in the product. The score is an opaque + * value that indicates how strongly interested the user is. + */ + def recommendUsers(product: Int, howMany: Int = 10): Array[(Int,Double)] = + recommend(productFeatures.lookup(product).head, userFeatures, howMany) + + private def recommend( + recommendToFeatures: Array[Double], + recommendableFeatures: RDD[(Int,Array[Double])], + howMany: Int): Array[(Int,Double)] = { + val recommendToVector = new DoubleMatrix(recommendToFeatures) + val scored = recommendableFeatures.map { case (id,features) => + (id, recommendToVector.dot(new DoubleMatrix(features))) + } + scored.top(howMany)(Ordering.by(_._2)) + } + /** * :: DeveloperApi :: * Predict the rating of many users for many products. @@ -80,6 +116,4 @@ class MatrixFactorizationModel private[mllib] ( predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) } - // TODO: Figure out what other good bulk prediction methods would look like. - // Probably want a way to get the top users for a product or vice-versa. } diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index bf2365f82044c..9ebeda0574531 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -29,6 +29,8 @@ import org.apache.spark.api.java.JavaSparkContext; import org.jblas.DoubleMatrix; +import scala.Tuple2; +import scala.Tuple3; public class JavaALSSuite implements Serializable { private transient JavaSparkContext sc; @@ -44,21 +46,27 @@ public void tearDown() { sc = null; } - static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, - DoubleMatrix trueRatings, double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) { + static void validatePrediction(MatrixFactorizationModel model, + int users, + int products, + int features, + DoubleMatrix trueRatings, + double matchThreshold, + boolean implicitPrefs, + DoubleMatrix truePrefs) { DoubleMatrix predictedU = new DoubleMatrix(users, features); - List> userFeatures = model.userFeatures().toJavaRDD().collect(); + List> userFeatures = model.userFeatures().toJavaRDD().collect(); for (int i = 0; i < features; ++i) { - for (scala.Tuple2 userFeature : userFeatures) { + for (Tuple2 userFeature : userFeatures) { predictedU.put((Integer)userFeature._1(), i, userFeature._2()[i]); } } DoubleMatrix predictedP = new DoubleMatrix(products, features); - List> productFeatures = + List> productFeatures = model.productFeatures().toJavaRDD().collect(); for (int i = 0; i < features; ++i) { - for (scala.Tuple2 productFeature : productFeatures) { + for (Tuple2 productFeature : productFeatures) { predictedP.put((Integer)productFeature._1(), i, productFeature._2()[i]); } } @@ -75,7 +83,8 @@ static void validatePrediction(MatrixFactorizationModel model, int users, int pr } } } else { - // For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's implicit ALS tests) + // For implicit prefs we use the confidence-weighted RMSE to test + // (ref Mahout's implicit ALS tests) double sqErr = 0.0; double denom = 0.0; for (int u = 0; u < users; ++u) { @@ -100,7 +109,7 @@ public void runALSUsingStaticMethods() { int iterations = 15; int users = 50; int products = 100; - scala.Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, false, false); JavaRDD data = sc.parallelize(testData._1()); @@ -114,7 +123,7 @@ public void runALSUsingConstructor() { int iterations = 15; int users = 100; int products = 200; - scala.Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, false, false); JavaRDD data = sc.parallelize(testData._1()); @@ -131,7 +140,7 @@ public void runImplicitALSUsingStaticMethods() { int iterations = 15; int users = 80; int products = 160; - scala.Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, true, false); JavaRDD data = sc.parallelize(testData._1()); @@ -145,7 +154,7 @@ public void runImplicitALSUsingConstructor() { int iterations = 15; int users = 100; int products = 200; - scala.Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, true, false); JavaRDD data = sc.parallelize(testData._1()); @@ -163,7 +172,7 @@ public void runImplicitALSWithNegativeWeight() { int iterations = 15; int users = 80; int products = 160; - scala.Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, true, true); JavaRDD data = sc.parallelize(testData._1()); @@ -171,4 +180,29 @@ public void runImplicitALSWithNegativeWeight() { validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); } + @Test + public void runRecommend() { + int features = 5; + int iterations = 10; + int users = 200; + int products = 50; + Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true, false); + JavaRDD data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + validateRecommendations(model.recommendProducts(1, 10), 10); + validateRecommendations(model.recommendUsers(1, 20), 20); + } + + private static void validateRecommendations(Tuple2[] recommendations, int howMany) { + @SuppressWarnings("unchecked") + Tuple2[] javaRecs = (Tuple2[]) (Object[]) recommendations; + Assert.assertEquals(howMany, javaRecs.length); + for (int i = 1; i < javaRecs.length; i++) { + Assert.assertTrue(javaRecs[i-1]._2() > javaRecs[i]._2()); + } + // Pretty safe bet! + Assert.assertTrue(javaRecs[0]._2() > 0.7); + } + } From c9edb0413b0a7e7e219eec25676377262bc1001c Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 31 Jul 2014 18:10:06 +0100 Subject: [PATCH 2/3] Updates from code review --- .../MatrixFactorizationModel.scala | 24 +++++----- .../mllib/recommendation/JavaALSSuite.java | 44 +++++++++++-------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 3ac2c3addf737..3e6c892d75806 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -66,39 +66,41 @@ class MatrixFactorizationModel private[mllib] ( } /** - * Recommends products to users. + * Recommends products to a user. * * @param user the user to recommend products to - * @param howMany how many products to return. The number returned may be less than this. + * @param num how many products to return. The number returned may be less than this. * @return product ID and score tuples, sorted descending by score. The first product returned * is the one predicted to be most strongly recommended to the user. The score is an opaque * value that indicates how strongly recommended the product is. */ - def recommendProducts(user: Int, howMany: Int = 10): Array[(Int,Double)] = - recommend(userFeatures.lookup(user).head, productFeatures, howMany) + def recommendProducts(user: Int, num: Int): Array[Rating] = + recommend(userFeatures.lookup(user).head, productFeatures, num) + .map(t => Rating(user, t._1, t._2)) /** - * Recommends users to products. That is, this returns users who are most likely to be + * Recommends users to a product. That is, this returns users who are most likely to be * interested in a product. * * @param product the product to recommend users to - * @param howMany how many users to return. The number returned may be less than this. + * @param num how many users to return. The number returned may be less than this. * @return user ID and score tuples, sorted descending by score. The first user returned * is the one predicted to be most strongly interested in the product. The score is an opaque * value that indicates how strongly interested the user is. */ - def recommendUsers(product: Int, howMany: Int = 10): Array[(Int,Double)] = - recommend(productFeatures.lookup(product).head, userFeatures, howMany) + def recommendUsers(product: Int, num: Int): Array[Rating] = + recommend(productFeatures.lookup(product).head, userFeatures, num) + .map(t => Rating(t._1, product, t._2)) private def recommend( recommendToFeatures: Array[Double], - recommendableFeatures: RDD[(Int,Array[Double])], - howMany: Int): Array[(Int,Double)] = { + recommendableFeatures: RDD[(Int, Array[Double])], + num: Int): Array[(Int, Double)] = { val recommendToVector = new DoubleMatrix(recommendToFeatures) val scored = recommendableFeatures.map { case (id,features) => (id, recommendToVector.dot(new DoubleMatrix(features))) } - scored.top(howMany)(Ordering.by(_._2)) + scored.top(num)(Ordering.by(_._2)) } /** diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index 9ebeda0574531..71b1e4be7dff2 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -28,9 +28,9 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.jblas.DoubleMatrix; import scala.Tuple2; import scala.Tuple3; +import org.jblas.DoubleMatrix; public class JavaALSSuite implements Serializable { private transient JavaSparkContext sc; @@ -46,14 +46,15 @@ public void tearDown() { sc = null; } - static void validatePrediction(MatrixFactorizationModel model, - int users, - int products, - int features, - DoubleMatrix trueRatings, - double matchThreshold, - boolean implicitPrefs, - DoubleMatrix truePrefs) { + static void validatePrediction( + MatrixFactorizationModel model, + int users, + int products, + int features, + DoubleMatrix trueRatings, + double matchThreshold, + boolean implicitPrefs, + DoubleMatrix truePrefs) { DoubleMatrix predictedU = new DoubleMatrix(users, features); List> userFeatures = model.userFeatures().toJavaRDD().collect(); for (int i = 0; i < features; ++i) { @@ -176,7 +177,11 @@ public void runImplicitALSWithNegativeWeight() { users, products, features, 0.7, true, true); JavaRDD data = sc.parallelize(testData._1()); - MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .setImplicitPrefs(true) + .setSeed(8675309L) + .run(data.rdd()); validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); } @@ -189,20 +194,21 @@ public void runRecommend() { Tuple3, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( users, products, features, 0.7, true, false); JavaRDD data = sc.parallelize(testData._1()); - MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .setImplicitPrefs(true) + .setSeed(8675309L) + .run(data.rdd()); validateRecommendations(model.recommendProducts(1, 10), 10); validateRecommendations(model.recommendUsers(1, 20), 20); } - private static void validateRecommendations(Tuple2[] recommendations, int howMany) { - @SuppressWarnings("unchecked") - Tuple2[] javaRecs = (Tuple2[]) (Object[]) recommendations; - Assert.assertEquals(howMany, javaRecs.length); - for (int i = 1; i < javaRecs.length; i++) { - Assert.assertTrue(javaRecs[i-1]._2() > javaRecs[i]._2()); + private static void validateRecommendations(Rating[] recommendations, int howMany) { + Assert.assertEquals(howMany, recommendations.length); + for (int i = 1; i < recommendations.length; i++) { + Assert.assertTrue(recommendations[i-1].rating() >= recommendations[i].rating()); } - // Pretty safe bet! - Assert.assertTrue(javaRecs[0]._2() > 0.7); + Assert.assertTrue(recommendations[0].rating() > 0.7); } } From b3496756ce44626fe1e9a5fd5732aeea04bf4267 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 1 Aug 2014 12:14:02 +0100 Subject: [PATCH 3/3] Additional review changes --- .../MatrixFactorizationModel.scala | 16 ++++++---- .../mllib/recommendation/JavaALSSuite.java | 29 ++++++++++--------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 3e6c892d75806..a1a76fcbe9f9c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -70,9 +70,11 @@ class MatrixFactorizationModel private[mllib] ( * * @param user the user to recommend products to * @param num how many products to return. The number returned may be less than this. - * @return product ID and score tuples, sorted descending by score. The first product returned - * is the one predicted to be most strongly recommended to the user. The score is an opaque - * value that indicates how strongly recommended the product is. + * @return [[Rating]] objects, each of which contains the given user ID, a product ID, and a + * "score" in the rating field. Each represents one recommended product, and they are sorted + * by score, decreasing. The first returned is the one predicted to be most strongly + * recommended to the user. The score is an opaque value that indicates how strongly + * recommended the product is. */ def recommendProducts(user: Int, num: Int): Array[Rating] = recommend(userFeatures.lookup(user).head, productFeatures, num) @@ -84,9 +86,11 @@ class MatrixFactorizationModel private[mllib] ( * * @param product the product to recommend users to * @param num how many users to return. The number returned may be less than this. - * @return user ID and score tuples, sorted descending by score. The first user returned - * is the one predicted to be most strongly interested in the product. The score is an opaque - * value that indicates how strongly interested the user is. + * @return [[Rating]] objects, each of which contains a user ID, the given product ID, and a + * "score" in the rating field. Each represents one recommended user, and they are sorted + * by score, decreasing. The first returned is the one predicted to be most strongly + * recommended to the product. The score is an opaque value that indicates how strongly + * recommended the user is. */ def recommendUsers(product: Int, num: Int): Array[Rating] = recommend(productFeatures.lookup(product).head, userFeatures, num) diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index 71b1e4be7dff2..f6ca9643227f8 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -20,6 +20,11 @@ import java.io.Serializable; import java.util.List; +import scala.Tuple2; +import scala.Tuple3; + +import org.jblas.DoubleMatrix; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -28,10 +33,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import scala.Tuple2; -import scala.Tuple3; -import org.jblas.DoubleMatrix; - public class JavaALSSuite implements Serializable { private transient JavaSparkContext sc; @@ -130,8 +131,8 @@ public void runALSUsingConstructor() { JavaRDD data = sc.parallelize(testData._1()); MatrixFactorizationModel model = new ALS().setRank(features) - .setIterations(iterations) - .run(data.rdd()); + .setIterations(iterations) + .run(data.rdd()); validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3()); } @@ -178,10 +179,10 @@ public void runImplicitALSWithNegativeWeight() { JavaRDD data = sc.parallelize(testData._1()); MatrixFactorizationModel model = new ALS().setRank(features) - .setIterations(iterations) - .setImplicitPrefs(true) - .setSeed(8675309L) - .run(data.rdd()); + .setIterations(iterations) + .setImplicitPrefs(true) + .setSeed(8675309L) + .run(data.rdd()); validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); } @@ -195,10 +196,10 @@ public void runRecommend() { users, products, features, 0.7, true, false); JavaRDD data = sc.parallelize(testData._1()); MatrixFactorizationModel model = new ALS().setRank(features) - .setIterations(iterations) - .setImplicitPrefs(true) - .setSeed(8675309L) - .run(data.rdd()); + .setIterations(iterations) + .setImplicitPrefs(true) + .setSeed(8675309L) + .run(data.rdd()); validateRecommendations(model.recommendProducts(1, 10), 10); validateRecommendations(model.recommendUsers(1, 20), 20); }