Skip to content

Commit

Permalink
Support custom partitioners. Currently we use the same partitioner fo…
Browse files Browse the repository at this point in the history
…r users and products.
  • Loading branch information
tmyklebu committed Apr 15, 2014
1 parent c90b6d8 commit df27697
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 23 deletions.
104 changes: 85 additions & 19 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.math.{abs, sqrt}
import scala.util.Random
import scala.util.Sorting
import scala.util.hashing.byteswap32

import com.esotericsoftware.kryo.Kryo
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
Expand Down Expand Up @@ -97,15 +98,14 @@ class ALS private (
private var lambda: Double,
private var implicitPrefs: Boolean,
private var alpha: Double,
private var partitioner: Partitioner = null,
private var seed: Long = System.nanoTime()
) extends Serializable with Logging {

/**
* Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
* lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
*/
def this() = this(-1, 10, 10, 0.01, false, 1.0, null)
def this() = this(-1, 10, 10, 0.01, false, 1.0)

/**
* Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
Expand All @@ -116,6 +116,14 @@ class ALS private (
this
}

var partitioner: Partitioner = null

/**
 * Sets the Partitioner used to assign users and products to blocks. The same partitioner is
 * applied to both. Leaving it null means the built-in hash-based partitioner is used when
 * training.
 *
 * @param p partitioner to use, or null for the default
 * @return this ALS instance, so that setter calls can be chained
 */
def setPartitioner(p: Partitioner): ALS = {
  partitioner = p
  this
}

/** Set the rank of the feature matrices computed (number of features). Default: 10. */
def setRank(rank: Int): ALS = {
this.rank = rank
Expand Down Expand Up @@ -169,24 +177,23 @@ class ALS private (
this.numBlocks
}

// Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
def hash(x: Int): Int = {
val r = x ^ (x >>> 20) ^ (x >>> 12)
r ^ (r >>> 7) ^ (r >>> 4)
}

this.partitioner = new Partitioner {
def numPartitions = numBlocks
val defaultPartitioner = new Partitioner {
val numPartitions = numBlocks

def getPartition(x: Any): Int = x match {
case null => 0
case _ => Utils.nonNegativeMod(hash(x.hashCode), numPartitions)
case _ => Utils.nonNegativeMod(byteswap32(x.hashCode), numPartitions)
}
}

val ratingsByUserBlock = ratings.map{ rating => (partitioner.getPartition(rating.user), rating) }
if (partitioner == null) partitioner = defaultPartitioner

val ratingsByUserBlock = ratings.map{ rating =>
(partitioner.getPartition(rating.user), rating)
}
val ratingsByProductBlock = ratings.map{ rating =>
(partitioner.getPartition(rating.product), Rating(rating.product, rating.user, rating.rating))
(partitioner.getPartition(rating.product),
Rating(rating.product, rating.user, rating.rating))
}

val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
Expand All @@ -198,13 +205,13 @@ class ALS private (
val seed1 = seedGen.nextInt()
val seed2 = seedGen.nextInt()
var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed1 ^ index))
val rand = new Random(byteswap32(seed1 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
}
var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed2 ^ index))
val rand = new Random(byteswap32(seed2 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
Expand Down Expand Up @@ -531,6 +538,34 @@ class ALS private (
*/
object ALS {

/**
 * Train a matrix factorization model given an RDD of ratings given by users to some products,
 * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
 * product of two lower-rank matrices of a given rank (number of features). To solve for these
 * features, we run a given number of iterations of ALS. This is done using a level of
 * parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`.
 *
 * @param ratings     RDD of (userID, productID, rating) pairs
 * @param rank        number of features to use
 * @param iterations  number of iterations of ALS (recommended: 10-20)
 * @param lambda      regularization factor (recommended: 0.01)
 * @param blocks      level of parallelism to split computation into
 * @param seed        random seed
 * @param partitioner Partitioner mapping users and products to partitions; null selects the
 *                    default partitioner
 * @return the trained factorization model
 */
def train(
    ratings: RDD[Rating],
    rank: Int,
    iterations: Int,
    lambda: Double,
    blocks: Int,
    seed: Long,
    partitioner: Partitioner
  ): MatrixFactorizationModel = {
  // Explicit-feedback variant: implicitPrefs = false, alpha fixed at 1.0.
  new ALS(blocks, rank, iterations, lambda, false, 1.0, seed)
    .setPartitioner(partitioner)
    .run(ratings)
}

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
Expand All @@ -553,7 +588,7 @@ object ALS {
blocks: Int,
seed: Long
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, false, 1.0, null, seed).run(ratings)
new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
}

/**
Expand All @@ -576,7 +611,7 @@ object ALS {
lambda: Double,
blocks: Int
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, false, 1.0, null).run(ratings)
new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
}

/**
Expand Down Expand Up @@ -612,6 +647,37 @@ object ALS {
train(ratings, rank, iterations, 0.01, -1)
}

/**
 * Train a matrix factorization model from an RDD of 'implicit preferences' expressed by users
 * for products, given as (userID, productID, preference) pairs. The preference matrix is
 * approximated by the product of two lower-rank matrices of the requested rank (number of
 * features), computed by running the given number of ALS iterations. Work is split into
 * `blocks` blocks, and users and products are assigned to blocks by `partitioner`.
 *
 * @param ratings     RDD of (userID, productID, rating) pairs
 * @param rank        number of features to use
 * @param iterations  number of iterations of ALS (recommended: 10-20)
 * @param lambda      regularization factor (recommended: 0.01)
 * @param blocks      level of parallelism to split computation into
 * @param alpha       confidence parameter (only applies when implicitPrefs = true)
 * @param seed        random seed
 * @param partitioner Partitioner for partitioning users and products
 */
def trainImplicit(
    ratings: RDD[Rating],
    rank: Int,
    iterations: Int,
    lambda: Double,
    blocks: Int,
    alpha: Double,
    seed: Long,
    partitioner: Partitioner
  ): MatrixFactorizationModel = {
  // Implicit-feedback variant: implicitPrefs = true with the caller-supplied alpha.
  val als = new ALS(blocks, rank, iterations, lambda, true, alpha, seed)
  als.setPartitioner(partitioner)
  als.run(ratings)
}

/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
Expand All @@ -636,7 +702,7 @@ object ALS {
alpha: Double,
seed: Long
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, true, alpha, null, seed).run(ratings)
new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
}

/**
Expand All @@ -661,7 +727,7 @@ object ALS {
blocks: Int,
alpha: Double
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, true, alpha, null).run(ratings)
new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner

object ALSSuite {

Expand Down Expand Up @@ -74,7 +75,6 @@ object ALSSuite {

(sampledRatings, trueRatings, truePrefs)
}

}


Expand Down Expand Up @@ -128,6 +128,17 @@ class ALSSuite extends FunSuite with LocalSparkContext {
assert(u11 != u2)
}

test("custom partitioner") {
  // Baseline: three blocks with a null partitioner, so ALS falls back to its default.
  testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, null)
  // Same problem, this time with a caller-supplied partitioner.
  testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, new Partitioner {
    def numPartitions: Int = 3
    // Only partitions 0 and 1 are ever returned although three are declared, so one block
    // stays empty. NOTE(review): presumably deliberate; confirm.
    def getPartition(x: Any): Int = x match {
      case null => 0
      // `%` keeps the sign of the dividend; abs avoids an invalid index of -1 for negative
      // hash codes.
      case _ => math.abs(x.hashCode % 2)
    }
  })
}

/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
Expand All @@ -140,16 +151,21 @@ class ALSSuite extends FunSuite with LocalSparkContext {
* @param implicitPrefs flag to test implicit feedback
* @param bulkPredict flag to test bulk prediction
* @param negativeWeights whether the generated data can contain negative values
* @param numBlocks number of blocks to partition users and products into
* @param partitioner custom Partitioner for users and products; null (the default) lets ALS use its built-in partitioner
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
bulkPredict: Boolean = false, negativeWeights: Boolean = false)
bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
partitioner: Partitioner = null)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs, negativeWeights)
val model = implicitPrefs match {
case false => ALS.train(sc.parallelize(sampledRatings), features, iterations)
case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations)
case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 0L, partitioner)
case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 1.0, 0L, partitioner)
}

val predictedU = new DoubleMatrix(users, features)
Expand Down

0 comments on commit df27697

Please sign in to comment.