Skip to content

Commit

Permalink
Stop making the partitioner configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmyklebu committed Apr 21, 2014
1 parent 495784f commit 23d6f91
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ class ALS private (

private var partitioner: Partitioner = null

/** Sets the Partitioner that partitions users and products. */
def setPartitioner(p: Partitioner): ALS = {
this.partitioner = p
this
}

/** Set the rank of the feature matrices computed (number of features). Default: 10. */
def setRank(rank: Int): ALS = {
this.rank = rank
Expand Down Expand Up @@ -179,16 +173,14 @@ class ALS private (
this.numBlocks
}

val defaultPartitioner = new Partitioner {
partitioner = new Partitioner {
val numPartitions = numBlocks

def getPartition(x: Any): Int = {
Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions)
}
}

if (partitioner == null) partitioner = defaultPartitioner

val ratingsByUserBlock = ratings.map{ rating =>
(partitioner.getPartition(rating.user), rating)
}
Expand Down Expand Up @@ -548,34 +540,6 @@ class ALS private (
*/
object ALS {

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. This is done using a level of
* parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param seed random seed
* @param partitioner Partitioner mapping users and products to partitions
*/
def train(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
seed: Long,
partitioner: Partitioner) = {
val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed)
als.setPartitioner(partitioner)
als.run(ratings)
}

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
Expand Down Expand Up @@ -657,37 +621,6 @@ object ALS {
train(ratings, rank, iterations, 0.01, -1)
}

/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. This is done using
* a level of parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param alpha confidence parameter (only applies when immplicitPrefs = true)
* @param seed random seed
* @param partitioner Partitioner for partitioning users and products
*/
def trainImplicit(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double,
seed: Long,
partitioner: Partitioner
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, true, alpha, seed)
.setPartitioner(partitioner)
.run(ratings)
}

/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,6 @@ class ALSSuite extends FunSuite with LocalSparkContext {
assert(u11 != u2)
}

test("custom partitioner") {
testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, null)
testALS(50, 50, 2, 15, 0.7, 0.3, false, false, false, 3, new Partitioner {
def numPartitions(): Int = 3
def getPartition(x: Any): Int = x.asInstanceOf[Int] % 2
})
}

test("negative ids") {
val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false)
val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) =>
Expand Down Expand Up @@ -168,20 +160,18 @@ class ALSSuite extends FunSuite with LocalSparkContext {
* @param bulkPredict flag to test bulk prediciton
* @param negativeWeights whether the generated data can contain negative values
* @param numBlocks number of blocks to partition users and products into
* @param partitioner partitioner
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
partitioner: Partitioner = null)
bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs, negativeWeights)
val model = implicitPrefs match {
case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 0L, partitioner)
numBlocks, 0L)
case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 1.0, 0L, partitioner)
numBlocks, 1.0, 0L)
}

val predictedU = new DoubleMatrix(users, features)
Expand Down

0 comments on commit 23d6f91

Please sign in to comment.