From 286571bc64f8c7df88025d0a8dab6958cbe6ba88 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sun, 13 Jul 2014 15:57:15 +0100
Subject: [PATCH 1/2] Use long instead of int for user/product IDs
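
User and product IDs that do not fit in 32 bits (for example, IDs taken from
64-bit hashes, or catalogs with more than 2^31 items) overflow Int. A minimal
sketch of the failure mode (illustrative only, not code from this patch):

    // Int.MaxValue is 2147483647, so a 10-digit ID already fails to parse:
    "3000000000".toInt   // throws java.lang.NumberFormatException
    "3000000000".toLong  // ok: 3000000000L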
---
 .../apache/spark/examples/mllib/JavaALS.java  |  4 +-
 .../spark/examples/mllib/MovieLensALS.scala   |  4 +-
 .../mllib/api/python/PythonMLLibAPI.scala     | 12 +++---
 .../spark/mllib/recommendation/ALS.scala      | 40 +++++++++----------
 .../MatrixFactorizationModel.scala            |  8 ++--
 .../mllib/recommendation/JavaALSSuite.java    |  4 +-
 .../spark/mllib/recommendation/ALSSuite.scala | 22 +++++-----
 python/pyspark/mllib/_common.py               | 14 +++----
 8 files changed, 56 insertions(+), 52 deletions(-)

diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
index 8d381d4e0a943..10dc3817e840e 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -42,8 +42,8 @@ static class ParseRating implements Function<String, Rating> {
     @Override
     public Rating call(String line) {
       String[] tok = COMMA.split(line);
-      int x = Integer.parseInt(tok[0]);
-      int y = Integer.parseInt(tok[1]);
+      long x = Long.parseLong(tok[0]);
+      long y = Long.parseLong(tok[1]);
       double rating = Double.parseDouble(tok[2]);
       return new Rating(x, y, rating);
     }
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 98aaedb9d7dc9..fe5b456a97106 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -133,9 +133,9 @@ object MovieLensALS {
          * The semantics of 0 in this expanded world of non-positive weights
          * are "the same as never having interacted at all".
          */
-        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
+        Rating(fields(0).toLong, fields(1).toLong, fields(2).toDouble - 2.5)
       } else {
-        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
+        Rating(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
       }
     }.cache()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index c44173793b39a..a77db0b798b5f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -363,18 +363,18 @@ class PythonMLLibAPI extends Serializable {
   private def unpackRating(ratingBytes: Array[Byte]): Rating = {
     val bb = ByteBuffer.wrap(ratingBytes)
     bb.order(ByteOrder.nativeOrder())
-    val user = bb.getInt()
-    val product = bb.getInt()
+    val user = bb.getLong()
+    val product = bb.getLong()
     val rating = bb.getDouble()
     new Rating(user, product, rating)
   }
 
-  /** Unpack a tuple of Ints from an array of bytes */
-  private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
+  /** Unpack a tuple of Longs from an array of bytes */
+  private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Long, Long) = {
     val bb = ByteBuffer.wrap(tupleBytes)
     bb.order(ByteOrder.nativeOrder())
-    val v1 = bb.getInt()
-    val v2 = bb.getInt()
+    val v1 = bb.getLong()
+    val v2 = bb.getLong()
     (v1, v2)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cc56fd6ef28d6..0c8ba47e4bf46 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
 import scala.math.{abs, sqrt}
 import scala.util.Random
 import scala.util.Sorting
-import scala.util.hashing.byteswap32
+import scala.util.hashing.{byteswap32, byteswap64}
 
 import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
 
@@ -39,7 +39,7 @@ import org.apache.spark.mllib.optimization.NNLS
  * of the elements within this block, and the list of destination blocks that each user or
 * product will need to send its feature vector to.
  */
-private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet])
+private[recommendation] case class OutLinkBlock(elementIds: Array[Long], shouldSend: Array[BitSet])
 
 
 /**
@@ -53,15 +53,15 @@ private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSe
  * we get product block b's message to update the corresponding users.
  */
 private[recommendation] case class InLinkBlock(
-  elementIds: Array[Int], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]])
+  elementIds: Array[Long], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]])
 
 
 /**
  * :: Experimental ::
- * A more compact class to represent a rating than Tuple3[Int, Int, Double].
+ * A more compact class to represent a rating than Tuple3[Long, Long, Double].
  */
 @Experimental
-case class Rating(user: Int, product: Int, rating: Double)
+case class Rating(user: Long, product: Long, rating: Double)
 
 /**
  * Alternating Least Squares matrix factorization.
@@ -195,12 +195,12 @@ class ALS private (
     val sc = ratings.context
 
     val numUserBlocks = if (this.numUserBlocks == -1) {
-      math.max(sc.defaultParallelism, ratings.partitions.size / 2)
+      math.max(sc.defaultParallelism, ratings.partitions.length / 2)
     } else {
       this.numUserBlocks
     }
     val numProductBlocks = if (this.numProductBlocks == -1) {
-      math.max(sc.defaultParallelism, ratings.partitions.size / 2)
+      math.max(sc.defaultParallelism, ratings.partitions.length / 2)
     } else {
       this.numProductBlocks
     }
@@ -246,7 +246,7 @@ class ALS private (
     if (implicitPrefs) {
       for (iter <- 1 to iterations) {
         // perform ALS update
-        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        logInfo(s"Re-computing I given U (Iteration $iter/$iterations)")
         // Persist users because it will be called twice.
         users.setName(s"users-$iter").persist()
         val YtY = Some(sc.broadcast(computeYtY(users)))
@@ -254,7 +254,7 @@ class ALS private (
         products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
           userPartitioner, rank, lambda, alpha, YtY)
         previousProducts.unpersist()
-        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        logInfo(s"Re-computing U given I (Iteration $iter/$iterations)")
         products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
@@ -265,11 +265,11 @@ class ALS private (
     } else {
       for (iter <- 1 to iterations) {
         // perform ALS update
-        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        logInfo(s"Re-computing I given U (Iteration $iter/$iterations)")
         products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
           userPartitioner, rank, lambda, alpha, YtY = None)
         products.setName(s"products-$iter")
-        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        logInfo(s"Re-computing U given I (Iteration $iter/$iterations)")
         users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
           productPartitioner, rank, lambda, alpha, YtY = None)
         users.setName(s"users-$iter")
@@ -304,7 +304,7 @@ class ALS private (
 
   /**
    * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
-   * for each user (or product), in a distributed fashion.
+   * for each user (or product) block, in a distributed fashion.
    *
    * @param factors the (block-distributed) user or product factor vectors
    * @return YtY - whose value is only used in the implicit preference model
@@ -361,7 +361,7 @@ class ALS private (
    */
   private def unblockFactors(
       blockedFactors: RDD[(Int, Array[Array[Double]])],
-      outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = {
+      outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Long, Array[Double])] = {
     blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) =>
       for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
     }
@@ -401,11 +401,7 @@ class ALS private (
       // Create an array of (product, Seq(Rating)) ratings
       val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
       // Sort them by product ID
-      val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
-        def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int =
-          a._1 - b._1
-      }
-      Sorting.quickSort(groupedRatings)(ordering)
+      Sorting.quickSort(groupedRatings)(Ordering.by(_._1))
       // Translate the user IDs to indices based on userIdToPos
       ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) =>
         (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray)
@@ -608,8 +604,12 @@ class ALS private (
  * Partitioner for ALS.
  */
 private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner {
-  override def getPartition(key: Any): Int = {
-    Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions)
+  override def getPartition(key: Any): Int = key match {
+    case i: Int => Utils.nonNegativeMod(byteswap32(i), numPartitions)
+    case l: Long => {
+      val hashLong = byteswap64(l)
+      Utils.nonNegativeMod(hashLong.toInt ^ (hashLong >> 32).toInt, numPartitions)
+    }
   }
 
   override def equals(obj: Any): Boolean = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 899286d235a9d..64e167b095932 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -36,10 +36,10 @@ import org.apache.spark.mllib.api.python.PythonMLLibAPI
  */
 class MatrixFactorizationModel private[mllib] (
     val rank: Int,
-    val userFeatures: RDD[(Int, Array[Double])],
-    val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
+    val userFeatures: RDD[(Long, Array[Double])],
+    val productFeatures: RDD[(Long, Array[Double])]) extends Serializable {
 
   /** Predict the rating of one user for one product. */
-  def predict(user: Int, product: Int): Double = {
+  def predict(user: Long, product: Long): Double = {
     val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
     val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
     userVector.dot(productVector)
@@ -53,7 +53,7 @@ class MatrixFactorizationModel private[mllib] (
    * @param usersProducts RDD of (user, product) pairs.
    * @return RDD of Ratings.
    */
-  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
+  def predict(usersProducts: RDD[(Long, Long)]): RDD[Rating] = {
     val users = userFeatures.join(usersProducts).map{
       case (user, (uFeatures, product)) => (product, (user, uFeatures))
     }
diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index bf2365f82044c..e8b09043da004 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -50,7 +50,7 @@ static void validatePrediction(MatrixFactorizationModel model, int users, int pr
     List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect();
     for (int i = 0; i < features; ++i) {
       for (scala.Tuple2<Object, double[]> userFeature : userFeatures) {
-        predictedU.put((Integer)userFeature._1(), i, userFeature._2()[i]);
+        predictedU.put(((Number) userFeature._1()).intValue(), i, userFeature._2()[i]);
       }
     }
     DoubleMatrix predictedP = new DoubleMatrix(products, features);
@@ -59,7 +59,7 @@ static void validatePrediction(MatrixFactorizationModel model, int users, int pr
       model.productFeatures().toJavaRDD().collect();
     for (int i = 0; i < features; ++i) {
       for (scala.Tuple2<Object, double[]> productFeature : productFeatures) {
-        predictedP.put((Integer)productFeature._1(), i, productFeature._2()[i]);
+        predictedP.put(((Number) productFeature._1()).intValue(), i, productFeature._2()[i]);
       }
     }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 81bebec8c7a39..8ebb8274ee195 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -145,13 +145,13 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     val correct = data._2
     val model = ALS.train(ratings, 5, 15)
 
-    val pairs = Array.tabulate(50, 50)((u, p) => (u - 25, p - 25)).flatten
+    val pairs = Array.tabulate(50, 50)((u, p) => (u - 25L, p - 25L)).flatten
     val ans = model.predict(sc.parallelize(pairs)).collect()
     ans.foreach { r =>
       val u = r.user + 25
       val p = r.product + 25
       val v = r.rating
-      val error = v - correct.get(u, p)
+      val error = v - correct.get(u.toInt, p.toInt)
       assert(math.abs(error) < 0.4)
     }
   }
@@ -170,7 +170,7 @@ class ALSSuite extends FunSuite with LocalSparkContext {
    * @param samplingRate what fraction of the user-product pairs are known
    * @param matchThreshold max difference allowed to consider a predicted rating correct
    * @param implicitPrefs flag to test implicit feedback
-   * @param bulkPredict flag to test bulk prediciton
+   * @param bulkPredict flag to test bulk prediction
    * @param negativeWeights whether the generated data can contain negative values
    * @param numUserBlocks number of user blocks to partition users into
    * @param numProductBlocks number of product blocks to partition products into
@@ -206,20 +206,21 @@ class ALSSuite extends FunSuite with LocalSparkContext {
 
     val predictedU = new DoubleMatrix(users, features)
     for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
-      predictedU.put(u, i, vec(i))
+      predictedU.put(u.toInt, i, vec(i))
     }
     val predictedP = new DoubleMatrix(products, features)
     for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
-      predictedP.put(p, i, vec(i))
+      predictedP.put(p.toInt, i, vec(i))
     }
     val predictedRatings = bulkPredict match {
       case false =>
        predictedU.mmul(predictedP.transpose)
       case true =>
         val allRatings = new DoubleMatrix(users, products)
-        val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p)
+        val usersProducts =
+          for (u <- 0 until users; p <- 0 until products) yield (u.toLong, p.toLong)
         val userProductsRDD = sc.parallelize(usersProducts)
         model.predict(userProductsRDD).collect().foreach { elem =>
-          allRatings.put(elem.user, elem.product, elem.rating)
+          allRatings.put(elem.user.toInt, elem.product.toInt, elem.rating)
         }
         allRatings
@@ -248,8 +249,11 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     }
     val rmse = math.sqrt(sqErr / denom)
     if (rmse > matchThreshold) {
-      fail("Model failed to predict RMSE: %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
-        rmse, truePrefs, predictedRatings, predictedU, predictedP))
+      fail(s"""Model failed to predict RMSE: $rmse
+        |corr: $truePrefs
+        |pred: $predictedRatings
+        |U: $predictedU
+        |P: $predictedP""".stripMargin)
     }
   }
 }
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e609b60a0f968..a57e17d6455b8 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -374,10 +374,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
 
 # Functions for serializing ALS Rating objects and tuples
 def _serialize_rating(r):
-    ba = bytearray(16)
-    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
-    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
-    intpart[0], intpart[1], doublepart[0] = r
+    ba = bytearray(24)
+    longpart = ndarray(shape=[2], buffer=ba, dtype=int64)
+    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=16)
+    longpart[0], longpart[1], doublepart[0] = r
     return ba
 
 
@@ -399,9 +399,9 @@ def load_stream(self, stream):
 
 
 def _serialize_tuple(t):
-    ba = bytearray(8)
-    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
-    intpart[0], intpart[1] = t
+    ba = bytearray(16)
+    longpart = ndarray(shape=[2], buffer=ba, dtype=int64)
+    longpart[0], longpart[1] = t
     return ba

From 7754ecea0b0ce818e9257b34bb0d9e888be8d986 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 15 Jul 2014 23:02:20 +0100
Subject: [PATCH 2/2] Re-add existing MatrixFactorizationModel predict() methods.

predict(RDD[(Long, Long)]) becomes predictRDD, because it cannot overload
predict(RDD[(Int, Int)]): after type erasure both methods would have the same
signature.
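
For illustration, a minimal sketch of the clash (the Model class below is a
stand-in, not code from this patch):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.recommendation.Rating

    class Model {
      def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = ???
      // scalac rejects the pair: "double definition ... have same type after
      // erasure: (usersProducts: org.apache.spark.rdd.RDD)org.apache.spark.rdd.RDD"
      def predict(usersProducts: RDD[(Long, Long)]): RDD[Rating] = ???
    }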
---
 .../spark/examples/mllib/MovieLensALS.scala   |  2 +-
 .../MatrixFactorizationModel.scala            | 23 +++++++++++++++++--
 .../spark/mllib/recommendation/ALSSuite.scala |  4 ++--
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index fe5b456a97106..01708bf531b87 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -187,7 +187,7 @@ object MovieLensALS {
     def mapPredictedRating(r: Double) =
       if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
 
-    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
+    val predictions: RDD[Rating] = model.predictRDD(data.map(x => (x.user, x.product)))
     val predictionsAndRatings = predictions.map{ x =>
       ((x.user, x.product), mapPredictedRating(x.rating))
     }.join(data.map(x => ((x.user, x.product), x.rating))).values
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 64e167b095932..8b48c592029ea 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,10 @@ class MatrixFactorizationModel private[mllib] (
     val userFeatures: RDD[(Long, Array[Double])],
     val productFeatures: RDD[(Long, Array[Double])]) extends Serializable {
 
   /** Predict the rating of one user for one product. */
+  def predict(user: Int, product: Int): Double =
+    predict(user.toLong, product.toLong)
+
+  /** Predict the rating of one user for one product. */
   def predict(user: Long, product: Long): Double = {
     val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
     val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
     userVector.dot(productVector)
@@ -53,7 +57,22 @@ class MatrixFactorizationModel private[mllib] (
    * @param usersProducts RDD of (user, product) pairs.
    * @return RDD of Ratings.
    */
-  def predict(usersProducts: RDD[(Long, Long)]): RDD[Rating] = {
+  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] =
+    predictRDD(usersProducts.map(t => (t._1.toLong, t._2.toLong)))
+
+  // The following must be named differently since it can't overload the previous method;
+  // both have the same arguments after erasure.
+  // Might be useful to name the RDD method differently anyway.
+
+  /**
+   * Predict the rating of many users for many products.
+   * The output RDD has one element for each element of the input RDD (including all duplicates),
+   * unless a user or product is missing in the training set.
+   *
+   * @param usersProducts RDD of (user, product) pairs.
+   * @return RDD of Ratings.
+   */
+  def predictRDD(usersProducts: RDD[(Long, Long)]): RDD[Rating] = {
     val users = userFeatures.join(usersProducts).map{
       case (user, (uFeatures, product)) => (product, (user, uFeatures))
     }
@@ -77,7 +96,7 @@ class MatrixFactorizationModel private[mllib] (
   def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
     val pythonAPI = new PythonMLLibAPI()
     val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
-    predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
+    predictRDD(usersProducts).map(rate => pythonAPI.serializeRating(rate))
   }
 
   // TODO: Figure out what other good bulk prediction methods would look like.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 8ebb8274ee195..d1eec27391b4b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -146,7 +146,7 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     val model = ALS.train(ratings, 5, 15)
 
     val pairs = Array.tabulate(50, 50)((u, p) => (u - 25L, p - 25L)).flatten
-    val ans = model.predict(sc.parallelize(pairs)).collect()
+    val ans = model.predictRDD(sc.parallelize(pairs)).collect()
     ans.foreach { r =>
       val u = r.user + 25
       val p = r.product + 25
@@ -219,7 +219,7 @@ class ALSSuite extends FunSuite with LocalSparkContext {
         val usersProducts =
           for (u <- 0 until users; p <- 0 until products) yield (u.toLong, p.toLong)
         val userProductsRDD = sc.parallelize(usersProducts)
-        model.predict(userProductsRDD).collect().foreach { elem =>
+        model.predictRDD(userProductsRDD).collect().foreach { elem =>
           allRatings.put(elem.user.toInt, elem.product.toInt, elem.rating)
         }
         allRatings
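
Usage sketch for the resulting API (illustrative only, not part of the patch;
assumes ratings: RDD[Rating] and pairs: RDD[(Long, Long)] already exist):

    import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
    import org.apache.spark.rdd.RDD

    val model: MatrixFactorizationModel = ALS.train(ratings, 10, 10)
    val one: Double = model.predict(42, 17)          // Int overload, widened to Long
    val many: RDD[Rating] = model.predictRDD(pairs)  // bulk prediction over Long IDs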