Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3066][MLLIB] Support recommendAll in matrix factorization model #5829

Closed
wants to merge 18 commits into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -42,13 +42,11 @@ class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Se
self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
seqOp = (queue, item) => {
queue += item
queue
},
combOp = (queue1, queue2) => {
queue1 ++= queue2
queue1
}
).mapValues(_.toArray.sorted(ord.reverse))
).mapValues(_.toArray.reverse) // This is a min-heap, so we reverse the order.
}
}

@@ -20,14 +20,18 @@ package org.apache.spark.mllib.recommendation
import java.io.IOException
import java.lang.{Integer => JavaInteger}

import scala.collection.mutable

import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
@@ -57,7 +61,7 @@ class MatrixFactorizationModel(

/** Validates factors and warns users if there are performance concerns. */
private def validateFeatures(name: String, features: RDD[(Int, Array[Double])]): Unit = {
require(features.first()._2.size == rank,
require(features.first()._2.length == rank,
s"$name feature dimension does not match the rank $rank.")
if (features.partitioner.isEmpty) {
logWarning(s"$name factor does not have a partitioner. "
@@ -72,19 +76,19 @@ class MatrixFactorizationModel(
def predict(user: Int, product: Int): Double = {
  // lookup returns every value stored under the key; only the first one is used, and
  // .head throws if the id has no entry in the corresponding factor RDD.
  val userVector = userFeatures.lookup(user).head
  val productVector = productFeatures.lookup(product).head
  // The predicted rating is the dot product of the two factor vectors, taken over `rank`
  // elements (validateFeatures requires factor vectors to have length `rank`).
  blas.ddot(rank, userVector, 1, productVector, 1)
}

/**
 * Predict the rating of many users for many products.
 * The output RDD has one element for each element in the input RDD (including all duplicates)
 * unless a user or product is missing in the training set.
 *
 * @param usersProducts RDD of (user, product) pairs.
 * @return RDD of Ratings.
 */
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
val users = userFeatures.join(usersProducts).map{
val users = userFeatures.join(usersProducts).map {
case (user, (uFeatures, product)) => (product, (user, uFeatures))
}
users.join(productFeatures).map {
@@ -112,7 +116,7 @@ class MatrixFactorizationModel(
* recommended the product is.
*/
def recommendProducts(user: Int, num: Int): Array[Rating] = {
  // .head throws if `user` has no entry in userFeatures.
  val userVector = userFeatures.lookup(user).head
  // `recommend` is a private helper on the companion object, so the call is qualified.
  // It yields (productId, score) pairs, which are tagged here with the requesting user.
  MatrixFactorizationModel.recommend(userVector, productFeatures, num)
    .map { case (product, score) => Rating(user, product, score) }
}

/**
@@ -128,7 +132,7 @@ class MatrixFactorizationModel(
* recommended the user is.
*/
def recommendUsers(product: Int, num: Int): Array[Rating] = {
  // .head throws if `product` has no entry in productFeatures.
  val productVector = productFeatures.lookup(product).head
  // `recommend` is a private helper on the companion object, so the call is qualified.
  // It yields (userId, score) pairs, which are tagged here with the requested product.
  MatrixFactorizationModel.recommend(productVector, userFeatures, num)
    .map { case (user, score) => Rating(user, product, score) }
}

protected override val formatVersion: String = "1.0"
@@ -137,20 +141,113 @@ class MatrixFactorizationModel(
MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
}

/**
 * Recommends the top products for every user in the model.
 *
 * @param num how many products to return for every user.
 * @return an RDD of (userId, ratings) pairs. Each element of `ratings` carries that same
 *         userId, a recommended productId, and a "score" in the rating field. The score has
 *         the same semantics as in the recommendProducts API.
 */
def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
  val topProductsPerUser =
    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num)
  topProductsPerUser.map { entry =>
    val userId = entry._1
    // Wrap each (productId, score) pair in a Rating owned by this user.
    val asRatings = entry._2.map(scored => Rating(userId, scored._1, scored._2))
    (userId, asRatings)
  }
}


/**
 * Recommends the top users for every product in the model.
 *
 * @param num how many users to return for every product.
 * @return an RDD of (productId, ratings) pairs. Each element of `ratings` carries a
 *         recommended userId, that same productId, and a "score" in the rating field. The
 *         score has the same semantics as in the recommendUsers API.
 */
def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
  val topUsersPerProduct =
    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num)
  topUsersPerProduct.map { entry =>
    val productId = entry._1
    // Wrap each (userId, score) pair in a Rating for this product.
    val asRatings = entry._2.map(scored => Rating(scored._1, productId, scored._2))
    (productId, asRatings)
  }
}
}

object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

import org.apache.spark.mllib.util.Loader._

/**
 * Makes recommendations for a single user (or product).
 *
 * @param recommendToFeatures factor vector of the user (or product) receiving recommendations
 * @param recommendableFeatures (id, factor vector) pairs of the candidates to score
 * @param num maximum number of recommendations to return
 * @return up to `num` (id, score) pairs selected by highest score, where the score is the dot
 *         product of the candidate's factor vector with `recommendToFeatures`
 */
private def recommend(
    recommendToFeatures: Array[Double],
    recommendableFeatures: RDD[(Int, Array[Double])],
    num: Int): Array[(Int, Double)] = {
  val scored = recommendableFeatures.map { case (id, features) =>
    (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
  }
  // Rank by the score component only; ids do not take part in the ordering.
  scored.top(num)(Ordering.by(_._2))
}
}

object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
/**
 * Makes recommendations for all users (or products).
 *
 * Both feature sets are packed into blocks with `blockify`, and every source block is paired
 * with every destination block via `cartesian` so that each pair can be scored together.
 *
 * @param rank rank (length of every factor vector)
 * @param srcFeatures src features to receive recommendations
 * @param dstFeatures dst features used to make recommendations
 * @param num number of recommendations for each record
 * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
 *         of (dstId, rating) pairs.
 */
private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
// Group ids and factors into (ids, rank x count matrix) blocks.
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
// Score every (src block, dst block) combination and flatten to (srcId, (dstId, rating)).
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {

This comment has been minimized.

Copy link
@debasish83

debasish83 May 1, 2015

Normally the item side is skinny (~1M items) and ranks are low (~50), so 1M x 50 bytes ~ 50 MB; with 8M products, it's 400 MB. I still think that cartesian will be slower than the version I added in terms of runtime. Did you run any benchmark against the old code?

This comment has been minimized.

Copy link
@mengxr

mengxr May 1, 2015

Author Contributor

That depends on the data. It is also common to have a near-square rating matrix. This should provide similar performance as long as the number of items/products is not super small, but I didn't test the performance. The advantage is that this approach doesn't touch the driver, so it could be more scalable.

This comment has been minimized.

Copy link
@debasish83

debasish83 May 1, 2015

I also like it better, as it should scale fine assuming the cartesian keys are under control, say to 100M x 10M with 400 factors.

// Each side of the pair is one block: an id array plus a matrix holding those ids' factors.
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
// With rank x m and rank x n factor blocks, this product is m x n: entry (i, j) is the
// dot product of source i's factors with destination j's factors.
val ratings = srcFactors.transpose.multiply(dstFactors)
// One output slot per (src, dst) combination in this pair of blocks.
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
// NOTE(review): output is sized m * n, so this presumes foreachActive visits every entry
// of the product matrix exactly once -- confirm for the matrix type returned by multiply.
ratings.foreachActive { (i, j, r) =>
// Translate block-local row/column indices back to the real src/dst ids.
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
// Per source id, keep `num` (dstId, rating) pairs, ordered by the rating component.
ratings.topByKey(num)(Ordering.by(_._2))
}

import org.apache.spark.mllib.util.Loader._
/**
 * Blockifies features to use Level-3 BLAS.
 *
 * Within each partition, consecutive (id, factor) pairs are grouped into chunks of at most
 * `blockSize` entries. Every chunk becomes one (ids, matrix) pair: the ids in encounter
 * order, and a DenseMatrix with `rank` rows and one column per id, built from the factor
 * arrays concatenated in that same order.
 *
 * @param rank number of rows of every block matrix
 * @param features (id, factor array) pairs to pack
 * @return an RDD of (ids, factor matrix) blocks
 */
private def blockify(
    rank: Int,
    features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
  val blockSize = 4096 // TODO: tune the block size
  val blockStorage = rank * blockSize
  features.mapPartitions { partition =>
    partition.grouped(blockSize).map { chunk =>
      val idBuilder = mutable.ArrayBuilder.make[Int]
      idBuilder.sizeHint(blockSize)
      val factorBuilder = mutable.ArrayBuilder.make[Double]
      factorBuilder.sizeHint(blockStorage)
      chunk.foreach { pair =>
        idBuilder += pair._1
        factorBuilder ++= pair._2
      }
      val blockIds = idBuilder.result()
      // The column count is the number of ids actually collected; the last chunk of a
      // partition may hold fewer than blockSize entries.
      (blockIds, new DenseMatrix(rank, blockIds.length, factorBuilder.result()))
    }
  }
}

override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
val (loadedClassName, formatVersion, _) = loadMetadata(sc, path)
@@ -214,4 +311,5 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
new Path(dataPath(path), "product").toUri.toString
}
}

}
@@ -24,13 +24,13 @@ import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

/** Tests for the pair-RDD helpers brought in by MLPairRDDFunctions. */
class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
  test("topByKey") {
    // Key 5 has a single value, so this also covers keys with fewer than `num` entries.
    val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (5, 1), (3, 5)), 2)
      .topByKey(2)
      .collectAsMap()

    assert(topMap.size === 3)
    // Values per key are expected largest first.
    assert(topMap(1) === Array(2, 1))
    assert(topMap(3) === Array(7, 5))
    assert(topMap(5) === Array(1))
  }
}
@@ -72,4 +72,24 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
Utils.deleteRecursively(tempDir)
}
}

test("batch predict API recommendProductsForUsers") {
  val numRecommendations = 10
  val mfModel = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
  val topProductsByUser = mfModel.recommendProductsForUsers(numRecommendations).collectAsMap()

  // Check the best-scoring recommendation for users 0 and 1.
  val bestForUser0 = topProductsByUser(0)(0)
  val bestForUser1 = topProductsByUser(1)(0)
  assert(bestForUser0.rating ~== 17.0 relTol 1e-14)
  assert(bestForUser1.rating ~== 39.0 relTol 1e-14)
}

test("batch predict API recommendUsersForProducts") {
  val numRecommendations = 10
  val mfModel = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
  val topUsersByProduct = mfModel.recommendUsersForProducts(numRecommendations).collectAsMap()

  // For product 2, user 1 is expected to rank ahead of user 0.
  val usersForProduct2 = topUsersByProduct(2)
  val first = usersForProduct2(0)
  assert(first.user == 1)
  assert(first.rating ~== 39.0 relTol 1e-14)
  val second = usersForProduct2(1)
  assert(second.user == 0)
  assert(second.rating ~== 17.0 relTol 1e-14)
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.