From 860bc2ce5f290a042756d2569eb215eee6a1fdad Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 16 Mar 2017 09:07:06 +0200
Subject: [PATCH 01/12] wip

---
 .../apache/spark/ml/recommendation/ALS.scala  | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 3d5fd1794de23..4a3704a9a42e5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -344,6 +344,18 @@ class ALSModel private[ml] (
     recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
   }
 
+  /**
+   * Returns top `numItems` items recommended for each user in the input data set.
+   * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
+   * @param numItems max number of recommendations for each user.
+   * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
+   *         stored as an array of (itemCol: Int, rating: Float) Rows.
+   */
+  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
+    val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+  }
+
   /**
    * Returns top `numUsers` users recommended for each item, for all items.
    * @param numUsers max number of recommendations for each item
@@ -355,6 +367,36 @@ class ALSModel private[ml] (
     recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
   }
 
+  /**
+   * Returns top `numUsers` users recommended for each item in the input data set.
+   * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
+   * @param numUsers max number of recommendations for each item.
+   * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
+   *         stored as an array of (userCol: Int, rating: Float) Rows.
+   */
+  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
+    val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+  }
+
+  /**
+   * Returns a subset of a factor DataFrame limited to only those ids contained
+   * in the input dataset.
+   * @param dataset input Dataset containing id column to use to filter factors.
+   * @param factors factor DataFrame to filter.
+   * @param column column name containing the ids in the input dataset.
+   * @return DataFrame containing factors only for those ids present in both the input dataset and
+   *         the factor DataFrame.
+   */
+  private def getSourceFactorSubset(
+    dataset: Dataset[_],
+    factors: DataFrame,
+    column: String): DataFrame = {
+    dataset.select(column)
+      .join(factors, dataset(column) === factors("id"))
+      .select(factors("id"), factors("features"))
+  }
+
   /**
    * Makes recommendations for all users (or items).
    *
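
The intended call pattern for the new subset methods, as a minimal sketch against a fitted ALSModel (the `model` and `ratings` names and the "user" column are illustrative assumptions, not part of the patch):

    // Recommend for a handful of users instead of all users.
    // Assumes the fitted model's userCol is "user".
    val someUsers = ratings.select("user").distinct().limit(5)
    val topThree = model.recommendForUserSubset(someUsers, 3)
    // Result schema: user: Int,
    //                recommendations: array<struct<item: Int, rating: Float>>
    topThree.show(truncate = false)
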
From 8cd9edd5e2440da15f677828cda5207e6a40be31 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 4 May 2017 09:40:22 +0200
Subject: [PATCH 02/12] further wip

---
 .../org/apache/spark/ml/recommendation/ALS.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 4a3704a9a42e5..0ae318a09def0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -345,12 +345,13 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns top `numItems` items recommended for each user in the input data set.
+   * Returns top `numItems` items recommended for each unique user id in the input data set.
    * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
    * @param numItems max number of recommendations for each user.
    * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
    *         stored as an array of (itemCol: Int, rating: Float) Rows.
    */
+  @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
     recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
@@ -368,12 +369,13 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns top `numUsers` users recommended for each item in the input data set.
+   * Returns top `numUsers` users recommended for each unique item id in the input data set.
    * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
    * @param numUsers max number of recommendations for each item.
    * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
    *         stored as an array of (userCol: Int, rating: Float) Rows.
    */
+  @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
     recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
@@ -389,9 +391,9 @@ class ALSModel private[ml] (
    *         the factor DataFrame.
    */
   private def getSourceFactorSubset(
-    dataset: Dataset[_],
-    factors: DataFrame,
-    column: String): DataFrame = {
+      dataset: Dataset[_],
+      factors: DataFrame,
+      column: String): DataFrame = {
     dataset.select(column)
       .join(factors, dataset(column) === factors("id"))
       .select(factors("id"), factors("features"))

From 76fb332aa5e8483590ebb4305901f5c3e5c73c15 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 11:19:38 +0200
Subject: [PATCH 03/12] Update doc

---
 .../scala/org/apache/spark/ml/recommendation/ALS.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 0ae318a09def0..41e21c488ff8a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -345,7 +345,9 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns top `numItems` items recommended for each unique user id in the input data set.
+   * Returns top `numItems` items recommended for each user id in the input data set. Note that if
+   * there are duplicate ids in the input dataset, duplicate recommendations will be returned.
+   * The caller is responsible for de-duplicating input ids.
    * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
    * @param numItems max number of recommendations for each user.
    * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
@@ -371,7 +371,9 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns top `numUsers` users recommended for each unique item id in the input data set.
+   * Returns top `numUsers` users recommended for each unique item id in the input data set. Note
+   * that if there are duplicate ids in the input dataset, duplicate recommendations will be
+   * returned. The caller is responsible for de-duplicating input ids.
    * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
    * @param numUsers max number of recommendations for each item.
    * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are

From 6539d294c5dac499d106f7346f496dac8fee24e8 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 11:20:55 +0200
Subject: [PATCH 04/12] Update doc

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 41e21c488ff8a..9467299d05c32 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -371,9 +371,9 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns top `numUsers` users recommended for each unique item id in the input data set. Note
-   * that if there are duplicate ids in the input dataset, duplicate recommendations will be
-   * returned. The caller is responsible for de-duplicating input ids.
+   * Returns top `numUsers` users recommended for each item id in the input data set. Note that if
+   * there are duplicate ids in the input dataset, duplicate recommendations will be returned.
+   * The caller is responsible for de-duplicating input ids.
    * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
    * @param numUsers max number of recommendations for each item.
    * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are

From c723dff8a9f125ce4d69574f47c74aaf0df7a9da Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 11:23:20 +0200
Subject: [PATCH 05/12] Update doc

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 9467299d05c32..2b2171b2070bb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -387,7 +387,8 @@ class ALSModel private[ml] (
 
   /**
    * Returns a subset of a factor DataFrame limited to only those ids contained
-   * in the input dataset.
+   * in the input dataset. The caller is responsible for removing any duplicate ids
+   * from the dataset.
    * @param dataset input Dataset containing id column to use to filter factors.
    * @param factors factor DataFrame to filter.
    * @param column column name containing the ids in the input dataset.
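
At this point in the series the contract leaves de-duplication to the caller. A minimal caller-side sketch under that contract (names hypothetical; assumes a fitted ALSModel named `model` and `spark.implicits._` in scope):

    // De-duplicate ids before calling the subset API, as the docs above require.
    import spark.implicits._
    val requestedUsers = Seq(0, 1, 0, 1).toDF("user")
    val recs = model.recommendForUserSubset(requestedUsers.distinct(), 10)
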
From 0004d1c9ea5074965d234fa7833450de3ffa871b Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Wed, 10 May 2017 11:15:35 +0200
Subject: [PATCH 06/12] wip on tests

---
 .../spark/ml/recommendation/ALSSuite.scala    | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 45d3f9b4c53be..f1b38012a1ec4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -738,6 +738,28 @@ class ALSSuite
     }
   }
 
+  test("recommendForUserSubset") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val numItems = model.itemFactors.count
+    val expected = Map(
+      0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+      2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+    )
+    val userSubset = expected.keys.toSeq.toDF("user")
+    val numUsersSubset = userSubset.count
+
+    Seq(2, 4, 6).foreach { k =>
+      val n = math.min(k, numItems).toInt
+      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val topItems = model.recommendForUserSubset(userSubset, k)
+      assert(topItems.count() == numUsersSubset)
+      assert(topItems.columns.contains("user"))
+      checkRecommendations(topItems, expectedUpToN, "item")
+    }
+  }
+
   test("recommendForAllItems with k <, = and > num_users") {
     val model = getALSModel
     val numUsers = model.userFactors.count

From 53229a1abc860aa8fb3c0d933fdbcef4d47f0508 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Fri, 12 May 2017 12:42:23 +0200
Subject: [PATCH 07/12] Clean up docs and further tests

---
 .../apache/spark/ml/recommendation/ALS.scala  |  8 +-
 .../spark/ml/recommendation/ALSSuite.scala    | 85 +++++++++++++++----
 2 files changed, 73 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2b2171b2070bb..8dc81e3da1c07 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -346,8 +346,8 @@ class ALSModel private[ml] (
 
   /**
    * Returns top `numItems` items recommended for each user id in the input data set. Note that if
-   * there are duplicate ids in the input dataset, duplicate recommendations will be returned.
-   * The caller is responsible for de-duplicating input ids.
+   * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+   * will be returned.
    * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
    * @param numItems max number of recommendations for each user.
    * @return a DataFrame of (userCol: Int, recommendations), where recommendations are
@@ -372,8 +372,8 @@ class ALSModel private[ml] (
 
   /**
    * Returns top `numUsers` users recommended for each item id in the input data set. Note that if
-   * there are duplicate ids in the input dataset, duplicate recommendations will be returned.
-   * The caller is responsible for de-duplicating input ids.
+   * there are duplicate ids in the input dataset, only one set of recommendations per unique id
+   * will be returned.
    * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
    * @param numUsers max number of recommendations for each item.
    * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index f1b38012a1ec4..ab88fb23d6245 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -723,9 +723,9 @@ class ALSSuite
     val numUsers = model.userFactors.count
     val numItems = model.itemFactors.count
     val expected = Map(
-      0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
-      1 -> Array((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
-      2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+      0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+      1 -> Seq((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
+      2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
     )
 
     Seq(2, 4, 6).foreach { k =>
@@ -738,14 +738,35 @@ class ALSSuite
     }
   }
 
-  test("recommendForUserSubset") {
+  test("recommendForAllItems with k <, = and > num_users") {
+    val model = getALSModel
+    val numUsers = model.userFactors.count
+    val numItems = model.itemFactors.count
+    val expected = Map(
+      3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+      4 -> Seq((0, 44f), (2, 30f), (1, 26f)),
+      5 -> Seq((2, 45f), (0, 42f), (1, 33f)),
+      6 -> Seq((0, 28f), (2, 18f), (1, 16f))
+    )
+
+    Seq(2, 3, 4).foreach { k =>
+      val n = math.min(k, numUsers).toInt
+      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val topUsers = getALSModel.recommendForAllItems(k)
+      assert(topUsers.count() == numItems)
+      assert(topUsers.columns.contains("item"))
+      checkRecommendations(topUsers, expectedUpToN, "user")
+    }
+  }
+
+  test("recommendForUserSubset with k <, = and > num_items") {
     val spark = this.spark
     import spark.implicits._
     val model = getALSModel
     val numItems = model.itemFactors.count
     val expected = Map(
-      0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
-      2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
+      0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
+      2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
     )
     val userSubset = expected.keys.toSeq.toDF("user")
     val numUsersSubset = userSubset.count
@@ -760,30 +781,62 @@ class ALSSuite
     }
   }
 
-  test("recommendForAllItems with k <, = and > num_users") {
+  test("recommendForItemSubset with k <, = and > num_users") {
+    val spark = this.spark
+    import spark.implicits._
     val model = getALSModel
     val numUsers = model.userFactors.count
-    val numItems = model.itemFactors.count
     val expected = Map(
-      3 -> Array((0, 54f), (2, 51f), (1, 39f)),
-      4 -> Array((0, 44f), (2, 30f), (1, 26f)),
-      5 -> Array((2, 45f), (0, 42f), (1, 33f)),
-      6 -> Array((0, 28f), (2, 18f), (1, 16f))
+      3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
+      6 -> Seq((0, 28f), (2, 18f), (1, 16f))
    )
+    val itemSubset = expected.keys.toSeq.toDF("item")
+    val numItemsSubset = itemSubset.count
 
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numUsers).toInt
       val expectedUpToN = expected.mapValues(_.slice(0, n))
-      val topUsers = getALSModel.recommendForAllItems(k)
-      assert(topUsers.count() == numItems)
+      val topUsers = model.recommendForItemSubset(itemSubset, k)
+      assert(topUsers.count() == numItemsSubset)
       assert(topUsers.columns.contains("item"))
       checkRecommendations(topUsers, expectedUpToN, "user")
     }
   }
+
+  test("subset recommendations eliminate duplicate ids") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+
+    val k = 2
+    val dupUsers = Seq(0, 1, 0, 1).toDF("user")
+    val userRecs = model.recommendForUserSubset(dupUsers, k)
+    assert(userRecs.count == 2)
+    val dupItems = Seq(3, 4, 5, 4, 5).toDF("item")
+    val itemRecs = model.recommendForItemSubset(dupItems, k)
+    assert(itemRecs.count == 3)
+  }
+
+  test("subset recommendations on full input dataset equivalent to recommendForAll methods") {
+    val spark = this.spark
+    import spark.implicits._
+    val model = getALSModel
+    val k = 2
+
+    val userSubset = model.userFactors.withColumnRenamed("id", "user").drop("features")
+    val userSubsetRecs = model.recommendForUserSubset(userSubset, k)
+    val allUserRecs = model.recommendForAllUsers(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+    checkRecommendations(userSubsetRecs, allUserRecs, "item")
+
+    val itemSubset = model.itemFactors.withColumnRenamed("id", "item").drop("features")
+    val itemSubsetRecs = model.recommendForItemSubset(itemSubset, k)
+    val allItemRecs = model.recommendForAllItems(k).as[(Int, Seq[(Int, Float)])].collect().toMap
+    checkRecommendations(itemSubsetRecs, allItemRecs, "user")
+  }
 
   private def checkRecommendations(
       topK: DataFrame,
-      expected: Map[Int, Array[(Int, Float)]],
+      expected: Map[Int, Seq[(Int, Float)]],
      dstColName: String): Unit = {
     val spark = this.spark
     import spark.implicits._
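
The duplicate-id test above can pass even though getSourceFactorSubset still uses a plain inner join: each occurrence of an id in the input matches the single factor row for that id, so the join side is duplicated, but the top-k aggregation inside recommendForAll groups by source id, collapsing the result back to one row per unique id. The duplicates only cost wasted score computation, which the next patch addresses. A self-contained sketch of the join-side duplication (hypothetical frames, not the suite's fixtures):

    import spark.implicits._
    val factors = Seq((0, Array(1.0f)), (1, Array(2.0f))).toDF("id", "features")
    val ids = Seq(0, 0, 1).toDF("user")
    // Inner join: id 0 appears twice in `ids`, so it yields two output rows.
    val joined = ids.join(factors, ids("user") === factors("id"))
    assert(joined.count() == 3)
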
From 5a8c4216ce636dea3ba67baa9b169db7486f37f2 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 27 Jul 2017 10:28:11 +0200
Subject: [PATCH 08/12] Explicitly handle duplicate ids with distinct. Update tests

---
 .../apache/spark/ml/recommendation/ALS.scala  |  6 ++---
 .../spark/ml/recommendation/ALSSuite.scala    | 23 +++++++++++++------
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 8dc81e3da1c07..a8e3bb1147013 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -386,9 +386,8 @@ class ALSModel private[ml] (
   }
 
   /**
-   * Returns a subset of a factor DataFrame limited to only those ids contained
-   * in the input dataset. The caller is responsible for removing any duplicate ids
-   * from the dataset.
+   * Returns a subset of a factor DataFrame limited to only those unique ids contained
+   * in the input dataset.
    * @param dataset input Dataset containing id column to use to filter factors.
    * @param factors factor DataFrame to filter.
    * @param column column name containing the ids in the input dataset.
@@ -400,6 +399,7 @@ class ALSModel private[ml] (
       dataset: Dataset[_],
       factors: DataFrame,
       column: String): DataFrame = {
     dataset.select(column)
+      .distinct()
       .join(factors, dataset(column) === factors("id"))
       .select(factors("id"), factors("features"))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ab88fb23d6245..b107e56c1421c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -803,21 +803,30 @@ class ALSSuite
     }
   }
 
-  test("subset recommendations eliminate duplicate ids") {
+  test("subset recommendations eliminate duplicate ids, returns same results as unique ids") {
     val spark = this.spark
     import spark.implicits._
     val model = getALSModel
-
     val k = 2
+
+    val users = Seq(0, 1).toDF("user")
     val dupUsers = Seq(0, 1, 0, 1).toDF("user")
-    val userRecs = model.recommendForUserSubset(dupUsers, k)
-    assert(userRecs.count == 2)
+    val singleUserRecs = model.recommendForUserSubset(users, k)
+    val dupUserRecs = model.recommendForUserSubset(dupUsers, k)
+      .as[(Int, Seq[(Int, Float)])].collect().toMap
+    assert(singleUserRecs.count == dupUserRecs.size)
+    checkRecommendations(singleUserRecs, dupUserRecs, "item")
+
+    val items = Seq(3, 4, 5).toDF("item")
     val dupItems = Seq(3, 4, 5, 4, 5).toDF("item")
-    val itemRecs = model.recommendForItemSubset(dupItems, k)
-    assert(itemRecs.count == 3)
+    val singleItemRecs = model.recommendForItemSubset(items, k)
+    val dupItemRecs = model.recommendForItemSubset(dupItems, k)
+      .as[(Int, Seq[(Int, Float)])].collect().toMap
+    assert(singleItemRecs.count == dupItemRecs.size)
+    checkRecommendations(singleItemRecs, dupItemRecs, "user")
   }
 
-  test("subset recommendations on full input dataset equivalent to recommendForAll methods") {
+  test("subset recommendations on full input dataset equivalent to recommendForAll") {
     val spark = this.spark
     import spark.implicits._
     val model = getALSModel

From 4bd91f12e1b15657d92fea6d7b91dae2e6e68c29 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 27 Jul 2017 11:06:11 +0200
Subject: [PATCH 09/12] small fix for k to match other item test

---
 .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index b107e56c1421c..27091d70c6ea9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -793,7 +793,7 @@ class ALSSuite
     val itemSubset = expected.keys.toSeq.toDF("item")
     val numItemsSubset = itemSubset.count
 
-    Seq(2, 4, 6).foreach { k =>
+    Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
       val expectedUpToN = expected.mapValues(_.slice(0, n))
       val topUsers = model.recommendForItemSubset(itemSubset, k)

From 8ed91ab283ccaa0b47ebe8467acc186aeca20c54 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Fri, 15 Sep 2017 16:39:27 +0200
Subject: [PATCH 10/12] Use left semi-join instead of distinct and join

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a8e3bb1147013..a8843661c873b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -398,9 +398,8 @@ class ALSModel private[ml] (
       dataset: Dataset[_],
       factors: DataFrame,
       column: String): DataFrame = {
-    dataset.select(column)
-      .distinct()
-      .join(factors, dataset(column) === factors("id"))
+    factors
+      .join(dataset.select(column), factors("id") === dataset(column), joinType = "left_semi")
       .select(factors("id"), factors("features"))
   }
 
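
A left semi join returns each row of the left side at most once when one or more matches exist on the right, so it subsumes the previous distinct-then-join in a single operation and can never duplicate factor rows. A standalone sketch of the semantics (hypothetical frames):

    import spark.implicits._
    val factors = Seq((0, Array(1.0f)), (1, Array(2.0f)), (2, Array(3.0f))).toDF("id", "features")
    val ids = Seq(0, 0, 2).toDF("user")
    // Semi join keeps only left-side columns and emits ids 0 and 2 once each,
    // even though 0 appears twice on the right.
    val subset = factors.join(ids, factors("id") === ids("user"), "left_semi")
    assert(subset.count() == 2)
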
From f6cd85419d6705c150b624ef46fe33d29e158d1b Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 3 Oct 2017 10:08:04 +0200
Subject: [PATCH 11/12] Add subset recs to ALS examples

---
 .../org/apache/spark/examples/ml/JavaALSExample.java    | 9 +++++++++
 examples/src/main/python/ml/als_example.py              | 9 +++++++++
 .../scala/org/apache/spark/examples/ml/ALSExample.scala | 9 +++++++++
 3 files changed, 27 insertions(+)

diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index fe4d6bc83f04a..27052be87b82e 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -118,9 +118,18 @@ public static void main(String[] args) {
     Dataset<Row> userRecs = model.recommendForAllUsers(10);
     // Generate top 10 user recommendations for each movie
     Dataset<Row> movieRecs = model.recommendForAllItems(10);
+
+    // Generate top 10 movie recommendations for a specified set of users
+    Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
+    Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
+    // Generate top 10 user recommendations for a specified set of movies
+    Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
+    Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
     // $example off$
     userRecs.show();
     movieRecs.show();
+    userSubsetRecs.show();
+    movieSubSetRecs.show();
 
     spark.stop();
   }
diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py
index 1672d552eb1d5..8b7ec9c439f9f 100644
--- a/examples/src/main/python/ml/als_example.py
+++ b/examples/src/main/python/ml/als_example.py
@@ -60,8 +60,17 @@
     userRecs = model.recommendForAllUsers(10)
     # Generate top 10 user recommendations for each movie
     movieRecs = model.recommendForAllItems(10)
+
+    # Generate top 10 movie recommendations for a specified set of users
+    users = ratings.select(als.getUserCol()).distinct().limit(3)
+    userSubsetRecs = model.recommendForUserSubset(users, 10)
+    # Generate top 10 user recommendations for a specified set of movies
+    movies = ratings.select(als.getItemCol()).distinct().limit(3)
+    movieSubSetRecs = model.recommendForItemSubset(movies, 10)
     # $example off$
     userRecs.show()
     movieRecs.show()
+    userSubsetRecs.show()
+    movieSubSetRecs.show()
 
     spark.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index 07b15dfa178f7..8091838a2301e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -80,9 +80,18 @@ object ALSExample {
     val userRecs = model.recommendForAllUsers(10)
     // Generate top 10 user recommendations for each movie
     val movieRecs = model.recommendForAllItems(10)
+
+    // Generate top 10 movie recommendations for a specified set of users
+    val users = ratings.select(als.getUserCol).distinct().limit(3)
+    val userSubsetRecs = model.recommendForUserSubset(users, 10)
+    // Generate top 10 user recommendations for a specified set of movies
+    val movies = ratings.select(als.getItemCol).distinct().limit(3)
+    val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
     // $example off$
     userRecs.show()
     movieRecs.show()
+    userSubsetRecs.show()
+    movieSubSetRecs.show()
 
     spark.stop()
  }
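
The examples pick an arbitrary subset with distinct().limit(3); to recommend for specific known ids instead, the subset DataFrame can be built directly. A sketch (the ids and the "userId" column name are assumptions matching the MovieLens example data):

    import spark.implicits._
    val users = Seq(26, 29, 12).toDF("userId")  // column name must match als.getUserCol
    val userSubsetRecs = model.recommendForUserSubset(users, 10)
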
From 526675d009a0f800d62e0e0334e87fef15bdd86c Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Wed, 4 Oct 2017 20:58:47 +0200
Subject: [PATCH 12/12] Add PySpark API for subset recs

---
 python/pyspark/ml/recommendation.py | 38 +++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index bcfb36880eb02..e8bcbe4cd34cb 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -90,6 +90,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
     >>> item_recs.where(item_recs.item == 2)\
             .select("recommendations.user", "recommendations.rating").collect()
     [Row(user=[2, 1, 0], rating=[4.901..., 3.981..., -0.138...])]
+    >>> user_subset = df.where(df.user == 2)
+    >>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
+    >>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
+    Row(item=[2, 1, 0], rating=[4.901..., 1.056..., -1.501...])
+    >>> item_subset = df.where(df.item == 0)
+    >>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
+    >>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
+    Row(user=[0, 1, 2], rating=[3.910..., 2.625..., -1.501...])
     >>> als_path = temp_path + "/als"
     >>> als.save(als_path)
     >>> als2 = ALS.load(als_path)
@@ -414,6 +422,36 @@ def recommendForAllItems(self, numUsers):
         """
         return self._call_java("recommendForAllItems", numUsers)
 
+    @since("2.3.0")
+    def recommendForUserSubset(self, dataset, numItems):
+        """
+        Returns top `numItems` items recommended for each user id in the input data set. Note that
+        if there are duplicate ids in the input dataset, only one set of recommendations per unique
+        id will be returned.
+
+        :param dataset: a Dataset containing a column of user ids. The column name must match
+                        `userCol`.
+        :param numItems: max number of recommendations for each user
+        :return: a DataFrame of (userCol, recommendations), where recommendations are
+                 stored as an array of (itemCol, rating) Rows.
+        """
+        return self._call_java("recommendForUserSubset", dataset, numItems)
+
+    @since("2.3.0")
+    def recommendForItemSubset(self, dataset, numUsers):
+        """
+        Returns top `numUsers` users recommended for each item id in the input data set. Note that
+        if there are duplicate ids in the input dataset, only one set of recommendations per unique
+        id will be returned.
+
+        :param dataset: a Dataset containing a column of item ids. The column name must match
+                        `itemCol`.
+        :param numUsers: max number of recommendations for each item
+        :return: a DataFrame of (itemCol, recommendations), where recommendations are
+                 stored as an array of (userCol, rating) Rows.
+        """
+        return self._call_java("recommendForItemSubset", dataset, numUsers)
+
 if __name__ == "__main__":
     import doctest
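
For completeness, a sketch of flattening the nested result schema shared by all four recommend methods, since downstream code usually wants one (id, item, rating) row per recommendation (names assume userCol "user" and itemCol "item" and a result DataFrame named `userSubsetRecs`):

    import org.apache.spark.sql.functions.{col, explode}
    val flat = userSubsetRecs
      .select(col("user"), explode(col("recommendations")).as("rec"))
      .select(col("user"), col("rec.item"), col("rec.rating"))
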