From c774d7d4bff91c9387d059d1189799fa0ff1f4b0 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 14 Apr 2014 18:01:18 -0400 Subject: [PATCH 01/23] Make the partitioner a member variable and use it instead of modding directly. --- .../spark/mllib/recommendation/ALS.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 5cc47de8ffdfc..63975445640e9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -96,6 +96,7 @@ class ALS private ( private var lambda: Double, private var implicitPrefs: Boolean, private var alpha: Double, + private var partitioner: Partitioner = null, private var seed: Long = System.nanoTime() ) extends Serializable with Logging { @@ -103,7 +104,7 @@ class ALS private ( * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10, * lambda: 0.01, implicitPrefs: false, alpha: 1.0}. */ - def this() = this(-1, 10, 10, 0.01, false, 1.0) + def this() = this(-1, 10, 10, 0.01, false, 1.0, null) /** * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured @@ -167,11 +168,11 @@ class ALS private ( this.numBlocks } - val partitioner = new HashPartitioner(numBlocks) + this.partitioner = new HashPartitioner(numBlocks) - val ratingsByUserBlock = ratings.map{ rating => (rating.user % numBlocks, rating) } + val ratingsByUserBlock = ratings.map{ rating => (partitioner.getPartition(rating.user), rating) } val ratingsByProductBlock = ratings.map{ rating => - (rating.product % numBlocks, Rating(rating.product, rating.user, rating.rating)) + (partitioner.getPartition(rating.product), Rating(rating.product, rating.user, rating.rating)) } val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock) @@ -322,7 +323,7 @@ class ALS private ( val userIdToPos = userIds.zipWithIndex.toMap val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks)) for (r <- ratings) { - shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true + shouldSend(userIdToPos(r.user))(partitioner.getPartition(r.product)) = true } OutLinkBlock(userIds, shouldSend) } @@ -338,7 +339,7 @@ class ALS private ( // Split out our ratings by product block val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating]) for (r <- ratings) { - blockRatings(r.product % numBlocks) += r + blockRatings(partitioner.getPartition(r.product)) += r } val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks) for (productBlock <- 0 until numBlocks) { @@ -543,7 +544,7 @@ object ALS { blocks: Int, seed: Long ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0, null, seed).run(ratings) } /** @@ -566,7 +567,7 @@ object ALS { lambda: Double, blocks: Int ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0, null).run(ratings) } /** @@ -626,7 +627,7 @@ object ALS { alpha: Double, seed: Long ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings) + new ALS(blocks, rank, iterations, lambda, true, alpha, null, seed).run(ratings) } /** @@ -651,7 +652,7 @@ object ALS { blocks: Int, alpha: Double 
): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings) + new ALS(blocks, rank, iterations, lambda, true, alpha, null).run(ratings) } /** From c90b6d8e91f86cf89adf28de6f9185647c87e5c8 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 14 Apr 2014 18:10:30 -0400 Subject: [PATCH 02/23] Scramble user and product ids before bucketing. --- .../spark/mllib/recommendation/ALS.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 63975445640e9..b8fc79477e639 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -32,6 +32,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.SparkContext._ +import org.apache.spark.util.Utils /** * Out-link information for a user or product block. This includes the original user/product IDs @@ -168,7 +169,20 @@ class ALS private ( this.numBlocks } - this.partitioner = new HashPartitioner(numBlocks) + // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable + def hash(x: Int): Int = { + val r = x ^ (x >>> 20) ^ (x >>> 12) + r ^ (r >>> 7) ^ (r >>> 4) + } + + this.partitioner = new Partitioner { + def numPartitions = numBlocks + + def getPartition(x: Any): Int = x match { + case null => 0 + case _ => Utils.nonNegativeMod(hash(x.hashCode), numPartitions) + } + } val ratingsByUserBlock = ratings.map{ rating => (partitioner.getPartition(rating.user), rating) } val ratingsByProductBlock = ratings.map{ rating => @@ -183,11 +197,6 @@ class ALS private ( val seedGen = new Random(seed) val seed1 = seedGen.nextInt() val seed2 = seedGen.nextInt() - // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable - def hash(x: Int): Int = { - val r = x ^ (x >>> 20) ^ (x >>> 12) - r ^ (r >>> 7) ^ (r >>> 4) - } var users = userOutLinks.mapPartitionsWithIndex { (index, itr) => val rand = new Random(hash(seed1 ^ index)) itr.map { case (x, y) => From df27697649de50d364c42c76aebaebb34cbe87e2 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 15 Apr 2014 15:47:17 -0400 Subject: [PATCH 03/23] Support custom partitioners. Currently we use the same partitioner for users and products. 
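Why scramble? Plain `id % numBlocks` (PATCH 01) ties the partition choice to the low
bits of the id, so ids that share a stride (for example, all multiples of the block
count) pile into a single partition; mixing the bits first, as the `byteswap32`-based
default partitioner in the diff below does, restores balance. A standalone sketch of
the effect; `nonNegativeMod` here is a stand-in for Spark's `Utils.nonNegativeMod`:

    import scala.util.hashing.byteswap32

    object PartitionSkewSketch {
      // stand-in for org.apache.spark.util.Utils.nonNegativeMod
      def nonNegativeMod(x: Int, mod: Int): Int = {
        val raw = x % mod
        if (raw < 0) raw + mod else raw
      }

      def main(args: Array[String]): Unit = {
        val numBlocks = 4
        val ids = (0 until 1000).map(_ * 4)          // every id is a multiple of 4
        val plain = ids.groupBy(_ % numBlocks).mapValues(_.size)
        val mixed = ids.groupBy(id => nonNegativeMod(byteswap32(id), numBlocks))
                       .mapValues(_.size)
        println("plain mod: " + plain)               // all 1000 ids land in block 0
        println("bit-mixed: " + mixed)               // roughly 250 ids per block
      }
    }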
--- .../spark/mllib/recommendation/ALS.scala | 104 ++++++++++++++---- .../spark/mllib/recommendation/ALSSuite.scala | 24 +++- 2 files changed, 105 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index b8fc79477e639..069a11b976b2b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, BitSet} import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting +import scala.util.hashing.byteswap32 import com.esotericsoftware.kryo.Kryo import org.jblas.{DoubleMatrix, SimpleBlas, Solve} @@ -97,7 +98,6 @@ class ALS private ( private var lambda: Double, private var implicitPrefs: Boolean, private var alpha: Double, - private var partitioner: Partitioner = null, private var seed: Long = System.nanoTime() ) extends Serializable with Logging { @@ -105,7 +105,7 @@ class ALS private ( * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10, * lambda: 0.01, implicitPrefs: false, alpha: 1.0}. */ - def this() = this(-1, 10, 10, 0.01, false, 1.0, null) + def this() = this(-1, 10, 10, 0.01, false, 1.0) /** * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured @@ -116,6 +116,14 @@ class ALS private ( this } + var partitioner: Partitioner = null + + /** Sets the Partitioner that partitions users. */ + def setPartitioner(p: Partitioner): ALS = { + this.partitioner = p + this + } + /** Set the rank of the feature matrices computed (number of features). Default: 10. */ def setRank(rank: Int): ALS = { this.rank = rank @@ -169,24 +177,23 @@ class ALS private ( this.numBlocks } - // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable - def hash(x: Int): Int = { - val r = x ^ (x >>> 20) ^ (x >>> 12) - r ^ (r >>> 7) ^ (r >>> 4) - } - - this.partitioner = new Partitioner { - def numPartitions = numBlocks + val defaultPartitioner = new Partitioner { + val numPartitions = numBlocks def getPartition(x: Any): Int = x match { case null => 0 - case _ => Utils.nonNegativeMod(hash(x.hashCode), numPartitions) + case _ => Utils.nonNegativeMod(byteswap32(x.hashCode), numPartitions) } } - val ratingsByUserBlock = ratings.map{ rating => (partitioner.getPartition(rating.user), rating) } + if (partitioner == null) partitioner = defaultPartitioner + + val ratingsByUserBlock = ratings.map{ rating => + (partitioner.getPartition(rating.user), rating) + } val ratingsByProductBlock = ratings.map{ rating => - (partitioner.getPartition(rating.product), Rating(rating.product, rating.user, rating.rating)) + (partitioner.getPartition(rating.product), + Rating(rating.product, rating.user, rating.rating)) } val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock) @@ -198,13 +205,13 @@ class ALS private ( val seed1 = seedGen.nextInt() val seed2 = seedGen.nextInt() var users = userOutLinks.mapPartitionsWithIndex { (index, itr) => - val rand = new Random(hash(seed1 ^ index)) + val rand = new Random(byteswap32(seed1 ^ index)) itr.map { case (x, y) => (x, y.elementIds.map(_ => randomFactor(rank, rand))) } } var products = productOutLinks.mapPartitionsWithIndex { (index, itr) => - val rand = new Random(hash(seed2 ^ index)) + val rand = new Random(byteswap32(seed2 ^ index)) itr.map { case (x, y) => (x, 
y.elementIds.map(_ => randomFactor(rank, rand))) } @@ -531,6 +538,34 @@ class ALS private ( */ object ALS { + /** + * Train a matrix factorization model given an RDD of ratings given by users to some products, + * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the + * product of two lower-rank matrices of a given rank (number of features). To solve for these + * features, we run a given number of iterations of ALS. This is done using a level of + * parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param seed random seed + * @param partitioner Partitioner mapping users and products to partitions + */ + def train( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + seed: Long, + partitioner: Partitioner) = { + val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed) + als.setPartitioner(partitioner) + als.run(ratings) + } + /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -553,7 +588,7 @@ object ALS { blocks: Int, seed: Long ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, false, 1.0, null, seed).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings) } /** @@ -576,7 +611,7 @@ object ALS { lambda: Double, blocks: Int ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, false, 1.0, null).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings) } /** @@ -612,6 +647,37 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + * @param seed random seed + * @param partitioner Partitioner for partitioning users and products + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double, + seed: Long, + partitioner: Partitioner + ): MatrixFactorizationModel = { + new ALS(blocks, rank, iterations, lambda, true, alpha, seed) + .setPartitioner(partitioner) + .run(ratings) + } + /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. 
We approximate the @@ -636,7 +702,7 @@ object ALS { alpha: Double, seed: Long ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha, null, seed).run(ratings) + new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings) } /** @@ -661,7 +727,7 @@ object ALS { blocks: Int, alpha: Double ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha, null).run(ratings) + new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings) } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 5aab9aba8f9c0..b346c25fb782c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.SparkContext._ +import org.apache.spark.Partitioner object ALSSuite { @@ -74,7 +75,6 @@ object ALSSuite { (sampledRatings, trueRatings, truePrefs) } - } @@ -128,6 +128,17 @@ class ALSSuite extends FunSuite with LocalSparkContext { assert(u11 != u2) } + test("custom partitioner") { + testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, null) + testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, new Partitioner { + def numPartitions(): Int = 3 + def getPartition(x: Any): Int = x match { + case null => 0 + case _ => x.hashCode % 2 + } + }) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * @@ -140,16 +151,21 @@ class ALSSuite extends FunSuite with LocalSparkContext { * @param implicitPrefs flag to test implicit feedback * @param bulkPredict flag to test bulk prediciton * @param negativeWeights whether the generated data can contain negative values + * @param numBlocks number of blocks to partition users and products into + * @param partitioner partitioner */ def testALS(users: Int, products: Int, features: Int, iterations: Int, samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, - bulkPredict: Boolean = false, negativeWeights: Boolean = false) + bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1, + partitioner: Partitioner = null) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs, negativeWeights) val model = implicitPrefs match { - case false => ALS.train(sc.parallelize(sampledRatings), features, iterations) - case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations) + case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01, + numBlocks, 0L, partitioner) + case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01, + numBlocks, 1.0, 0L, partitioner) } val predictedU = new DoubleMatrix(users, features) From d872b098d41c4fc088e579e8fe199aca149bca64 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 08:19:48 -0400 Subject: [PATCH 04/23] Add negative id ALS test. 
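Background for this test: on the JVM, `%` takes the sign of the dividend, so modding a
negative id directly yields a negative, hence invalid, partition index.
`Utils.nonNegativeMod`, used by the default partitioner above, sidesteps this. A minimal
standalone illustration, with the helper again standing in for the Spark utility:

    object NegativeIdSketch {
      // stand-in for org.apache.spark.util.Utils.nonNegativeMod
      def nonNegativeMod(x: Int, mod: Int): Int = {
        val raw = x % mod
        if (raw < 0) raw + mod else raw
      }

      def main(args: Array[String]): Unit = {
        val numBlocks = 3
        println(-7 % numBlocks)                 // -1: not a usable partition index
        println(nonNegativeMod(-7, numBlocks))  //  2: always in [0, numBlocks)
      }
    }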
--- .../spark/mllib/recommendation/ALSSuite.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index b346c25fb782c..6ed1ebe1d1efa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -139,6 +139,36 @@ class ALSSuite extends FunSuite with LocalSparkContext { }) } + test("negative ids") { + val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) + val ratings = data._1.toArray + val correct = data._2 + for (i <- 0 until ratings.length) { + var u = ratings(i).user + var p = ratings(i).product + var r = ratings(i).rating + u = u - 25 + p = p - 25 + ratings(i) = new Rating(u,p,r) + } + val ratingsRDD = sc.parallelize(ratings) + + val model = ALS.train(ratingsRDD, 5, 15) + + var pairs = new Array[(Int, Int)](0) + for (u <- -25 until 25; p <- -25 until 25) { + pairs = pairs :+ (u, p) + } + val ans = model.predict(sc.parallelize(pairs)).collect + for (r <- ans) { + val u = r.user + 25 + val p = r.product + 25 + val v = r.rating + val error = v - correct.get(u,p) + assert(math.abs(error) < 0.4) + } + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * From 36a0f43519a1e8ea800b960157f8c8b050139105 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 12:42:31 -0400 Subject: [PATCH 05/23] Make the partitioner private. --- .../main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 069a11b976b2b..15393375ad59c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -116,7 +116,7 @@ class ALS private ( this } - var partitioner: Partitioner = null + private var partitioner: Partitioner = null /** Sets the Partitioner that partitions users. */ def setPartitioner(p: Partitioner): ALS = { From 5ec9e6cd237c4ac7c1b597614c880ae75bacceee Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 13:00:39 -0400 Subject: [PATCH 06/23] Clean a couple of things up using 'map'. 
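The shape of the cleanup, as a standalone sketch with a toy case class rather than the
real `Rating`: an index loop that mutates an array in place becomes a single `map` over
a pattern-matching function:

    case class Item(user: Int, product: Int, rating: Double)

    object MapCleanupSketch {
      def main(args: Array[String]): Unit = {
        val items = Array(Item(30, 40, 1.0), Item(10, 20, 0.5))
        // before: for (i <- 0 until items.length) { items(i) = ... }
        val shifted = items.map { case Item(u, p, r) => Item(u - 25, p - 25, r) }
        shifted.foreach(println)   // Item(5,15,1.0), Item(-15,-5,0.5)
      }
    }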
--- .../spark/mllib/recommendation/ALSSuite.scala | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 6ed1ebe1d1efa..0111e68a01169 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -141,25 +141,12 @@ class ALSSuite extends FunSuite with LocalSparkContext { test("negative ids") { val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) - val ratings = data._1.toArray + val ratings = sc.parallelize(data._1.map { case Rating(u,p,r) => Rating(u-25,p-25,r) }) val correct = data._2 - for (i <- 0 until ratings.length) { - var u = ratings(i).user - var p = ratings(i).product - var r = ratings(i).rating - u = u - 25 - p = p - 25 - ratings(i) = new Rating(u,p,r) - } - val ratingsRDD = sc.parallelize(ratings) - - val model = ALS.train(ratingsRDD, 5, 15) + val model = ALS.train(ratings, 5, 15) - var pairs = new Array[(Int, Int)](0) - for (u <- -25 until 25; p <- -25 until 25) { - pairs = pairs :+ (u, p) - } - val ans = model.predict(sc.parallelize(pairs)).collect + val pairs = ratings.map { case Rating(u,p,r) => (u,p) } + val ans = model.predict(pairs).collect for (r <- ans) { val u = r.user + 25 val p = r.product + 25 From f8413451c807282100a9be506ca2c992abb81918 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 13:12:47 -0400 Subject: [PATCH 07/23] Fix daft bug creating 'pairs', also for -> foreach. --- .../org/apache/spark/mllib/recommendation/ALSSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 0111e68a01169..3298781667cf3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -145,9 +145,9 @@ class ALSSuite extends FunSuite with LocalSparkContext { val correct = data._2 val model = ALS.train(ratings, 5, 15) - val pairs = ratings.map { case Rating(u,p,r) => (u,p) } - val ans = model.predict(pairs).collect - for (r <- ans) { + val pairs = Array.tabulate(50, 50)((u,p) => (u-25,p-25)).flatten + val ans = model.predict(sc.parallelize(pairs)).collect + ans.foreach { r => val u = r.user + 25 val p = r.product + 25 val v = r.rating From 40edc235e59aab56d6f65c73ffe98859c78a889b Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 14:14:38 -0400 Subject: [PATCH 08/23] Fix missing space. 
--- .../scala/org/apache/spark/mllib/recommendation/ALSSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 3298781667cf3..0b0d6f83d16f9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -151,7 +151,7 @@ class ALSSuite extends FunSuite with LocalSparkContext { val u = r.user + 25 val p = r.product + 25 val v = r.rating - val error = v - correct.get(u,p) + val error = v - correct.get(u, p) assert(math.abs(error) < 0.4) } } From 674933abb7a373dc1c913467d668bad9045e560f Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 19:36:52 -0400 Subject: [PATCH 09/23] Fix style. --- .../org/apache/spark/mllib/recommendation/ALS.scala | 7 +++---- .../spark/mllib/recommendation/ALSSuite.scala | 13 ++++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 15393375ad59c..3af6289cbe23f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -118,7 +118,7 @@ class ALS private ( private var partitioner: Partitioner = null - /** Sets the Partitioner that partitions users. */ + /** Sets the Partitioner that partitions users and products. */ def setPartitioner(p: Partitioner): ALS = { this.partitioner = p this @@ -180,9 +180,8 @@ class ALS private ( val defaultPartitioner = new Partitioner { val numPartitions = numBlocks - def getPartition(x: Any): Int = x match { - case null => 0 - case _ => Utils.nonNegativeMod(byteswap32(x.hashCode), numPartitions) + def getPartition(x: Any): Int = { + Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 0b0d6f83d16f9..7e2a0ace3d41e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -132,21 +132,20 @@ class ALSSuite extends FunSuite with LocalSparkContext { testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, null) testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, new Partitioner { def numPartitions(): Int = 3 - def getPartition(x: Any): Int = x match { - case null => 0 - case _ => x.hashCode % 2 - } + def getPartition(x: Any): Int = x.asInstanceOf[Int] % 2 }) } test("negative ids") { val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) - val ratings = sc.parallelize(data._1.map { case Rating(u,p,r) => Rating(u-25,p-25,r) }) + val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) => + Rating(u - 25, p - 25, r) + }) val correct = data._2 val model = ALS.train(ratings, 5, 15) - val pairs = Array.tabulate(50, 50)((u,p) => (u-25,p-25)).flatten - val ans = model.predict(sc.parallelize(pairs)).collect + val pairs = Array.tabulate(50, 50)((u, p) => (u - 25, p - 25)).flatten + val ans = model.predict(sc.parallelize(pairs)).collect() ans.foreach { r => val u = r.user + 25 val p = r.product + 25 From 23d6f91b52c88b7006ec78496f777b72b1881bb4 Mon Sep 17 00:00:00 2001 From: Tor 
Myklebust Date: Sun, 20 Apr 2014 20:06:19 -0400 Subject: [PATCH 10/23] Stop making the partitioner configurable. --- .../spark/mllib/recommendation/ALS.scala | 69 +------------------ .../spark/mllib/recommendation/ALSSuite.scala | 16 +---- 2 files changed, 4 insertions(+), 81 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index c93b21221ba34..29c437d90f77e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -120,12 +120,6 @@ class ALS private ( private var partitioner: Partitioner = null - /** Sets the Partitioner that partitions users and products. */ - def setPartitioner(p: Partitioner): ALS = { - this.partitioner = p - this - } - /** Set the rank of the feature matrices computed (number of features). Default: 10. */ def setRank(rank: Int): ALS = { this.rank = rank @@ -179,7 +173,7 @@ class ALS private ( this.numBlocks } - val defaultPartitioner = new Partitioner { + partitioner = new Partitioner { val numPartitions = numBlocks def getPartition(x: Any): Int = { @@ -187,8 +181,6 @@ class ALS private ( } } - if (partitioner == null) partitioner = defaultPartitioner - val ratingsByUserBlock = ratings.map{ rating => (partitioner.getPartition(rating.user), rating) } @@ -548,34 +540,6 @@ class ALS private ( */ object ALS { - /** - * Train a matrix factorization model given an RDD of ratings given by users to some products, - * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the - * product of two lower-rank matrices of a given rank (number of features). To solve for these - * features, we run a given number of iterations of ALS. This is done using a level of - * parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param seed random seed - * @param partitioner Partitioner mapping users and products to partitions - */ - def train( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - seed: Long, - partitioner: Partitioner) = { - val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed) - als.setPartitioner(partitioner) - als.run(ratings) - } - /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -657,37 +621,6 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } - /** - * Train a matrix factorization model given an RDD of 'implicit preferences' given by users - * to some products, in the form of (userID, productID, preference) pairs. We approximate the - * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). - * To solve for these features, we run a given number of iterations of ALS. This is done using - * a level of parallelism given by `blocks`. 
- * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param alpha confidence parameter (only applies when immplicitPrefs = true) - * @param seed random seed - * @param partitioner Partitioner for partitioning users and products - */ - def trainImplicit( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - alpha: Double, - seed: Long, - partitioner: Partitioner - ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha, seed) - .setPartitioner(partitioner) - .run(ratings) - } - /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. We approximate the diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 7e2a0ace3d41e..4dfcd4b52ec66 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -128,14 +128,6 @@ class ALSSuite extends FunSuite with LocalSparkContext { assert(u11 != u2) } - test("custom partitioner") { - testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, null) - testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, new Partitioner { - def numPartitions(): Int = 3 - def getPartition(x: Any): Int = x.asInstanceOf[Int] % 2 - }) - } - test("negative ids") { val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) => @@ -168,20 +160,18 @@ class ALSSuite extends FunSuite with LocalSparkContext { * @param bulkPredict flag to test bulk prediciton * @param negativeWeights whether the generated data can contain negative values * @param numBlocks number of blocks to partition users and products into - * @param partitioner partitioner */ def testALS(users: Int, products: Int, features: Int, iterations: Int, samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, - bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1, - partitioner: Partitioner = null) + bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs, negativeWeights) val model = implicitPrefs match { case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01, - numBlocks, 0L, partitioner) + numBlocks, 0L) case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01, - numBlocks, 1.0, 0L, partitioner) + numBlocks, 1.0, 0L) } val predictedU = new DoubleMatrix(users, features) From dcf583ac4001c6da8d6b85e45e88043861a351d8 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 15:56:48 -0400 Subject: [PATCH 11/23] Remove the partitioner member variable; instead, thread that needle everywhere it needs to go. 
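One likely motivation for threading the value through as a parameter, sketched
generically here (toy classes, not the actual ALS code): reading a field inside an RDD
closure captures `this`, so the whole enclosing object must be serializable and gets
shipped to executors, whereas a parameter is a plain local and keeps the closure small:

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    class MemberStyle(val partitioner: Partitioner) {
      def tag(ids: RDD[Int]): RDD[(Int, Int)] =
        ids.map(id => (partitioner.getPartition(id), id))   // closure drags in `this`
    }

    class ThreadedStyle {
      def tag(ids: RDD[Int], partitioner: Partitioner): RDD[(Int, Int)] =
        ids.map(id => (partitioner.getPartition(id), id))   // captures only the argument
    }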
--- .../spark/mllib/recommendation/ALS.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 29c437d90f77e..60fb73f2b5be5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -118,8 +118,6 @@ class ALS private ( this } - private var partitioner: Partitioner = null - /** Set the rank of the feature matrices computed (number of features). Default: 10. */ def setRank(rank: Int): ALS = { this.rank = rank @@ -173,7 +171,7 @@ class ALS private ( this.numBlocks } - partitioner = new Partitioner { + val partitioner = new Partitioner { val numPartitions = numBlocks def getPartition(x: Any): Int = { @@ -189,8 +187,9 @@ class ALS private ( Rating(rating.product, rating.user, rating.rating)) } - val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock) - val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock) + val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner) + val (productInLinks, productOutLinks) = + makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner) // Initialize user and product factors randomly, but use a deterministic seed for each // partition so that fault recovery works @@ -335,7 +334,8 @@ class ALS private ( * Make the out-links table for a block of the users (or products) dataset given the list of * (user, product, rating) values for the users in that block (or the opposite for products). */ - private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating]): OutLinkBlock = { + private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating], + partitioner: Partitioner): OutLinkBlock = { val userIds = ratings.map(_.user).distinct.sorted val numUsers = userIds.length val userIdToPos = userIds.zipWithIndex.toMap @@ -350,7 +350,8 @@ class ALS private ( * Make the in-links table for a block of the users (or products) dataset given a list of * (user, product, rating) values for the users in that block (or the opposite for products). */ - private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating]): InLinkBlock = { + private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating], + partitioner: Partitioner): InLinkBlock = { val userIds = ratings.map(_.user).distinct.sorted val numUsers = userIds.length val userIdToPos = userIds.zipWithIndex.toMap @@ -382,14 +383,14 @@ class ALS private ( * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it. 
*/ - private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)]) + private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)], partitioner: Partitioner) : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = { val grouped = ratings.partitionBy(new HashPartitioner(numBlocks)) val links = grouped.mapPartitionsWithIndex((blockId, elements) => { val ratings = elements.map{_._2}.toArray - val inLinkBlock = makeInLinkBlock(numBlocks, ratings) - val outLinkBlock = makeOutLinkBlock(numBlocks, ratings) + val inLinkBlock = makeInLinkBlock(numBlocks, ratings, partitioner) + val outLinkBlock = makeOutLinkBlock(numBlocks, ratings, partitioner) Iterator.single((blockId, (inLinkBlock, outLinkBlock))) }, true) val inLinks = links.mapValues(_._1) From 657a71b143103f9b37ed31976a2f4346bdbe4e7c Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 22 Apr 2014 18:45:25 -0400 Subject: [PATCH 12/23] Simple-minded estimates of computation and communication costs in ALS. --- .../spark/mllib/recommendation/ALS.scala | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 60fb73f2b5be5..4ab6ca37a0d8b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.recommendation -import scala.collection.mutable.{ArrayBuffer, BitSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashSet} import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting @@ -708,6 +708,56 @@ object ALS { trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) } + /** + * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the + * computation time and communication cost of one iteration of ALS. + * + * @param ratings RDD of Rating objects + * @param rank number of features to use + * @param userPartitioner partitioner for partitioning users + * @param productPartitioner partitioner for partitioning products + */ + def evaluatePartitioner(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, + productPartitioner: Partitioner): (Double, Double) = { + val utalk = ratings.mapPartitions(x => { + val ht = new HashSet[(Int, Int)]() + while (x.hasNext) { + val rat = x.next() + val u = userPartitioner.getPartition(rat.user) + val p = rat.product + ht += ((u, p)) + } + ht.iterator + } + ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collect + + val ptalk = ratings.mapPartitions(x => { + val ht = new HashSet[(Int, Int)]() + while (x.hasNext) { + val rat = x.next() + val u = rat.user + val p = productPartitioner.getPartition(rat.product) + ht += ((p, u)) + } + ht.iterator + } + ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collect + + val numUsers = ratings.map(x => x.user).distinct.count + val numProducts = ratings.map(x => x.product).distinct.count + + // We send out each user vector to each product partition that needs it and vice versa. + val ucomm = utalk.map(x => x._2).reduce(_ + _) + val pcomm = ptalk.map(x => x._2).reduce(_ + _) + val communication = 8.0 * rank * (ucomm + pcomm) + + // We do two rank*rank outer products per rating and one Cholesky factorisation per user and + // per product. 
+ val computation = (2.0 * rank * rank * ratings.count + + rank * rank * rank * (numUsers + numProducts) / 6.0) + (computation, communication) + } + private class ALSRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[Rating]) From a1184d123516fb7165f87e373ee4f70b65a1481a Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 22 Apr 2014 21:37:15 -0400 Subject: [PATCH 13/23] Mark ALS.evaluatePartitioner DeveloperApi. --- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 4ab6ca37a0d8b..e70fb66a3516b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -709,6 +709,7 @@ object ALS { } /** + * :: DeveloperApi :: * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the * computation time and communication cost of one iteration of ALS. * @@ -716,7 +717,8 @@ object ALS { * @param rank number of features to use * @param userPartitioner partitioner for partitioning users * @param productPartitioner partitioner for partitioning products - */ + * / + @DeveloperApi def evaluatePartitioner(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, productPartitioner: Partitioner): (Double, Double) = { val utalk = ratings.mapPartitions(x => { @@ -755,6 +757,7 @@ object ALS { // per product. val computation = (2.0 * rank * rank * ratings.count + rank * rank * rank * (numUsers + numProducts) / 6.0) + (computation, communication) } From 6c31324a96d716949ab57fe2e9773476f6caa07a Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 22 Apr 2014 21:38:25 -0400 Subject: [PATCH 14/23] Make it actually build... --- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index e70fb66a3516b..81bd5e7cad44e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -26,7 +26,7 @@ import scala.util.hashing.byteswap32 import com.esotericsoftware.kryo.Kryo import org.jblas.{DoubleMatrix, SimpleBlas, Solve} -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} import org.apache.spark.storage.StorageLevel @@ -717,7 +717,7 @@ object ALS { * @param rank number of features to use * @param userPartitioner partitioner for partitioning users * @param productPartitioner partitioner for partitioning products - * / + */ @DeveloperApi def evaluatePartitioner(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, productPartitioner: Partitioner): (Double, Double) = { From 6615ed56f3c6109c22f151220a52080182285039 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 23 Apr 2014 15:00:46 -0400 Subject: [PATCH 15/23] It's more useful to give per-partition estimates. Do that. 
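Before the per-partition split, a feel for the magnitudes in the aggregate model from
PATCH 12: two rank x rank outer products per rating, one Cholesky factorisation per
user and per product, and 8 bytes per transmitted vector element. A back-of-the-envelope
sketch with illustrative sizes only:

    object CostBallparkSketch {
      def main(args: Array[String]): Unit = {
        val rank = 10
        val numRatings = 1e6                    // illustrative sizes only
        val numUsers = 5e4
        val numProducts = 5e4
        // 2 * r^2 flops per rating plus r^3 / 6 per Cholesky factorisation
        val computation = 2.0 * rank * rank * numRatings +
          math.pow(rank, 3) * (numUsers + numProducts) / 6.0
        val bytesPerVector = 8.0 * rank         // one factor vector on the wire
        println(f"computation units: $computation%.3g")     // ~2.17e8
        println(f"bytes per sent vector: $bytesPerVector%.0f")
      }
    }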
--- .../spark/mllib/recommendation/ALS.scala | 55 ++++++++++++++----- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 81bd5e7cad44e..66c0547685f21 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.recommendation -import scala.collection.mutable.{ArrayBuffer, BitSet, HashSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashSet, HashMap} import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting @@ -711,7 +711,11 @@ object ALS { /** * :: DeveloperApi :: * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the - * computation time and communication cost of one iteration of ALS. + * computation time and communication cost of one iteration of ALS. Returns a pair of pairs of + * Maps. The first pair of maps represents computation time in unspecified units. The second + * pair of maps represents communication cost in uncompressed bytes. The first element of each + * pair is the cost attributable to user partitioning, while the second is the cost attributable + * to product partitioning. * * @param ratings RDD of Rating objects * @param rank number of features to use @@ -720,7 +724,8 @@ object ALS { */ @DeveloperApi def evaluatePartitioner(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, - productPartitioner: Partitioner): (Double, Double) = { + productPartitioner: Partitioner): + ((Map[Int, Double], Map[Int, Double]), (Map[Int, Double], Map[Int, Double])) = { val utalk = ratings.mapPartitions(x => { val ht = new HashSet[(Int, Int)]() while (x.hasNext) { @@ -731,7 +736,7 @@ object ALS { } ht.iterator } - ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collect + ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collectAsMap() val ptalk = ratings.mapPartitions(x => { val ht = new HashSet[(Int, Int)]() @@ -743,22 +748,44 @@ object ALS { } ht.iterator } - ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collect - - val numUsers = ratings.map(x => x.user).distinct.count - val numProducts = ratings.map(x => x.product).distinct.count + ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collectAsMap() // We send out each user vector to each product partition that needs it and vice versa. 
- val ucomm = utalk.map(x => x._2).reduce(_ + _) - val pcomm = ptalk.map(x => x._2).reduce(_ + _) - val communication = 8.0 * rank * (ucomm + pcomm) + val communication = (utalk.map(x => (x._1, 8.0 * rank * x._2)).toMap, + ptalk.map(x => (x._1, 8.0 * rank * x._2)).toMap) + + // (user, #ratings) + val users = ratings.map(x => x.user).groupBy(x => x).mapValues(x => x.toList.length) + val products = ratings.map(x => x.product).groupBy(x => x).mapValues(x => x.toList.length) + + // (upart, #ratings) + val userRatings = (users.map(x => userPartitioner.getPartition(x._1)).groupBy(x => x) + .mapValues(x => x.toList.length).collectAsMap()) + val productRatings = (products.map(x => productPartitioner.getPartition(x._1)).groupBy(x => x) + .mapValues(x => x.toList.length).collectAsMap()) + + // (upart, #users) + val userCount = (users.map(x => x._1).distinct.map(x => userPartitioner.getPartition(x)) + .groupBy(x => x).mapValues(x => x.toList.length).collectAsMap()) + val productCount = (products.map(x => x._1).distinct.map(x => + productPartitioner.getPartition(x)).groupBy(x => x).mapValues(x => x.toList.length) + .collectAsMap()) + + val userComputation = new HashMap[Int, Double]() + val productComputation = new HashMap[Int, Double]() + userCount.keys.foreach { k => + userComputation.put(k, 1.0 * rank * rank * userRatings.getOrElse(k, 0) + + rank * rank * rank * userCount.getOrElse(k, 0) / 6.0) + } + productCount.keys.foreach { k => + productComputation.put(k, 1.0 * rank * rank * productRatings.getOrElse(k, 0) + + rank * rank * rank * productCount.getOrElse(k, 0) / 6.0) + } // We do two rank*rank outer products per rating and one Cholesky factorisation per user and // per product. - val computation = (2.0 * rank * rank * ratings.count - + rank * rank * rank * (numUsers + numProducts) / 6.0) - (computation, communication) + ((userComputation.toMap, productComputation.toMap), communication) } private class ALSRegistrator extends KryoRegistrator { From 8cbebf1037f5e0d0035b26c07db3a5e6d77ee08c Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Fri, 25 Apr 2014 19:17:30 -0400 Subject: [PATCH 16/23] Rename and clean up the return format of cost estimator. 
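The diff below also brackets each intermediate RDD with `persist()`/`unpersist()`,
since each one now feeds two separate jobs. The pattern in isolation (toy data; assumes
an existing `SparkContext` passed in by the caller):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object PersistBracketSketch {
      def twoJobsOverOneRDD(sc: SparkContext): Unit = {
        val pairs = sc.parallelize(1 to 100).map(i => (i % 4, i))
        pairs.persist()                         // keep the blocks around between jobs
        val sizes = pairs.groupByKey().mapValues(_.size).collectAsMap()  // job 1
        val numDistinct = pairs.distinct().count()   // job 2 reuses the cached blocks
        pairs.unpersist()                       // release the blocks when done
        println("sizes=" + sizes + ", distinct=" + numDistinct)
      }
    }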
--- .../spark/mllib/recommendation/ALS.scala | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 66c0547685f21..2e601dc6f5bc2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -17,7 +17,8 @@ package org.apache.spark.mllib.recommendation -import scala.collection.mutable.{ArrayBuffer, BitSet, HashSet, HashMap} +import scala.collection.mutable.{ArrayBuffer, BitSet} +import scala.collection.mutable import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting @@ -708,6 +709,9 @@ object ALS { trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) } + @DeveloperApi + case class IterationCost(inboundBytes: Double, computation: Double, outboundBytes: Double) + /** * :: DeveloperApi :: * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the @@ -723,11 +727,12 @@ object ALS { * @param productPartitioner partitioner for partitioning products */ @DeveloperApi - def evaluatePartitioner(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, + def estimateCost(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, productPartitioner: Partitioner): - ((Map[Int, Double], Map[Int, Double]), (Map[Int, Double], Map[Int, Double])) = { + (Map[Int, IterationCost], Map[Int, IterationCost]) = { + // user partition -> set of products val utalk = ratings.mapPartitions(x => { - val ht = new HashSet[(Int, Int)]() + val ht = new mutable.HashSet[(Int, Int)]() while (x.hasNext) { val rat = x.next() val u = userPartitioner.getPartition(rat.user) @@ -736,10 +741,22 @@ object ALS { } ht.iterator } - ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collectAsMap() + ) + + utalk.persist() + + // user partition -> number of products + val userInbound = utalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() + // product -> number of user partitions, summed over each partition. + val productOutbound = (utalk.distinct.map(x => (productPartitioner.getPartition(x._2), x._1)) + .groupByKey.mapValues(x => 8.0 * rank * x.toList.length).collectAsMap()) + + utalk.unpersist() + + // product partition -> set of users val ptalk = ratings.mapPartitions(x => { - val ht = new HashSet[(Int, Int)]() + val ht = new mutable.HashSet[(Int, Int)]() while (x.hasNext) { val rat = x.next() val u = rat.user @@ -748,11 +765,18 @@ object ALS { } ht.iterator } - ).groupByKey().map(x => (x._1, x._2.toList.distinct.length)).collectAsMap() + ) + + ptalk.persist() + + // product partition -> number of users + val productInbound = ptalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() - // We send out each user vector to each product partition that needs it and vice versa. - val communication = (utalk.map(x => (x._1, 8.0 * rank * x._2)).toMap, - ptalk.map(x => (x._1, 8.0 * rank * x._2)).toMap) + // user -> number of product partitions, summed over each partition. 
+ val userOutbound = (ptalk.distinct.map(x => (userPartitioner.getPartition(x._2), x._1)) + .groupByKey.mapValues(x => 8.0 * rank * x.toList.length).collectAsMap()) + + ptalk.unpersist() // (user, #ratings) val users = ratings.map(x => x.user).groupBy(x => x).mapValues(x => x.toList.length) @@ -771,8 +795,8 @@ object ALS { productPartitioner.getPartition(x)).groupBy(x => x).mapValues(x => x.toList.length) .collectAsMap()) - val userComputation = new HashMap[Int, Double]() - val productComputation = new HashMap[Int, Double]() + val userComputation = new mutable.HashMap[Int, Double]() + val productComputation = new mutable.HashMap[Int, Double]() userCount.keys.foreach { k => userComputation.put(k, 1.0 * rank * rank * userRatings.getOrElse(k, 0) + rank * rank * rank * userCount.getOrElse(k, 0) / 6.0) @@ -782,10 +806,22 @@ object ALS { + rank * rank * rank * productCount.getOrElse(k, 0) / 6.0) } + val userAnswer = new mutable.HashMap[Int, IterationCost]() + userCount.keys.foreach { k => + userAnswer.put(k, IterationCost(userInbound.getOrElse(k, 0.0), + userComputation.getOrElse(k, 0.0), userOutbound.getOrElse(k, 0.0))) + } + + val productAnswer = new mutable.HashMap[Int, IterationCost]() + productCount.keys.foreach { k => + productAnswer.put(k, IterationCost(productInbound.getOrElse(k, 0.0), + productComputation.getOrElse(k, 0.0), productOutbound.getOrElse(k, 0.0))) + } + // We do two rank*rank outer products per rating and one Cholesky factorisation per user and // per product. - ((userComputation.toMap, productComputation.toMap), communication) + (userAnswer.toMap, productAnswer.toMap) } private class ALSRegistrator extends KryoRegistrator { From 8b21e6d98bc4d7b32d8a32cd191d8846b1268106 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 26 Apr 2014 11:15:08 -0400 Subject: [PATCH 17/23] Fix overlong lines. --- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 2e601dc6f5bc2..218c779505ee6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -746,7 +746,8 @@ object ALS { utalk.persist() // user partition -> number of products - val userInbound = utalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() + val userInbound = + utalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() // product -> number of user partitions, summed over each partition. val productOutbound = (utalk.distinct.map(x => (productPartitioner.getPartition(x._2), x._1)) @@ -770,7 +771,8 @@ object ALS { ptalk.persist() // product partition -> number of users - val productInbound = ptalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() + val productInbound = + ptalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() // user -> number of product partitions, summed over each partition. val userOutbound = (ptalk.distinct.map(x => (userPartitioner.getPartition(x._2), x._1)) From 2ab7a5d0d5c46def977ad2163a630f4e107659d5 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 29 Apr 2014 02:12:23 -0400 Subject: [PATCH 18/23] Reindent estimateCost's declaration and make it return Seqs. 
--- .../org/apache/spark/mllib/recommendation/ALS.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 218c779505ee6..f1b1174391868 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -727,9 +727,12 @@ object ALS { * @param productPartitioner partitioner for partitioning products */ @DeveloperApi - def estimateCost(ratings: RDD[Rating], rank: Int, userPartitioner: Partitioner, - productPartitioner: Partitioner): - (Map[Int, IterationCost], Map[Int, IterationCost]) = { + def estimateCost( + ratings: RDD[Rating], + rank: Int, + userPartitioner: Partitioner, + productPartitioner: Partitioner + ): (Seq[IterationCost], Seq[IterationCost]) = { // user partition -> set of products val utalk = ratings.mapPartitions(x => { val ht = new mutable.HashSet[(Int, Int)]() @@ -823,7 +826,8 @@ object ALS { // We do two rank*rank outer products per rating and one Cholesky factorisation per user and // per product. - (userAnswer.toMap, productAnswer.toMap) + ( userAnswer.toArray.sortBy(x => x._1).map(x => x._2), + productAnswer.toArray.sortBy(x => x._1).map(x => x._2)) } private class ALSRegistrator extends KryoRegistrator { From 2b2febe93a250e780235ccfd396001c3273fb4d0 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 30 Apr 2014 23:51:14 -0400 Subject: [PATCH 19/23] Use `makeLinkRDDs` when estimating costs. --- .../spark/mllib/recommendation/ALS.scala | 170 ++++++++---------- 1 file changed, 78 insertions(+), 92 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index f1b1174391868..8a9e0df673164 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -709,125 +709,111 @@ object ALS { trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) } + /** + * :: DeveloperApi :: + * Represents the cost attributable to a partition of a single ALS iteration. + */ @DeveloperApi - case class IterationCost(inboundBytes: Double, computation: Double, outboundBytes: Double) + case class IterationCost(inboundVectors: Double, outerProducts: Double, choleskies: Double, + outboundVectors: Double) /** * :: DeveloperApi :: * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the - * computation time and communication cost of one iteration of ALS. Returns a pair of pairs of - * Maps. The first pair of maps represents computation time in unspecified units. The second - * pair of maps represents communication cost in uncompressed bytes. The first element of each - * pair is the cost attributable to user partitioning, while the second is the cost attributable - * to product partitioning. + * computation time and communication cost of one iteration of ALS. Returns a pair of + * `Seq[IterationCost]`s. The first `Seq` represents user partitions and the second `Seq` + * represents product partitions. These `Seq`s are indexed by partition numbers, and the `i`th + * member contains details for the `i`th partition. 
* * @param ratings RDD of Rating objects - * @param rank number of features to use * @param userPartitioner partitioner for partitioning users * @param productPartitioner partitioner for partitioning products */ @DeveloperApi def estimateCost( ratings: RDD[Rating], - rank: Int, userPartitioner: Partitioner, productPartitioner: Partitioner ): (Seq[IterationCost], Seq[IterationCost]) = { - // user partition -> set of products - val utalk = ratings.mapPartitions(x => { - val ht = new mutable.HashSet[(Int, Int)]() - while (x.hasNext) { - val rat = x.next() - val u = userPartitioner.getPartition(rat.user) - val p = rat.product - ht += ((u, p)) - } - ht.iterator - } - ) - - utalk.persist() + val numUserPartitions = userPartitioner.numPartitions + val numProdPartitions = productPartitioner.numPartitions - // user partition -> number of products - val userInbound = - utalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() - - // product -> number of user partitions, summed over each partition. - val productOutbound = (utalk.distinct.map(x => (productPartitioner.getPartition(x._2), x._1)) - .groupByKey.mapValues(x => 8.0 * rank * x.toList.length).collectAsMap()) - - utalk.unpersist() + val ratingsByUserBlock = ratings.map{ rating => + (userPartitioner.getPartition(rating.user), rating) + } + val ratingsByProductBlock = ratings.map{ rating => + (productPartitioner.getPartition(rating.product), + Rating(rating.product, rating.user, rating.rating)) + } - // product partition -> set of users - val ptalk = ratings.mapPartitions(x => { - val ht = new mutable.HashSet[(Int, Int)]() - while (x.hasNext) { - val rat = x.next() - val u = rat.user - val p = productPartitioner.getPartition(rat.product) - ht += ((p, u)) + val als = new ALS() + val (userIn, userOut) = als.makeLinkRDDs(userPartitioner.numPartitions, + ratingsByUserBlock, userPartitioner) + val (prodIn, prodOut) = als.makeLinkRDDs(productPartitioner.numPartitions, + ratingsByProductBlock, productPartitioner) + + def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Double] = { + outLinks.map{ x => + val grid = new mutable.HashMap[(Int, Int), Double]() + val uPartition = x._1 + x._2.shouldSend.foreach{ ss => + ss.foreach{ pPartition => + val pair = (uPartition, pPartition) + grid.put(pair, grid.getOrElse(pair, 0.0) + 1.0) + } } - ht.iterator - } - ) - - ptalk.persist() - - // product partition -> number of users - val productInbound = - ptalk.groupByKey.map(x => (x._1, 8.0 * rank * x._2.toList.distinct.length)).collectAsMap() - - // user -> number of product partitions, summed over each partition. 
- val userOutbound = (ptalk.distinct.map(x => (userPartitioner.getPartition(x._2), x._1)) - .groupByKey.mapValues(x => 8.0 * rank * x.toList.length).collectAsMap()) - - ptalk.unpersist() - - // (user, #ratings) - val users = ratings.map(x => x.user).groupBy(x => x).mapValues(x => x.toList.length) - val products = ratings.map(x => x.product).groupBy(x => x).mapValues(x => x.toList.length) - - // (upart, #ratings) - val userRatings = (users.map(x => userPartitioner.getPartition(x._1)).groupBy(x => x) - .mapValues(x => x.toList.length).collectAsMap()) - val productRatings = (products.map(x => productPartitioner.getPartition(x._1)).groupBy(x => x) - .mapValues(x => x.toList.length).collectAsMap()) - - // (upart, #users) - val userCount = (users.map(x => x._1).distinct.map(x => userPartitioner.getPartition(x)) - .groupBy(x => x).mapValues(x => x.toList.length).collectAsMap()) - val productCount = (products.map(x => x._1).distinct.map(x => - productPartitioner.getPartition(x)).groupBy(x => x).mapValues(x => x.toList.length) - .collectAsMap()) - - val userComputation = new mutable.HashMap[Int, Double]() - val productComputation = new mutable.HashMap[Int, Double]() - userCount.keys.foreach { k => - userComputation.put(k, 1.0 * rank * rank * userRatings.getOrElse(k, 0) - + rank * rank * rank * userCount.getOrElse(k, 0) / 6.0) + grid + }.reduce{ (grid1, grid2) => + grid2.foreach{ x => + grid1.put(x._1, grid1.getOrElse(x._1, 0.0) + x._2) + } + grid1 + }.toMap } - productCount.keys.foreach { k => - productComputation.put(k, 1.0 * rank * rank * productRatings.getOrElse(k, 0) - + rank * rank * rank * productCount.getOrElse(k, 0) / 6.0) + + def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Double] = { + inLinks.mapValues{ ilb => + var numRatings = 0.0 + ilb.ratingsForBlock.foreach{ ar => + ar.foreach{ p => numRatings += p._1.length } + } + numRatings + }.collectAsMap().toMap } - val userAnswer = new mutable.HashMap[Int, IterationCost]() - userCount.keys.foreach { k => - userAnswer.put(k, IterationCost(userInbound.getOrElse(k, 0.0), - userComputation.getOrElse(k, 0.0), userOutbound.getOrElse(k, 0.0))) + val userSendGrid = sendGrid(userOut) + val prodSendGrid = sendGrid(prodOut) + + val userInbound = new Array[Double](numUserPartitions) + val prodInbound = new Array[Double](numProdPartitions) + val userOutbound = new Array[Double](numUserPartitions) + val prodOutbound = new Array[Double](numProdPartitions) + + for (u <- 0 until numUserPartitions; p <- 0 until numProdPartitions) { + userOutbound(u) += userSendGrid.getOrElse((u, p), 0.0) + prodInbound(p) += userSendGrid.getOrElse((u, p), 0.0) + userInbound(u) += prodSendGrid.getOrElse((p, u), 0.0) + prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0.0) } - val productAnswer = new mutable.HashMap[Int, IterationCost]() - productCount.keys.foreach { k => - productAnswer.put(k, IterationCost(productInbound.getOrElse(k, 0.0), - productComputation.getOrElse(k, 0.0), productOutbound.getOrElse(k, 0.0))) + val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap + val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap + + val userRatings = countRatings(userIn) + val prodRatings = countRatings(prodIn) + + val userCosts = new Array[IterationCost](numUserPartitions) + val prodCosts = new Array[IterationCost](numProdPartitions) + + for (u <- 0 until numUserPartitions) { + userCosts(u) = IterationCost(userInbound(u), userRatings(u), userCounts(u), userOutbound(u)) } - // We do two rank*rank outer products per rating and one 
Cholesky factorisation per user and - // per product. + for (p <- 0 until numProdPartitions) { + prodCosts(p) = IterationCost(prodInbound(p), prodRatings(p), prodCounts(p), prodOutbound(p)) + } - ( userAnswer.toArray.sortBy(x => x._1).map(x => x._2), - productAnswer.toArray.sortBy(x => x._1).map(x => x._2)) + (userCosts, prodCosts) } private class ALSRegistrator extends KryoRegistrator { From 0455cd455b2ad31f67cd82827df3b2204fb71414 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Thu, 1 May 2014 19:19:37 -0400 Subject: [PATCH 20/23] Parens for collectAsMap. --- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 8a9e0df673164..61949d17a7106 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -796,8 +796,8 @@ object ALS { prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0.0) } - val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap - val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap + val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap() + val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap() val userRatings = countRatings(userIn) val prodRatings = countRatings(prodIn) From 8cbb7185e4440459a725d04e4292ec1f015bfff8 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Thu, 1 May 2014 19:27:24 -0400 Subject: [PATCH 21/23] Braces get spaces. --- .../apache/spark/mllib/recommendation/ALS.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 61949d17a7106..323b3aeffea8c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -753,18 +753,18 @@ object ALS { ratingsByProductBlock, productPartitioner) def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Double] = { - outLinks.map{ x => + outLinks.map { x => val grid = new mutable.HashMap[(Int, Int), Double]() val uPartition = x._1 - x._2.shouldSend.foreach{ ss => - ss.foreach{ pPartition => + x._2.shouldSend.foreach { ss => + ss.foreach { pPartition => val pair = (uPartition, pPartition) grid.put(pair, grid.getOrElse(pair, 0.0) + 1.0) } } grid - }.reduce{ (grid1, grid2) => - grid2.foreach{ x => + }.reduce { (grid1, grid2) => + grid2.foreach { x => grid1.put(x._1, grid1.getOrElse(x._1, 0.0) + x._2) } grid1 @@ -772,10 +772,10 @@ object ALS { } def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Double] = { - inLinks.mapValues{ ilb => + inLinks.mapValues { ilb => var numRatings = 0.0 - ilb.ratingsForBlock.foreach{ ar => - ar.foreach{ p => numRatings += p._1.length } + ilb.ratingsForBlock.foreach { ar => + ar.foreach { p => numRatings += p._1.length } } numRatings }.collectAsMap().toMap From 217bd1d70a92fa8680e17b3ea4255d4cacad33ae Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Thu, 1 May 2014 19:34:34 -0400 Subject: [PATCH 22/23] Documentation and choleskies -> subproblems. 
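
Each "cholesky"/"subproblem" counted here is one regularized least-squares solve: a
block accumulates the normal equations from outer products of the incoming feature
vectors, then solves one rank-by-rank system per user (or per product). A rough sketch
of a single such solve, using the jblas primitives ALS.scala already depends on
(illustrative only; the helper name and signature are made up, not code from this
patch):

    import org.jblas.{DoubleMatrix, Solve}

    // One least-squares subproblem for one user: ys holds the (rank x 1) feature
    // vectors of the products this user rated, rs the corresponding ratings.
    def solveSubproblem(ys: Array[DoubleMatrix], rs: Array[Double],
        rank: Int, lambda: Double): DoubleMatrix = {
      val ata = DoubleMatrix.eye(rank).muli(lambda)  // lambda * I regularization term
      val atb = DoubleMatrix.zeros(rank)
      for ((y, r) <- ys.zip(rs)) {
        ata.addi(y.mmul(y.transpose()))  // one rank*rank outer product per rating
        atb.addi(y.mul(r))
      }
      Solve.solvePositive(ata, atb)  // Cholesky-backed solve; one per user per iteration
    }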
--- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 323b3aeffea8c..f831e7392d710 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -712,9 +712,13 @@ object ALS { /** * :: DeveloperApi :: * Represents the cost attributable to a partition of a single ALS iteration. + * `inboundVectors` and `outboundVectors` are the number of feature vectors sent in and out, + * respectively, from a block in each iteration. `outerProducts` is the number of outer products + * that need to be accumulated in each iteration. `subproblems` is the number of least-squares + * subproblems solved in each iteration. */ @DeveloperApi - case class IterationCost(inboundVectors: Double, outerProducts: Double, choleskies: Double, + case class IterationCost(inboundVectors: Double, outerProducts: Double, subproblems: Double, outboundVectors: Double) /** From 9b56a8bdee87698393a16ca4e4bfdebfd315c9fd Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 1 Aug 2014 19:45:13 -0700 Subject: [PATCH 23/23] updated API and added a simple test --- .../spark/mllib/recommendation/ALS.scala | 171 +++++++----------- .../spark/mllib/recommendation/ALSSuite.scala | 26 ++- 2 files changed, 90 insertions(+), 107 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index e495e2f45102f..8ebc7e27ed4dd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -17,8 +17,8 @@ package org.apache.spark.mllib.recommendation -import scala.collection.mutable.{ArrayBuffer, BitSet} import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting @@ -40,7 +40,8 @@ import org.apache.spark.mllib.optimization.NNLS * of the elements within this block, and the list of destination blocks that each user or * product will need to send its feature vector to. */ -private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet]) +private[recommendation] +case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[mutable.BitSet]) /** @@ -383,7 +384,7 @@ class ALS private ( val userIds = ratings.map(_.user).distinct.sorted val numUsers = userIds.length val userIdToPos = userIds.zipWithIndex.toMap - val shouldSend = Array.fill(numUsers)(new BitSet(numProductBlocks)) + val shouldSend = Array.fill(numUsers)(new mutable.BitSet(numProductBlocks)) for (r <- ratings) { shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true } @@ -801,93 +802,94 @@ object ALS { /** * :: DeveloperApi :: - * Represents the cost attributable to a partition of a single ALS iteration. - * `inboundVectors` and `outboundVectors` are the number of feature vectors sent in and out, - * respectively, from a block in each iteration. `outerProducts` is the number of outer products - * that need to be accumulated in each iteration. `subproblems` is the number of least-squares - * subproblems solved in each iteration. + * Statistics of a block in ALS computation. 
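+ * For example, `ALS.analyzeBlocks(ratings, 1, 1)` on the five-rating data set used in
+ * the new ALSSuite test below reports one block of each category:
+ * {{{
+ * Array(BlockStats("user", 0, 3, 5, 4, 3), BlockStats("product", 0, 4, 5, 3, 4))
+ * }}}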
+ * + * @param category type of this block, "user" or "product" + * @param index index of this block + * @param count number of users or products inside this block, the same as the number of + * least-squares problems to solve on this block in each iteration + * @param numRatings total number of ratings inside this block, the same as the number of outer + * products we need to make on this block in each iteration + * @param numInLinks total number of incoming links, the same as the number of vectors to retrieve + * before each iteration + * @param numOutLinks total number of outgoing links, the same as the number of vectors to send + * for the next iteration */ @DeveloperApi - case class IterationCost(inboundVectors: Double, outerProducts: Double, subproblems: Double, - outboundVectors: Double) + case class BlockStats( + category: String, + index: Int, + count: Long, + numRatings: Long, + numInLinks: Long, + numOutLinks: Long) /** * :: DeveloperApi :: - * Given an RDD of ratings, a rank, and two partitioners, compute rough estimates of the - * computation time and communication cost of one iteration of ALS. Returns a pair of - * `Seq[IterationCost]`s. The first `Seq` represents user partitions and the second `Seq` - * represents product partitions. These `Seq`s are indexed by partition numbers, and the `i`th - * member contains details for the `i`th partition. + * Given an RDD of ratings, number of user blocks, and number of product blocks, computes the + * statistics of each block in ALS computation. This is useful for estimating cost and diagnosing + * load balance. * - * @param ratings RDD of Rating objects - * @param userPartitioner partitioner for partitioning users - * @param productPartitioner partitioner for partitioning products + * @param ratings an RDD of ratings + * @param numUserBlocks number of user blocks + * @param numProductBlocks number of product blocks + * @return statistics of user blocks and product blocks */ @DeveloperApi - def estimateCost( + def analyzeBlocks( ratings: RDD[Rating], - userPartitioner: Partitioner, - productPartitioner: Partitioner - ): (Seq[IterationCost], Seq[IterationCost]) = { - val numUserPartitions = userPartitioner.numPartitions - val numProdPartitions = productPartitioner.numPartitions + numUserBlocks: Int, + numProductBlocks: Int): Array[BlockStats] = { + + val userPartitioner = new ALSPartitioner(numUserBlocks) + val productPartitioner = new ALSPartitioner(numProductBlocks) - val ratingsByUserBlock = ratings.map{ rating => + val ratingsByUserBlock = ratings.map { rating => (userPartitioner.getPartition(rating.user), rating) } - val ratingsByProductBlock = ratings.map{ rating => + val ratingsByProductBlock = ratings.map { rating => (productPartitioner.getPartition(rating.product), Rating(rating.product, rating.user, rating.rating)) } val als = new ALS() - val (userIn, userOut) = als.makeLinkRDDs(userPartitioner.numPartitions, - ratingsByUserBlock, userPartitioner) - val (prodIn, prodOut) = als.makeLinkRDDs(productPartitioner.numPartitions, - ratingsByProductBlock, productPartitioner) + val (userIn, userOut) = + als.makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, userPartitioner) + val (prodIn, prodOut) = + als.makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, productPartitioner) - def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Double] = { + def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Long] = { outLinks.map { x => - val grid = new mutable.HashMap[(Int, Int), 
Double]() + val grid = new mutable.HashMap[(Int, Int), Long]() val uPartition = x._1 x._2.shouldSend.foreach { ss => ss.foreach { pPartition => val pair = (uPartition, pPartition) - grid.put(pair, grid.getOrElse(pair, 0.0) + 1.0) + grid.put(pair, grid.getOrElse(pair, 0L) + 1L) } } grid }.reduce { (grid1, grid2) => grid2.foreach { x => - grid1.put(x._1, grid1.getOrElse(x._1, 0.0) + x._2) + grid1.put(x._1, grid1.getOrElse(x._1, 0L) + x._2) } grid1 }.toMap } - def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Double] = { - inLinks.mapValues { ilb => - var numRatings = 0.0 - ilb.ratingsForBlock.foreach { ar => - ar.foreach { p => numRatings += p._1.length } - } - numRatings - }.collectAsMap().toMap - } - val userSendGrid = sendGrid(userOut) val prodSendGrid = sendGrid(prodOut) - val userInbound = new Array[Double](numUserPartitions) - val prodInbound = new Array[Double](numProdPartitions) - val userOutbound = new Array[Double](numUserPartitions) - val prodOutbound = new Array[Double](numProdPartitions) + val userInbound = new Array[Long](numUserBlocks) + val prodInbound = new Array[Long](numProductBlocks) + val userOutbound = new Array[Long](numUserBlocks) + val prodOutbound = new Array[Long](numProductBlocks) - for (u <- 0 until numUserPartitions; p <- 0 until numProdPartitions) { - userOutbound(u) += userSendGrid.getOrElse((u, p), 0.0) - prodInbound(p) += userSendGrid.getOrElse((u, p), 0.0) - userInbound(u) += prodSendGrid.getOrElse((p, u), 0.0) - prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0.0) + for (u <- 0 until numUserBlocks; p <- 0 until numProductBlocks) { + userOutbound(u) += userSendGrid.getOrElse((u, p), 0L) + prodInbound(p) += userSendGrid.getOrElse((u, p), 0L) + userInbound(u) += prodSendGrid.getOrElse((p, u), 0L) + prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0L) } val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap() @@ -896,58 +898,21 @@ object ALS { val userRatings = countRatings(userIn) val prodRatings = countRatings(prodIn) - val userCosts = new Array[IterationCost](numUserPartitions) - val prodCosts = new Array[IterationCost](numProdPartitions) + val userStats = Array.tabulate(numUserBlocks)( + u => BlockStats("user", u, userCounts(u), userRatings(u), userInbound(u), userOutbound(u))) + val productStatus = Array.tabulate(numProductBlocks)( + p => BlockStats("product", p, prodCounts(p), prodRatings(p), prodInbound(p), prodOutbound(p))) - for (u <- 0 until numUserPartitions) { - userCosts(u) = IterationCost(userInbound(u), userRatings(u), userCounts(u), userOutbound(u)) - } - - for (p <- 0 until numProdPartitions) { - prodCosts(p) = IterationCost(prodInbound(p), prodRatings(p), prodCounts(p), prodOutbound(p)) - } - - (userCosts, prodCosts) - } - - private class ALSRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[Rating]) - } + (userStats ++ productStatus).toArray } - def main(args: Array[String]) { - if (args.length < 5 || args.length > 9) { - println("Usage: ALS " + - "[] [] [] []") - System.exit(1) - } - val (master, ratingsFile, rank, iters, outputDir) = - (args(0), args(1), args(2).toInt, args(3).toInt, args(4)) - val lambda = if (args.length >= 6) args(5).toDouble else 0.01 - val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false - val alpha = if (args.length >= 8) args(7).toDouble else 1 - val blocks = if (args.length == 9) args(8).toInt else -1 - val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - 
.set("spark.kryo.registrator", classOf[ALSRegistrator].getName) - .set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "8") - .set("spark.locality.wait", "10000") - val sc = new SparkContext(master, "ALS", conf) - - val ratings = sc.textFile(ratingsFile).map { line => - val fields = line.split(',') - Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) - } - val model = new ALS(rank = rank, iterations = iters, lambda = lambda, - numBlocks = blocks, implicitPrefs = implicitPrefs, alpha = alpha).run(ratings) - - model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } - .saveAsTextFile(outputDir + "/userFeatures") - model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } - .saveAsTextFile(outputDir + "/productFeatures") - println("Final user/product features written to " + outputDir) - sc.stop() + private def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Long] = { + inLinks.mapValues { ilb => + var numRatings = 0L + ilb.ratingsForBlock.foreach { ar => + ar.foreach { p => numRatings += p._1.length } + } + numRatings + }.collectAsMap().toMap } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 81bebec8c7a39..017c39edb185f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -22,11 +22,11 @@ import scala.math.abs import scala.util.Random import org.scalatest.FunSuite - import org.jblas.DoubleMatrix -import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.recommendation.ALS.BlockStats object ALSSuite { @@ -67,8 +67,10 @@ object ALSSuite { case true => // Generate raw values from [0,9], or if negativeWeights, from [-2,7] val raw = new DoubleMatrix(users, products, - Array.fill(users * products)((if (negativeWeights) -2 else 0) + rand.nextInt(10).toDouble): _*) - val prefs = new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*) + Array.fill(users * products)( + (if (negativeWeights) -2 else 0) + rand.nextInt(10).toDouble): _*) + val prefs = + new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*) (raw, prefs) case false => (userMatrix.mmul(productMatrix), null) } @@ -160,6 +162,22 @@ class ALSSuite extends FunSuite with LocalSparkContext { testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false) } + test("analyze one user block and one product block") { + val localRatings = Seq( + Rating(0, 100, 1.0), + Rating(0, 101, 2.0), + Rating(0, 102, 3.0), + Rating(1, 102, 4.0), + Rating(2, 103, 5.0)) + val ratings = sc.makeRDD(localRatings, 2) + val stats = ALS.analyzeBlocks(ratings, 1, 1) + assert(stats.size === 2) + assert(stats(0) === BlockStats("user", 0, 3, 5, 4, 3)) + assert(stats(1) === BlockStats("product", 0, 4, 5, 3, 4)) + } + + // TODO: add tests for analyzing multiple user/product blocks + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. *