From 107a0ba9605241653a85b113661a8fa5c055529f Mon Sep 17 00:00:00 2001 From: pferrel Date: Wed, 4 Jun 2014 12:54:22 -0700 Subject: [PATCH 1/7] added Sebastian's CooccurrenceAnalysis patch updated it to use current Mahout-DSL --- .gitignore | 1 + spark/src/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/org/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/org/apache/.DS_Store | Bin 0 -> 6148 bytes .../mahout/cf/CooccurrenceAnalysis.scala | 210 ++++++++++++++++++ .../mahout/cf/examples/Recommendations.scala | 169 ++++++++++++++ 8 files changed, 380 insertions(+) create mode 100644 spark/src/.DS_Store create mode 100644 spark/src/main/.DS_Store create mode 100644 spark/src/main/scala/.DS_Store create mode 100644 spark/src/main/scala/org/.DS_Store create mode 100644 spark/src/main/scala/org/apache/.DS_Store create mode 100644 spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala create mode 100644 spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala diff --git a/.gitignore b/.gitignore index c47bff17ce..f5003759e9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ output-asf-email-examples/ .project .settings/ .idea/ +.DS_Store *.iml target/ examples/bin/tmp diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..7b0d36729e2ee777a660f9e8c6709dd97bc2fb68 GIT binary patch (binary .DS_Store contents omitted) diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala new file mode 100644 index 0000000000..5df329b3be --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.cf + +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import org.apache.mahout.sparkbindings._ + +import scala.collection.JavaConversions._ +import org.apache.mahout.math.stats.LogLikelihood +import collection._ +// import scala.collection.parallel.mutable +import org.apache.mahout.common.RandomUtils + + +/** + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * available at http://www.mapr.com/practical-machine-learning + * + * see also "Sebastian Schelter, Christoph Boden, Volker Markl: + * Scalable Similarity-Based Neighborhood Methods with MapReduce + * ACM Conference on Recommender Systems 2012" + */ +object CooccurrenceAnalysis extends Serializable { + + /** Compares (Int,Double) pairs by the second value */ + private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 } + + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { + + implicit val disributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint() + + // num users, which equals the maximum number of interactions per item + val numUsers = drmA.nrow.toInt + + // Compute & broadcast the number of interactions per thing in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.colSums) + + // Compute co-occurrence matrix A'A + val drmAtA = drmA.t %*% drmA + + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + var indicatorMatrices = List(drmIndicatorsAtA) + + // Now look at cross-co-occurrences + for (drmBRaw <- drmBs) { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.colSums) + + // Compute cross-co-occurrence matrix B'A + val drmBtA = drmB.t %*% drmA + + val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerThingB, bcastInteractionsPerItemA) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA + + drmB.uncache() + } + + // Unpin downsampled interaction matrix + drmA.uncache() + + // Return list of indicator matrices + indicatorMatrices + } + + /** + * Compute loglikelihood ratio + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details + **/ + def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, + numInteractionsWithAandB: Long, numInteractions: Long) = { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + } + + def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, + bcastNumInteractionsB: BCast[Vector], 
bcastNumInteractionsA: BCast[Vector], + crossCooccurrence: Boolean = true) = { + drmBtA.mapBlock() { + case (keys, block) => + + val llrBlock = block.like() + val numInteractionsB: Vector = bcastNumInteractionsB + val numInteractionsA: Vector = bcastNumInteractionsA + + for (index <- 0 until keys.size) { + + val thingB = keys(index) + + // PriorityQueue to select the top-k items + val topItemsPerThing = new mutable.PriorityQueue[(Int,Double)]()(orderByScore) + + block(index, ::).nonZeroes().foreach { elem => + val thingA = elem.index + val cooccurrences = elem.get + + // exclude co-occurrences of the item with itself + if (crossCooccurrence || thingB != thingA) { + // Compute loglikelihood ratio + val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, + cooccurrences.toLong, numUsers) + val candidate = thingA -> llrRatio + + // Enqueue item with score, if belonging to the top-k + if (topItemsPerThing.size < maxInterestingItemsPerThing) { + topItemsPerThing.enqueue(candidate) + } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + } + } + } + + // Add top-k interesting items to the output matrix + topItemsPerThing.dequeueAll.foreach { case (otherThing, llrScore) => llrBlock(index, otherThing) = llrScore } + } + + keys -> llrBlock + } + } + + /** + * Selectively downsample users and things with an anormalous amount of interactions, inspired by + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java + * + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not + */ + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { + + implicit val distributedContext = drmM.context + + // Pin raw interaction matrix + val drmI = drmM.checkpoint() + + // Broadcast vector containing the number of interactions with each thing + val bcastNumInteractions = drmBroadcast(drmI.colSums) + + val downSampledDrmI = drmI.mapBlock() { + case (keys, block) => + val numInteractions: Vector = bcastNumInteractions + + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) + + val downsampledBlock = block.like() + + // Downsample the interaction vector of each user + for (userIndex <- 0 until keys.size) { + + val interactionsOfUser = block(userIndex, ::) + val numInteractionsOfUser = interactionsOfUser.sum + + val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser + + interactionsOfUser.nonZeroes().foreach { elem => + val numInteractionsWithThing = numInteractions(elem.index) + val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing + + if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) { + // We ignore the original interaction value and create a binary 0-1 matrix + // as we only consider whether interactions happened or did not happen + downsampledBlock(userIndex, elem.index) = 1 + } + } + } + + keys -> downsampledBlock + } + + // Unpin raw interaction matrix + drmI.uncache() + + downSampledDrmI + } +} diff --git a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala new file mode 100644 index 0000000000..c640e1ef16 --- 
/dev/null +++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.examples + +import scala.io.Source +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import org.apache.mahout.sparkbindings._ + +import org.apache.mahout.cf.CooccurrenceAnalysis._ +import scala.collection.JavaConversions._ + +/** + * The Epinions dataset contains ratings from users to items and a trust-network between the users. + * We use co-occurrence analysis to compute "users who like these items, also like that items" and + * "users who trust these users, like that items" + * + * Download and unpack the dataset files from: + * + * http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2 + * http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2 + **/ +object RunCrossCooccurrenceAnalysisOnEpinions { + + def main(args: Array[String]): Unit = { + + if (args.length == 0) { + println("Usage: RunCooccurrenceAnalysisOnMovielens1M ") + println("Download the dataset from http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2 and") + println("http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2") + sys.exit(-1) + } + + val datasetDir = args(0) + + val epinionsRatings = new SparseMatrix(49290, 139738) + + var firstLineSkipped = false + for (line <- Source.fromFile(datasetDir + "/ratings_data.txt").getLines()) { + if (line.contains(' ') && firstLineSkipped) { + val tokens = line.split(' ') + val userID = tokens(0).toInt - 1 + val itemID = tokens(1).toInt - 1 + val rating = tokens(2).toDouble + epinionsRatings(userID, itemID) = rating + } + firstLineSkipped = true + } + + val epinionsTrustNetwork = new SparseMatrix(49290, 49290) + firstLineSkipped = false + for (line <- Source.fromFile(datasetDir + "/trust_data.txt").getLines()) { + if (line.contains(' ') && firstLineSkipped) { + val tokens = line.trim.split(' ') + val userID = tokens(0).toInt - 1 + val trustedUserId = tokens(1).toInt - 1 + epinionsTrustNetwork(userID, trustedUserId) = 1 + } + firstLineSkipped = true + } + + System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "100") + +// implicit val distributedContext = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", +// customJars = Traversable.empty[String]) + implicit val distributedContext = mahoutSparkContext(masterUrl = "spark://occam4:7077", appName = "MahoutClusteredContext", + customJars = Traversable.empty[String]) + + val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2) + val drmEpinionsTrustNetwork = 
drmParallelize(epinionsTrustNetwork, numPartitions = 2) + + val indicatorMatrices = cooccurrences(drmEpinionsRatings, randomSeed = 0xdeadbeef, + maxInterestingItemsPerThing = 100, maxNumInteractions = 500, Array(drmEpinionsTrustNetwork)) +//hdfs://occam4:54310/user/pat/xrsj/ +/* + RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0), + "/tmp/co-occurrence-on-epinions/indicators-item-item/") + RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1), + "/tmp/co-occurrence-on-epinions/indicators-trust-item/") +*/ + RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0), + "hdfs://occam4:54310/user/pat/xrsj/indicators-item-item/") + RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1), + "hdfs://occam4:54310/user/pat/xrsj/indicators-trust-item/") + + distributedContext.close() + + println("Saved indicators to /tmp/co-occurrence-on-epinions/") + } +} + +/** + * The movielens1M dataset contains movie ratings, we use co-occurrence analysis to compute + * "users who like these movies, also like that movies" + * + * Download and unpack the dataset files from: + * http://files.grouplens.org/datasets/movielens/ml-1m.zip + */ +object RunCooccurrenceAnalysisOnMovielens1M { + + def main(args: Array[String]): Unit = { + + if (args.length == 0) { + println("Usage: RunCooccurrenceAnalysisOnMovielens1M ") + println("Download the dataset from http://files.grouplens.org/datasets/movielens/ml-1m.zip") + sys.exit(-1) + } + + val datasetDir = args(0) + + System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "100") + + implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", + customJars = Traversable.empty[String]) + + System.setProperty("mahout.math.AtA.maxInMemNCol", 4000.toString) + + val movielens = new SparseMatrix(6040, 3952) + + for (line <- Source.fromFile(datasetDir + "/ratings.dat").getLines()) { + val tokens = line.split("::") + val userID = tokens(0).toInt - 1 + val itemID = tokens(1).toInt - 1 + val rating = tokens(2).toDouble + movielens(userID, itemID) = rating + } + + val drmMovielens = drmParallelize(movielens, numPartitions = 2) + + val indicatorMatrix = cooccurrences(drmMovielens).head + + RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrix, + "/tmp/co-occurrence-on-movielens/indicators-item-item/") + + sc.stop() + + println("Saved indicators to /tmp/co-occurrence-on-movielens/") + } +} + +object RecommendationExamplesHelper { + + def saveIndicatorMatrix(indicatorMatrix: DrmLike[Int], path: String) = { + indicatorMatrix.rdd.flatMap({ case (thingID, itemVector) => + for (elem <- itemVector.nonZeroes()) yield { thingID + '\t' + elem.index } + }) + .saveAsTextFile(path) + } +} From 74b9921c4c9bd8903585bbd74d9e66298ea8b7a0 Mon Sep 17 00:00:00 2001 From: pferrel Date: Wed, 4 Jun 2014 13:09:07 -0700 Subject: [PATCH 2/7] Adding stuff for itemsimilarity driver for Spark --- spark/src/.DS_Store | Bin 6148 -> 0 bytes .../apache/mahout/drivers/FileSysUtils.scala | 34 +++ .../mahout/drivers/IndexedDataset.scala | 54 +++++ .../drivers/IndexedDatasetDriverTest.scala | 164 ++++++++++++++ .../mahout/drivers/ItemSimilarityDriver.scala | 208 ++++++++++++++++++ .../apache/mahout/drivers/MahoutDriver.scala | 74 +++++++ .../mahout/drivers/MahoutOptionParser.scala | 24 ++ .../apache/mahout/drivers/ReaderWriter.scala | 188 ++++++++++++++++ .../org/apache/mahout/drivers/Schema.scala | 30 +++ .../mahout/drivers/IndexedDatasetTest.scala | 25 +++ 
.../drivers/ItemSimilarityDriver$Test.scala | 180 +++++++++++++++ 11 files changed, 981 insertions(+) delete mode 100644 spark/src/.DS_Store create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala create mode 100644 spark/src/main/scala/org/apache/mahout/drivers/Schema.scala create mode 100644 spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala create mode 100644 spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store deleted file mode 100644 index 7b0d36729e2ee777a660f9e8c6709dd97bc2fb68..0000000000000000000000000000000000000000 GIT binary patch (binary .DS_Store contents omitted) + options.copy(input = x) + } text ("Path for input. It may be a filename or directory name and can be a local file path or an HDFS URI (required).") + opt[String]('o', "output") required() action { (x, options) => + options.copy(output = x) + } text ("Output will be in sub-directories stored here so this must be a directory path (required).") + note("\nAlgorithm control options:") + opt[String]("master") abbr ("ma") text ("URL for the Spark Master. (optional). Default: 'local'") action { (x, options) => + options.copy(master = x) + } + opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) => + options.copy(maxPrefs = x) + } text ("Max number of preferences to consider per user or item, users or items with more preferences will be sampled down (optional). Default: 500") validate { x => + if (x > 0) success else failure("Option --maxPrefs must be > 0") + } + opt[Int]("minPrefs") abbr ("mp") action { (x, options) => + options.copy(minPrefs = x) + } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x => + if (x > 0) success else failure("Option --minPrefs must be > 0") + } + + opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) => + options.copy(maxSimilaritiesPerItem = x) + } text ("Try to cap the number of similar items for each item to this number (optional). Default: 100") validate { x => + if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0") + } + opt[Int]("randomSeed") abbr ("rs") action { (x, options) => + options.copy(randomSeed = x) + } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x => + if (x > 0) success else failure("Option --randomSeed must be > 0") + } + note("\nInput text file schema options:") + opt[String]("inDelim") abbr ("d") text ("Input delimiter character (optional).
Default: '\\t'") action { (x, options) => + options.copy(inDelim = x) + } + opt[String]("filter1") abbr ("f1") action { (x, options) => + options.copy(filter1 = x) + } text ("String whose presence indicates a datum for the primary item set, can be a regex (optional). Default: no filtered is applied, all is used") + opt[String]("filter2") abbr ("f2") action { (x, options) => + options.copy(filter2 = x) + } text ("String whose presence indicates a datum for the secondary item set, can be a regex (optional). Used in cross-cooccurrence. Default: no secondary filter is applied") + opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) => + options.copy(rowIDPosition = x) + } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => + if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") + } + opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) => + options.copy(itemIDPosition = x) + } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x => + if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") + } + opt[Int]("filterPosition") abbr ("fc") action { (x, options) => + options.copy(filterPosition = x) + } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x => + if (x >= -1) success else failure("Option --filterColNum must be >= -1") + } + note("\nDefault input schema will accept: 'userIDitemId' or 'userIDitemIDany-text...' and all rows will be used") + note("\nFile input options:") + opt[Unit]('r', "recursive") action { (_, options) => + options.copy(recursive = true) + } text ("The input path should be searched recursively for files that match the filename pattern from -fp (optional), Default: false") + opt[String]("filenamePattern") abbr ("fp") action { (x, options) => + options.copy(filenamePattern = x) + } text ("Regex to match in determining input files (optional). Default: filename in the --input option or '^part-.*' if --input is a directory") + note("\nOutput text file schema options:") + opt[String]("outDelim1") abbr ("od1") action { (x, options) => + options.copy(outDelim1 = x) + } text ("Primary output inDelim value, used to separate row IDs from the similar items list (optional). Default: '\\t'") + opt[String]("outDelim2") abbr ("od2") action { (x, options) => + options.copy(outDelim2 = x) + } text ("Secondary output inDelim value, used to separate item IDs from their values in the similar items list (optional). Default: ':'") + opt[String]("outDelim3") abbr ("od3") action { (x, options) => + options.copy(outDelim3 = x) + } text ("Last inDelim value, used to separate (itemID:value) tuples in the similar items list. (optional). 
Default: ','") + note("\nDefault delimiters will produce output of the form: 'itemID1>itemID2:value2,itemID10:value10...'") + note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n") + help("help") abbr ("h") text ("prints this usage text\n") + checkConfig { c => + if (c.filterPosition == c.itemIDPosition || c.filterPosition == c.rowIDPosition || c.rowIDPosition == c.itemIDPosition) failure("The row, item, and filter positions must be unique.") else success + } + checkConfig { c => + if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If using filters they must be unique.") else success + } + } + parser.parse(args, Options()) map { opts => + options = opts + process + } + } + + private def readIndexedDatasets: Array[IndexedDataset] = { + val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive ).uris + if(inFiles.isEmpty){Array()}else{ + val indexedDataset1 = IndexedDataset(readStore1.readFrom(inFiles)) + if (options.filterPosition != -1 && options.filter2 != null) { + val indexedDataset2 = IndexedDataset(readStore2.readFrom(inFiles)) + Array(indexedDataset1, indexedDataset2) + } else { + Array(indexedDataset1) + } + } + } + + override def start(masterUrl: String = options.master, appName: String = options.appName, + customJars:Traversable[String] = Traversable.empty[String]): Unit = { + //todo: create and modify a SparkContext here, which can be passed in to mahoutSparkContext in the super.start + System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "100") + System.setProperty("spark.executor.memory", "2g") + super.start(masterUrl, appName, customJars) + val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1, "rowIDPosition" -> options.rowIDPosition, "columnIDPosition" -> options.itemIDPosition, "filterPosition" -> options.filterPosition) + if(options.filterPosition != -1 && options.filter2 != null) { + val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2, "rowIDPosition" -> options.rowIDPosition, "columnIDPosition" -> options.itemIDPosition, "filterPosition" -> options.filterPosition) + readStore2 = new TextDelimitedIndexedDatasetReader(readSchema2, mc) + } + val writeSchema = new Schema("delim1" -> options.outDelim1, "delim2" -> options.outDelim2, "delim3" -> options.outDelim3) + readStore1 = new TextDelimitedIndexedDatasetReader(readSchema1, mc) + writeStore = new TextDelimitedIndexedDatasetWriter(writeSchema, mc) + } + + override def process: Unit = { + start() + + val indexedDatasets = readIndexedDatasets + + val indicatorMatrices = CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix)) + + val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs) // self similarity + + writeStore.writeTo(selfIndicatorDataset, options.output+"indicator-matrix") + if(indexedDatasets.length > 1){ + val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity + writeStore.writeTo(crossIndicatorDataset, options.output+"cross-indicator-matrix") + } + + stop + } + + //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option + // todo: support two input streams for cross-similarity, maybe 
assume one schema for both inputs + case class Options( + master: String = "local", + appName: String = "ItemSimilarityJob", + randomSeed: Int = System.currentTimeMillis().toInt, + recursive: Boolean = false, + input: String = null, + output: String = null, + filenamePattern: String = "^part-.*", + maxSimilaritiesPerItem: Int = 100, + maxPrefs: Int = 500, + minPrefs: Int = 1, + rowIDPosition: Int = 0, + itemIDPosition: Int = 1, + filterPosition: Int = -1, + filter1: String = null, + filter2: String = null, + inDelim: String = ",", + outDelim1: String = "\t", + outDelim2: String = ":", + outDelim3: String = "," + ) + +} diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala new file mode 100644 index 0000000000..b8a79f8cac --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import org.apache.spark.SparkContext +import org.apache.mahout.sparkbindings._ + +/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. Also define a command line parser and default options or fill in the following template: + * {{{ + * object SomeDriver extends MahoutDriver { + * override def main(args: Array[String]): Unit = { + * val parser = new MahoutOptionParser[Options]("Job Name") { + * head("Job Name", "Spark") + * note("Various CLI options") + * //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends + * } + * parser.parse(args, Options()) map { opts => + * options = opts + * process + * } + * } + * + * override def process: Unit = { + * start() + * //don't just stand there do something + * stop + * } + * + * //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option + * case class Options( + * appName: String = "Job Name", ... + * ) + * } + * }}} + */ +abstract class MahoutDriver { + protected var mc: SparkContext = _ + /** Creates a Spark context to run the job inside. + * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job, these must be set before the context is created. 
+ * @param masterUrl Spark master URL + * @param appName Name to display in Spark UI + * @param customJars List of paths to custom jars + * */ + protected def start(masterUrl: String, appName: String, + customJars:Traversable[String] = Traversable.empty[String]) : Unit = { + mc = mahoutSparkContext(masterUrl, appName, customJars) + } + + /** Override (optionally) for special cleanup */ + protected def stop: Unit = { + mc.stop + } + + /** This is wher you do the work, call start first, then before exiting call stop */ + protected def process: Unit + + /** Parse command line and call process*/ + def main(args: Array[String]): Unit +} diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala new file mode 100644 index 0000000000..8a337f50a6 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.drivers + +import scopt.OptionParser + +/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */ +class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) { + override def showUsageOnError = true +} diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala new file mode 100644 index 0000000000..1179eef2ee --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.drivers + +import org.apache.mahout.sparkbindings.drm._ +import org.apache.spark.SparkContext._ +import org.apache.mahout.math.RandomAccessSparseVector +import org.apache.spark.SparkContext +import com.google.common.collect.{BiMap, HashBiMap} +import scala.collection.JavaConversions._ +import org.apache.mahout.sparkbindings.DrmRdd +import org.apache.mahout.math.drm.DrmLike + + +/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read. + * @tparam T type of object read, usually supplied by an extending trait. + * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created. + */ +trait Reader[T]{ + val sc: SparkContext + val readSchema: Schema + protected def reader(sc: SparkContext, readSchema: Schema, source: String): T + def readFrom(source: String): T = reader(sc, readSchema, source) +} + +/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written. + * @tparam T + */ +trait Writer[T]{ + val sc: SparkContext + val writeSchema: Schema + protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, collection: T): Unit + def writeTo(collection: T, dest: String) = writer(sc, writeSchema, dest, collection) +} + +/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]] + */ +trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ + /** Read in text delimited tuples from all URIs in this comma delimited source String. + * + * @param sc context for the Spark job + * @param readSchema describes the delimiters and positions of values in the text delimited file. 
+ * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]] + * @return + */ + protected def reader(sc: SparkContext, readSchema: Schema, source: String): IndexedDataset = { + try { + val delimiter = readSchema("delim").asInstanceOf[String] + val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int] + val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int] + val filterPosition = readSchema("filterPosition").asInstanceOf[Int] + val filterBy = readSchema("filter").asInstanceOf[String] + //instance vars must be put into locally scoped vals when used in closures that are + //executed but Spark + + assert(!source.isEmpty, { + println(this.getClass.toString + ": has no files to read") + throw new IllegalArgumentException + }) + + var columns = sc.textFile(source).map({ line => line.split(delimiter)}) + + columns = columns.filter({ tokens => tokens(filterPosition) == filterBy}) + + val interactions = columns.map({ tokens => tokens(rowIDPosition) -> tokens(columnIDPosition)}) + + interactions.cache() + + val rowIDs = interactions.map({ case (rowID, _) => rowID}).distinct().collect() + val columnIDs = interactions.map({ case (_, columnID) => columnID}).distinct().collect() + + val numRows = rowIDs.size + val numColumns = columnIDs.size + + val rowIDDictionary = asOrderedDictionary(rowIDs) + val columnIDDictionary = asOrderedDictionary(columnIDs) + + val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary) + val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary) + + val indexedInteractions = + interactions.map({ case (rowID, columnID) => + val rowIndex = rowIDDictionary_bcast.value.get(rowID).get + val columnIndex = columnIDDictionary_bcast.value.get(columnID).get + + rowIndex -> columnIndex + }).groupByKey().map({ case (rowIndex, columnIndexes) => + val row = new RandomAccessSparseVector(numColumns) + for (columnIndex <- columnIndexes) { + row.setQuick(columnIndex, 1.0) + } + rowIndex -> row + }).asInstanceOf[DrmRdd[Int]] + + //todo: old API, val drmInteractions = new CheckpointedDrmBase[Int](indexedInteractions, numRows, numColumns) + val drmInteractions = new CheckpointedDrmSpark[Int](indexedInteractions, numRows, numColumns) + + IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary) + + } catch { + case cce: ClassCastException => { + println(this.getClass.toString + ": Schema has illegal values"); throw cce + } + } + } + + private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = { + var dictionary: BiMap[String, Int] = HashBiMap.create() + var index = 0 + for (entry <- entries) { + dictionary.forcePut(entry, index) + index += 1 + } + dictionary + } +} + +trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ + /** Read in text delimited tuples from all URIs in this comma delimited source String. + * + * @param sc context for the Spark job + * @param writeSchema describes the delimiters and positions of values in the output text delimited file. 
+ * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]] + */ + protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = { + try { + val outDelim1 = writeSchema("delim1").asInstanceOf[String] + val outDelim2 = writeSchema("delim2").asInstanceOf[String] + val outDelim3 = writeSchema("delim3").asInstanceOf[String] + //instance vars must be put into locally scoped vals when put into closures that are + //executed by Spark + assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException }) + assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indexedDataset to write"); throw new IllegalArgumentException}) + val matrix: DrmLike[Int] = indexedDataset.matrix + val rowIDDictionary: BiMap[String, Int] = indexedDataset.rowIDs + val columnIDDictionary: BiMap[String, Int] = indexedDataset.columnIDs + matrix.rdd.map({ case (rowID, itemVector) => + var line: String = rowIDDictionary.inverse.get(rowID) + outDelim1 + for (item <- itemVector.nonZeroes()) { + line += columnIDDictionary.inverse.get(item.index) + outDelim2 + item.get + outDelim3 + } + line.dropRight(1) + }) + .saveAsTextFile(dest) + }catch{ + case cce: ClassCastException => {println(this.getClass.toString+": Schema has illegal values"); throw cce} + } + } +} + +/** A combined trait that reads and writes */ +trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter + +/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. + * @param readSchema describes the delimiters and position of values in the text delimited file to be read. + * @param sc Spark context for reading files + * @note The source files are supplied to the readFrom trait method. + * */ +class TextDelimitedIndexedDatasetReader(val readSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetReader + +/** Writes an IndexedDataset to text delimited files. Classes are needed to supply trait params in their constructor. + * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. + * @param sc Spark context for writing files + * @note the destination is supplied to the writeTo trait method + * */ +class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetWriter + +/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor. + * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read. + * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. + * @param sc Spark context for reading the files + * */ +class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetReaderWriter diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala new file mode 100644 index 0000000000..13fa292b92 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala @@ -0,0 +1,30 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import scala.collection.mutable.HashMap + +/** Syntactic sugar for HashMap[String, Any] + * + * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} + */ +class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { + //todo: this require a mutable HashMap, do we care? + this ++= params +} diff --git a/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala new file mode 100644 index 0000000000..fb13361090 --- /dev/null +++ b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala @@ -0,0 +1,25 @@ +package org.apache.mahout.drivers + +import org.scalatest.FunSuite + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class IndexedDatasetTest extends FunSuite { + //todo: put some tests here! + +} diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala new file mode 100644 index 0000000000..c1ae4ab5a3 --- /dev/null +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala @@ -0,0 +1,180 @@ +package org.apache.mahout.drivers + +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.mahout.sparkbindings._ +import java.io.{FileWriter, BufferedWriter} +import com.google.common.io.Closeables +import org.apache.spark.SparkContext + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class ItemSimilarityDriver$Test extends FunSuite { + private var sc: SparkContext = _ + + + test ("Running some text delimited files through the import/cooccurrence/export"){ + + val exampleCsvlogStatements = Array( + "12569537329,user1,item1,\"view\"", + "12569537329,user1,item2,\"view\"", + "12569537329,user1,item2,\"like\"", + "12569537329,user1,item3,\"like\"", + "12569537329,user2,item2,\"view\"", + "12569537329,user2,item2,\"like\"", + "12569537329,user3,item1,\"like\"", + "12569537329,user3,item3,\"view\"", + "12569537329,user3,item3,\"like\"" + ) + + val exampleTsvLogStatements = Array( + "12569537329\tuser1\titem1\t\"view\"", + "12569537329\tuser1\titem2\t\"view\"", + "12569537329\tuser1\titem2\t\"like\"", + "12569537329\tuser1\titem3\t\"like\"", + "12569537329\tuser2\titem2\t\"view\"", + "12569537329\tuser2\titem2\t\"like\"", + "12569537329\tuser3\titem1\t\"like\"", + "12569537329\tuser3\titem3\t\"view\"", + "12569537329\tuser3\titem3\t\"like\"" + ) + + + val csvLogStatements = Array( + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,iphone", + "u1,view,ipad", + "u1,view,nexus", + "u1,view,galaxy", + "u2,view,iphone", + "u2,view,ipad", + "u2,view,nexus", + "u2,view,galaxy", + "u3,view,surface", + "u3,view,nexus", + "u4,view,iphone", + "u4,view,ipad", + "u4,view,galaxy" + ) + + val tsvLogStatements = Array( + "u1\tpurchase\tiphone", + "u1\tpurchase\tipad", + "u2\tpurchase\tnexus", + "u2\tpurchase\tgalaxy", + "u3\tpurchase\tsurface", + "u4\tpurchase\tiphone", + "u4\tpurchase\tgalaxy", + "u1\tview\tiphone", + "u1\tview\tipad", + "u1\tview\tnexus", + "u1\tview\tgalaxy", + "u2\tview\tiphone", + "u2\tview\tipad", + "u2\tview\tnexus", + "u2\tview\tgalaxy", + "u3\tview\tsurface", + "u3\tview\tnexus", + "u4\tview\tiphone", + "u4\tview\tipad", + "u4\tview\tgalaxy" + ) + + var w: BufferedWriter = null + //try { + w = new BufferedWriter(new FileWriter("/tmp/cf-data.txt")) + w.write(csvLogStatements.mkString("\n")) + //} finally { + Closeables.close(w, false) + //} + /* + val indexedLikes = IndexedDatasetStore.readTuples(sc, "tmp/cf-data.txt", 2, ",", 0, 2, 1, "purchase") + + val indexedViews = IndexedDatasetStore.readTuples(sc, "/tmp/cf-data.txt", 2, ",", 0, 2, 1, "view") + + val drmLikes = indexedLikes.matrix + val drmViews = indexedViews.matrix + + // Now we could run cooccurrence analysis using the DRMs, instead we'll just fetch and print the matrices + val drmXCooccurrences = cooccurrences(drmLikes, randomSeed = 0xdeadbeef, + maxInterestingItemsPerThing = 100, maxNumInteractions = 500, Array(drmViews)) + + val inCoreViews = drmViews.collect + val inCoreLikes = drmLikes.collect + val inCoreIndicator = drmXCooccurrences(0).collect + val inCoreXIndicator = drmXCooccurrences(1).collect + println("\nLIKES:") + println(inCoreLikes) + println("\nVIEWS:") + println(inCoreViews) + println("\nINDICATOR MATRIX") + println(inCoreIndicator) + println("\nCROSS INDICATOR MATRIX") + println(inCoreXIndicator) + */ + + /* + //Clustered Spark and HDFS + 
ItemSimilarityDriver.main(Array( + "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt", + "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/", + "--master", "spark://occam4:7077", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) + */ + //local multi-threaded Spark with HDFS using large dataset + /* ItemSimilarityDriver.main(Array( + "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt", + "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/", + "--master", "local[4]", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) + */ + + //local multi-threaded Spark with local FS + ItemSimilarityDriver.main(Array( + "--input", "/tmp/cf-data.txt", + "--output", "tmp/indicatorMatrices/", + "--master", "local[4]", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) + + } + +} From a59265931ed3a51ba81e1a0a7171ebb102be4fa4 Mon Sep 17 00:00:00 2001 From: pferrel Date: Wed, 4 Jun 2014 13:13:13 -0700 Subject: [PATCH 3/7] added scopt to pom deps --- spark/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spark/pom.xml b/spark/pom.xml index ac99ffd936..1f08d9a1ed 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -288,6 +288,13 @@ + + + com.github.scopt + scopt_2.10 + 3.2.0 + + org.apache.spark From 16c03f7fa73c156859d1dba3a333ef9e8bf922b0 Mon Sep 17 00:00:00 2001 From: pferrel Date: Wed, 4 Jun 2014 14:32:18 -0700 Subject: [PATCH 4/7] added Sebastian's MurmurHash changes Signed-off-by: pferrel --- .../java/org/apache/mahout/math/MurmurHash.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/math/src/main/java/org/apache/mahout/math/MurmurHash.java b/math/src/main/java/org/apache/mahout/math/MurmurHash.java index 0b3fab0675..32dfdd614e 100644 --- a/math/src/main/java/org/apache/mahout/math/MurmurHash.java +++ b/math/src/main/java/org/apache/mahout/math/MurmurHash.java @@ -17,6 +17,8 @@ package org.apache.mahout.math; +import com.google.common.primitives.Ints; + import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -29,7 +31,16 @@ */ public final class MurmurHash { - private MurmurHash() { + private MurmurHash() {} + + /** + * Hashes an int. + * @param data The int to hash. + * @param seed The seed for the hash. + * @return The 32 bit hash of the bytes in question. 
+ */ + public static int hash(int data, int seed) { + return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed); } /** From 2f87f5433f90fa2c49ef386ca245943e1fc73beb Mon Sep 17 00:00:00 2001 From: pferrel Date: Wed, 4 Jun 2014 18:44:16 -0700 Subject: [PATCH 5/7] MAHOUT-1541 still working on this, some refactoring in the DSL for abstracting away Spark has moved access to rddsno Jira is closed yet --- .../main/scala/org/apache/mahout/drivers/ReaderWriter.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala index 1179eef2ee..9201c81d65 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -149,6 +149,10 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ val matrix: DrmLike[Int] = indexedDataset.matrix val rowIDDictionary: BiMap[String, Int] = indexedDataset.rowIDs val columnIDDictionary: BiMap[String, Int] = indexedDataset.columnIDs + // below doesn't compile because the rdd is not in a CheckpointedDrmSpark also I don't know how to turn a + // CheckpointedDrmSpark[Int] into a DrmLike[Int], which I need to pass in the CooccurrenceAnalysis#cooccurrence + // This seems to be about the refacotring to abstract away from Spark but the Read and Write are Spark specific + // and the non-specific DrmLike is no longer attached to a CheckpointedDrmSpark, could be missing something though matrix.rdd.map({ case (rowID, itemVector) => var line: String = rowIDDictionary.inverse.get(rowID) + outDelim1 for (item <- itemVector.nonZeroes()) { From c6adaa44c80bba99d41600e260bbb1ad5c972e69 Mon Sep 17 00:00:00 2001 From: pferrel Date: Thu, 5 Jun 2014 09:52:23 -0700 Subject: [PATCH 6/7] MAHOUT-1464 import cleanup, minor changes to examples for running on Spark Cluster --- .../apache/mahout/cf/CooccurrenceAnalysis.scala | 2 -- .../mahout/cf/examples/Recommendations.scala | 17 ++++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala index 5df329b3be..1f399ec947 100644 --- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala +++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala @@ -23,11 +23,9 @@ import RLikeOps._ import drm._ import RLikeDrmOps._ import org.apache.mahout.sparkbindings._ - import scala.collection.JavaConversions._ import org.apache.mahout.math.stats.LogLikelihood import collection._ -// import scala.collection.parallel.mutable import org.apache.mahout.common.RandomUtils diff --git a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala index c640e1ef16..afd6701605 100644 --- a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala +++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala @@ -79,29 +79,32 @@ object RunCrossCooccurrenceAnalysisOnEpinions { System.setProperty("spark.kryo.referenceTracking", "false") System.setProperty("spark.kryoserializer.buffer.mb", "100") +/* to run on local, can provide number of core by changing to local[4] */ + implicit val distributedContext = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", + customJars = Traversable.empty[String]) -// implicit val 
distributedContext = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", -// customJars = Traversable.empty[String]) + /* to run on a Spark cluster provide the Spark Master URL implicit val distributedContext = mahoutSparkContext(masterUrl = "spark://occam4:7077", appName = "MahoutClusteredContext", customJars = Traversable.empty[String]) - +*/ val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2) val drmEpinionsTrustNetwork = drmParallelize(epinionsTrustNetwork, numPartitions = 2) val indicatorMatrices = cooccurrences(drmEpinionsRatings, randomSeed = 0xdeadbeef, maxInterestingItemsPerThing = 100, maxNumInteractions = 500, Array(drmEpinionsTrustNetwork)) -//hdfs://occam4:54310/user/pat/xrsj/ -/* + +/* local storage */ RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0), "/tmp/co-occurrence-on-epinions/indicators-item-item/") RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1), "/tmp/co-occurrence-on-epinions/indicators-trust-item/") -*/ + +/* To run on HDFS put your path to the data here, example of fully qualified path on my cluster provided RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0), "hdfs://occam4:54310/user/pat/xrsj/indicators-item-item/") RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1), "hdfs://occam4:54310/user/pat/xrsj/indicators-trust-item/") - +*/ distributedContext.close() println("Saved indicators to /tmp/co-occurrence-on-epinions/") From a2f84dea3f32d3df3e98c61f085bc1fabd453551 Mon Sep 17 00:00:00 2001 From: pferrel Date: Sat, 7 Jun 2014 14:27:06 -0700 Subject: [PATCH 7/7] drmWrap seems to be the answer to the changed DrmLike interface. Code works again but more to do. --- .../main/scala/org/apache/mahout/drivers/IndexedDataset.scala | 4 ++-- .../main/scala/org/apache/mahout/drivers/ReaderWriter.scala | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala index 36bda9022e..496db886a6 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala @@ -18,7 +18,7 @@ package org.apache.mahout.drivers import com.google.common.collect.BiMap -import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.math.drm.{CheckpointedDrm, DrmLike} /** * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries. @@ -37,7 +37,7 @@ import org.apache.mahout.math.drm.DrmLike * ID to and from the ordinal Mahout Int ID. 
This one holds column labels */ -case class IndexedDataset(matrix: DrmLike[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { +case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { } /** diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala index 9201c81d65..af01ff0127 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -25,6 +25,7 @@ import com.google.common.collect.{BiMap, HashBiMap} import scala.collection.JavaConversions._ import org.apache.mahout.sparkbindings.DrmRdd import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.sparkbindings._ /** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read. @@ -108,7 +109,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ }).asInstanceOf[DrmRdd[Int]] //todo: old API, val drmInteractions = new CheckpointedDrmBase[Int](indexedInteractions, numRows, numColumns) - val drmInteractions = new CheckpointedDrmSpark[Int](indexedInteractions, numRows, numColumns) + //val drmInteractions = new CheckpointedDrmSpark[Int](indexedInteractions, numRows, numColumns) + val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns) IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
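The scoring step in CooccurrenceAnalysis reduces to Dunning's log-likelihood ratio (G^2) over a 2x2 contingency table per item pair, followed by keeping only the top-k highest-scoring items per row. The standalone Scala sketch below is not part of the patch: it re-derives the same G^2 score directly from the four cell counts that loglikelihoodRatio() assembles (k11, k12, k21, k22), using made-up interaction counts, so the scoring can be sanity-checked without Spark or the Mahout math module.

import scala.math.log

// Standalone sketch of the G^2 (log-likelihood ratio) score for one item pair,
// mirroring the contingency table built in CooccurrenceAnalysis.loglikelihoodRatio.
// All counts below are hypothetical and only illustrate the shape of the computation.
object LlrSketch {

  // k11 = users who interacted with both A and B
  // k12 = users who interacted with A but not B
  // k21 = users who interacted with B but not A
  // k22 = users who interacted with neither
  def llr(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
    val n = (k11 + k12 + k21 + k22).toDouble
    val rowSums = Array((k11 + k12).toDouble, (k21 + k22).toDouble)
    val colSums = Array((k11 + k21).toDouble, (k12 + k22).toDouble)
    val cells = Array(Array(k11.toDouble, k12.toDouble), Array(k21.toDouble, k22.toDouble))
    var g2 = 0.0
    for (i <- 0 until 2; j <- 0 until 2; if cells(i)(j) > 0.0) {
      val expected = rowSums(i) * colSums(j) / n  // expected count under independence
      g2 += 2.0 * cells(i)(j) * log(cells(i)(j) / expected)
    }
    g2
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: 1000 users, 100 interacted with item A, 80 with item B, 50 with both.
    val numUsers = 1000L
    val withA = 100L
    val withB = 80L
    val withBoth = 50L
    val score = llr(
      k11 = withBoth,
      k12 = withA - withBoth,
      k21 = withB - withBoth,
      k22 = numUsers - withA - withB + withBoth)
    println(f"LLR(A,B) = $score%.2f")  // a large score means A and B co-occur far more often than chance
  }
}

A large score is exactly the signal the sparsification step keeps when it selects the top-k entries per row of A'A or B'A.

The IndexedDataset plumbing in the driver patches rests on Guava BiMaps that translate external string IDs into the dense Int indices a DRM needs, and back again when writing results. A minimal sketch of that round trip, with illustrative IDs only (the item names below are assumptions, not from the patch):

import com.google.common.collect.HashBiMap

object DictionarySketch {
  def main(args: Array[String]): Unit = {
    val itemDictionary = HashBiMap.create[String, Integer]()
    // assign ordinal Int IDs in first-seen order, as asOrderedDictionary does in the reader
    Seq("iphone", "ipad", "nexus").zipWithIndex.foreach { case (id, index) =>
      itemDictionary.forcePut(id, index)
    }
    println(itemDictionary.get("nexus"))    // 2      (external string ID -> matrix column)
    println(itemDictionary.inverse.get(1))  // ipad   (matrix column -> external string ID)
  }
}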