From 70654fa58dd4b801c551429945fa2f1377a60b2e Mon Sep 17 00:00:00 2001
From: pferrel
Date: Mon, 2 Jun 2014 14:11:55 -0700
Subject: [PATCH 1/3] starting to merge the cooccurrence stuff, import errors

---
 .../org/apache/mahout/math/MurmurHash.java    |  13 +-
 .../mahout/cf/CooccurrenceAnalysis.scala      | 211 ++++++++++++++++++
 .../mahout/cf/examples/Recommendations.scala  | 159 +++++++++++++
 3 files changed, 382 insertions(+), 1 deletion(-)
 create mode 100644 spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
 create mode 100644 spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala

diff --git a/math/src/main/java/org/apache/mahout/math/MurmurHash.java b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
index 0b3fab0675..32dfdd614e 100644
--- a/math/src/main/java/org/apache/mahout/math/MurmurHash.java
+++ b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.math;
 
+import com.google.common.primitives.Ints;
+
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -29,7 +31,16 @@
  */
 public final class MurmurHash {
 
-  private MurmurHash() {
+  private MurmurHash() {}
+
+  /**
+   * Hashes an int.
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(int data, int seed) {
+    return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
   }
 
   /**
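The new int overload above exists so the Scala code added later in this patch can derive a deterministic RNG seed from the first row key of a matrix block. A minimal sketch of that usage; the row key and sample rate below are made-up illustration values, not part of the patch:

    import org.apache.mahout.common.RandomUtils
    import org.apache.mahout.math.MurmurHash

    val randomSeed = 0xdeadbeef            // same default seed used by cooccurrences() below
    val firstRowKeyOfBlock = 42            // assumed example row key
    // Hashing the key with a fixed seed yields the same RNG on every re-execution,
    // so downsampling decisions stay repeatable after task failures.
    val random = RandomUtils.getRandom(MurmurHash.hash(firstRowKeyOfBlock, randomSeed))
    val keepInteraction = random.nextDouble() <= 0.1   // assumed 10% sample rate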
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000000..6c7b99a1e7
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.sparkbindings.drm.RLikeDrmOps._
+import org.apache.mahout.sparkbindings.drm._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.{MurmurHash, Vector}
+import org.apache.mahout.math.stats.LogLikelihood
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import scala.collection.mutable
+import org.apache.spark.broadcast.Broadcast
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.drm.DrmLike
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /** Compares (Int,Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 }
+
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    //TODO any chance to get rid of the spark-specific code here?
+    implicit val sc = drmARaw.rdd.sparkContext
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint()
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.colSums)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.colSums)
+
+      // Compute cross-co-occurrence matrix B'A
+      val drmBtA = drmB.t %*% drmA
+
+      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   **/
+  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+  }
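A small worked example of loglikelihoodRatio above, with made-up counts, to make the 2x2 contingency table explicit:

    // 1000 users total; item A seen by 100 users, item B by 50, both together by 20.
    val llr = CooccurrenceAnalysis.loglikelihoodRatio(
      numInteractionsWithA = 100L,
      numInteractionsWithB = 50L,
      numInteractionsWithAandB = 20L,
      numInteractions = 1000L)
    // Internally this fills the contingency table
    //   k11 = 20  (A and B together)    k12 = 80  (A without B)
    //   k21 = 30  (B without A)         k22 = 870 (neither A nor B)
    // and hands it to LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22); a large
    // ratio means A and B co-occur far more often than chance alone would predict.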
+
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: Broadcast[Vector], bcastNumInteractionsA: Broadcast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int,Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+              val candidate = thingA -> llrRatio
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach { case (otherThing, llrScore) => llrBlock(index, otherThing) = llrScore }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    //TODO any chance to get rid of the spark specific code here?
+    implicit val sc = drmM.rdd.sparkContext
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.colSums)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+          val numInteractionsOfUser = interactionsOfUser.sum
+
+          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}
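A worked example of the per-user and per-thing sampling rates used by sampleDownAndBinarize above; the interaction counts here are made-up illustration values:

    val maxNumInteractions = 500
    val numInteractionsOfUser = 2000.0          // a very active user
    val numInteractionsWithThing = 10000.0      // a very popular item

    // Each rate caps the expected number of sampled interactions at maxNumInteractions.
    val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser           // 0.25
    val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing    // 0.05
    // An interaction survives with the stricter of the two rates; users and items with
    // fewer than 500 interactions keep everything (rate 1.0).
    val effectiveRate = math.min(perUserSampleRate, perThingSampleRate)                                            // 0.05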
diff --git a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
new file mode 100644
index 0000000000..07d065911e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.examples
+
+import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.sparkbindings._
+import scala.io.Source
+import org.apache.mahout.math.SparseMatrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.cf.CooccurrenceAnalysis._
+import scala.collection.JavaConversions._
+
+/**
+ * The Epinions dataset contains ratings from users to items and a trust-network between the users.
+ * We use co-occurrence analysis to compute "users who like these items also like those items" and
+ * "users who trust these users also like those items".
+ *
+ * Download and unpack the dataset files from:
+ *
+ * http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2
+ * http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2
+ **/
+object RunCrossCooccurrenceAnalysisOnEpinions {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length == 0) {
+      println("Usage: RunCrossCooccurrenceAnalysisOnEpinions ")
+      println("Download the dataset from http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2 and")
+      println("http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2")
+      sys.exit(-1)
+    }
+
+    val datasetDir = args(0)
+
+    val epinionsRatings = new SparseMatrix(49290, 139738)
+
+    var firstLineSkipped = false
+    for (line <- Source.fromFile(datasetDir + "/ratings_data.txt").getLines()) {
+      if (line.contains(' ') && firstLineSkipped) {
+        val tokens = line.split(' ')
+        val userID = tokens(0).toInt - 1
+        val itemID = tokens(1).toInt - 1
+        val rating = tokens(2).toDouble
+        epinionsRatings(userID, itemID) = rating
+      }
+      firstLineSkipped = true
+    }
+
+    val epinionsTrustNetwork = new SparseMatrix(49290, 49290)
+    firstLineSkipped = false
+    for (line <- Source.fromFile(datasetDir + "/trust_data.txt").getLines()) {
+      if (line.contains(' ') && firstLineSkipped) {
+        val tokens = line.trim.split(' ')
+        val userID = tokens(0).toInt - 1
+        val trustedUserId = tokens(1).toInt - 1
+        epinionsTrustNetwork(userID, trustedUserId) = 1
+      }
+      firstLineSkipped = true
+    }
+
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+
+    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
+      customJars = Traversable.empty[String])
+
+    val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2)
+    val drmEpinionsTrustNetwork = drmParallelize(epinionsTrustNetwork, numPartitions = 2)
+
+    val indicatorMatrices = cooccurrences(drmEpinionsRatings, randomSeed = 0xdeadbeef,
+      maxInterestingItemsPerThing = 100, maxNumInteractions = 500, Array(drmEpinionsTrustNetwork))
+
+    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0),
+      "/tmp/co-occurrence-on-epinions/indicators-item-item/")
+    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1),
+      "/tmp/co-occurrence-on-epinions/indicators-trust-item/")
+
+    sc.stop()
+
+    println("Saved indicators to /tmp/co-occurrence-on-epinions/")
+  }
+}
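For orientation, a sketch of reading back what the run above writes to disk. RecommendationExamplesHelper (defined at the end of this file) emits one tab-separated "thingID, indicator item ID" line per non-zero indicator entry; the part-file name and IDs below are assumptions for illustration only:

    import scala.io.Source

    // saveAsTextFile() writes part files under the output directory (part-00000 assumed here)
    val lines = Source.fromFile("/tmp/co-occurrence-on-epinions/indicators-item-item/part-00000").getLines()
    for (line <- lines.take(3)) {
      val Array(itemID, indicatorItemID) = line.split('\t')
      println(s"item $indicatorItemID is a co-occurrence indicator for item $itemID")
    }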
+
+/**
+ * The movielens1M dataset contains movie ratings; we use co-occurrence analysis to compute
+ * "users who like these movies also like those movies".
+ *
+ * Download and unpack the dataset files from:
+ * http://files.grouplens.org/datasets/movielens/ml-1m.zip
+ */
+object RunCooccurrenceAnalysisOnMovielens1M {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length == 0) {
+      println("Usage: RunCooccurrenceAnalysisOnMovielens1M ")
+      println("Download the dataset from http://files.grouplens.org/datasets/movielens/ml-1m.zip")
+      sys.exit(-1)
+    }
+
+    val datasetDir = args(0)
+
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+
+    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
+      customJars = Traversable.empty[String])
+
+    System.setProperty("mahout.math.AtA.maxInMemNCol", 4000.toString)
+
+    val movielens = new SparseMatrix(6040, 3952)
+
+    for (line <- Source.fromFile(datasetDir + "/ratings.dat").getLines()) {
+      val tokens = line.split("::")
+      val userID = tokens(0).toInt - 1
+      val itemID = tokens(1).toInt - 1
+      val rating = tokens(2).toDouble
+      movielens(userID, itemID) = rating
+    }
+
+    val drmMovielens = drmParallelize(movielens, numPartitions = 2)
+
+    val indicatorMatrix = cooccurrences(drmMovielens).head
+
+    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrix,
+      "/tmp/co-occurrence-on-movielens/indicators-item-item/")
+
+    sc.stop()
+
+    println("Saved indicators to /tmp/co-occurrence-on-movielens/")
+  }
+}
+
+object RecommendationExamplesHelper {
+
+  def saveIndicatorMatrix(indicatorMatrix: DrmLike[Int], path: String) = {
+    indicatorMatrix.rdd.flatMap({ case (thingID, itemVector) =>
+      for (elem <- itemVector.nonZeroes()) yield { thingID + "\t" + elem.index }
+    })
+      .saveAsTextFile(path)
+  }
+}

From fc5fb6ac37e4c12d25c35ddb7912a32aac06e449 Mon Sep 17 00:00:00 2001
From: pferrel
Date: Mon, 2 Jun 2014 14:33:45 -0700
Subject: [PATCH 2/3] tried changing the imports in CooccurrenceAnalysis.scala to no avail

---
 .../org/apache/mahout/cf/CooccurrenceAnalysis.scala | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
index 6c7b99a1e7..aba3717096 100644
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -17,17 +17,13 @@
 
 package org.apache.mahout.cf
 
-import org.apache.mahout.sparkbindings.drm.RLikeDrmOps._
-import org.apache.mahout.sparkbindings.drm._
 import scala.collection.JavaConversions._
-import org.apache.mahout.math.{MurmurHash, Vector}
+import org.apache.mahout.math.drm.DrmLike
 import org.apache.mahout.math.stats.LogLikelihood
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.collection.mutable
 import org.apache.spark.broadcast.Broadcast
+import scala.collection.parallel.mutable
 import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.drm.DrmLike
+import org.apache.mahout.math.{MurmurHash, Vector}
 
 
 /**
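Patch 3 below replaces the Spark-specific pieces (drmARaw.rdd.sparkContext, Spark's Broadcast) with the engine-agnostic equivalents from MAHOUT-1529 (drm.context, BCast). A minimal, self-contained sketch of driving the resulting API with a toy matrix; the matrix values and context settings here are illustrative assumptions, not part of the patches:

    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._
    import drm._
    import RLikeDrmOps._
    import org.apache.mahout.sparkbindings._
    import org.apache.mahout.cf.CooccurrenceAnalysis

    implicit val distributedContext = mahoutSparkContext(masterUrl = "local", appName = "CooccurrenceSketch",
      customJars = Traversable.empty[String])

    // Tiny user x item interaction matrix: 4 users, 3 items
    val interactions = dense(
      (1, 1, 0),
      (0, 1, 1),
      (1, 0, 0),
      (1, 1, 1))

    val drmA = drmParallelize(interactions, numPartitions = 2)

    // indicators(0) is the item-item indicator matrix computed from A'A
    val indicators = CooccurrenceAnalysis.cooccurrences(drmA)

    distributedContext.close()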
From 242aed0e0921afe9a87ee8973ba8077cbe65fffa Mon Sep 17 00:00:00 2001
From: Dmitriy Lyubimov
Date: Mon, 2 Jun 2014 15:42:57 -0700
Subject: [PATCH 3/3] Compilation fixes, updates for MAHOUT-1529 changes

---
 .../mahout/cf/CooccurrenceAnalysis.scala      | 23 +++++++++++--------
 .../mahout/cf/examples/Recommendations.scala  | 14 ++++++-----
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
index aba3717096..5df329b3be 100644
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -17,13 +17,18 @@
 
 package org.apache.mahout.cf
 
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+
 import scala.collection.JavaConversions._
-import org.apache.mahout.math.drm.DrmLike
 import org.apache.mahout.math.stats.LogLikelihood
-import org.apache.spark.broadcast.Broadcast
-import scala.collection.parallel.mutable
+import collection._
+// import scala.collection.parallel.mutable
 import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.{MurmurHash, Vector}
 
 
 /**
@@ -42,8 +47,7 @@ object CooccurrenceAnalysis extends Serializable {
   def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                     maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
 
-    //TODO any chance to get rid of the spark-specific code here?
-    implicit val sc = drmARaw.rdd.sparkContext
+    implicit val distributedContext = drmARaw.context
 
     // Apply selective downsampling, pin resulting matrix
     val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint()
@@ -105,8 +109,8 @@ object CooccurrenceAnalysis extends Serializable {
   }
 
   def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
-                        bcastNumInteractionsB: Broadcast[Vector], bcastNumInteractionsA: Broadcast[Vector],
-                        crossCooccurrence: Boolean = true) = {
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
     drmBtA.mapBlock() {
       case (keys, block) =>
 
@@ -158,8 +162,7 @@ object CooccurrenceAnalysis extends Serializable {
    */
   def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
 
-    //TODO any chance to get rid of the spark specific code here?
-    implicit val sc = drmM.rdd.sparkContext
+    implicit val distributedContext = drmM.context
 
     // Pin raw interaction matrix
     val drmI = drmM.checkpoint()
diff --git a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
index 07d065911e..743a661024 100644
--- a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
+++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
@@ -17,12 +17,14 @@
 
 package org.apache.mahout.cf.examples
 
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings._
 import scala.io.Source
-import org.apache.mahout.math.SparseMatrix
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+
 import org.apache.mahout.cf.CooccurrenceAnalysis._
 import scala.collection.JavaConversions._
 
@@ -78,7 +80,7 @@ object RunCrossCooccurrenceAnalysisOnEpinions {
     System.setProperty("spark.kryo.referenceTracking", "false")
     System.setProperty("spark.kryoserializer.buffer.mb", "100")
 
-    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
+    implicit val distributedContext = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
       customJars = Traversable.empty[String])
 
     val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2)
     val drmEpinionsTrustNetwork = drmParallelize(epinionsTrustNetwork, numPartitions = 2)
@@ -92,7 +94,7 @@ object RunCrossCooccurrenceAnalysisOnEpinions {
     RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1),
       "/tmp/co-occurrence-on-epinions/indicators-trust-item/")
 
-    sc.stop()
+    distributedContext.close()
 
     println("Saved indicators to /tmp/co-occurrence-on-epinions/")
   }