ceteri edited this page Jul 25, 2012 · 6 revisions

Cascading Sample Recommender

The goal for this project is to create a sample application in Cascading 2.0 which shows how to build a simple kind of social recommender.


The code repo includes a Gradle script for command line builds. To generate an IntelliJ project use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar


Before running this sample app, you'll need to have a supported release of Apache Hadoop installed. Here's what was used to develop and test our example code:

$ hadoop version

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you're running in standalone mode) and run the app in Apache Hadoop standalone from a command line:

rm -rf output
hadoop jar ./build/libs/recommender.jar data/en.stop data/tweets output/token output/similarity
more output/similarity/part-00000

To run the same app on the Amazon Elastic MapReduce service, using its command line interface:

$ elastic-mapreduce --create --name "Sample Recommender" \
  --jar s3n://temp.cascading.org/sample/recommender.jar \
  --arg s3n://temp.cascading.org/sample/en.stop \
  --arg s3n://temp.cascading.org/sample/tweets/ \
  --arg s3n://temp.cascading.org/sample/out/token \
  --arg s3n://temp.cascading.org/sample/out/similarity

An example log captured from a successful build and run is at https://gist.github.com/2949834


The social recommender is based on using data from the Twitter firehose. The app recommends other Twitter users to follow who have similar interests in stocks/investing.

Sample data includes 500 tweets, which is enough to demonstrate how this sample app works. A compiled JAR can run on a laptop with this sample data, or it can scale out horizontally to run on a cluster with thousands of nodes and handle much larger data.

Execution steps for this implementation are:

  1. Take the sample Twitter data as the input source tap. The endpoint used for that tap (the command line argument data/tweets) could be replaced by a much larger data set.
  2. Clean up and tokenize the text of tweets using a regex pattern. NB: if you use a different data source later, that regex pattern may need to be modified.
  3. Generate (user, token) pairs to construct a bipartite graph.
  4. Apply a stop word list to filter out common words, offensive phrases, etc.
  5. Also create a sink tap to measure token frequency, which may be used for QA: improve the regex patterns, adjust stop words, etc.
  6. Find the pairs of users who have "interesting" tokens in common, and generate an inverted index as ((user1, user2), token).
  7. Calculate a similarity metric for each pair of users known to be neighbors.
  8. Apply thresholds on the similarity metric to filter out poor recommendations.
  9. Connect up all the pipes and taps into a cascade, then generate a flow diagram and run the app. Results for recommended users get stored in the similarityPath sink tap.
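
Steps 2 through 5 above can be sketched in a few lines of plain Python. This is only an illustration of the data flow, not the app's Cascading code: the regex, the tiny stop word set, and the function names are hypothetical stand-ins for whatever the app and data/en.stop actually use.

```python
import re
from collections import Counter

# Hypothetical stand-ins: the app's real regex and stop word list may differ.
TOKEN_PATTERN = re.compile(r"[a-z][a-z0-9']+")
STOP_WORDS = {"the", "and", "for", "rt"}


def user_token_pairs(tweets):
    """Yield distinct (user, token) pairs from (user, text) records."""
    seen = set()
    for user, text in tweets:
        for token in TOKEN_PATTERN.findall(text.lower()):
            if token in STOP_WORDS:
                continue  # step 4: drop stop words
            pair = (user, token)
            if pair not in seen:  # each edge of the bipartite graph once
                seen.add(pair)
                yield pair


def token_frequency(pairs):
    """Count how many users mention each token (the QA side output)."""
    return Counter(token for _, token in pairs)


tweets = [
    ("alice", "Buying AAPL and GOOG for the long term"),
    ("bob", "RT AAPL earnings look strong"),
]
pairs = list(user_token_pairs(tweets))
freq = token_frequency(pairs)
```

Inspecting `freq` plays the same role as the token sink tap in step 5: tokens that turn out to be noise are candidates for the stop word list or for a tighter regex.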

The first part of the program illustrates use of a "stream assertion", which is much like running a unit test on your data set. Later there's use of a Debug which traces the tuple stream on stdout. Both of these testing features can be turned off in production. Both are important features of Cascading which are not found in other frameworks based on Apache Hadoop.
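
The idea behind a stream assertion and a debug trace can be shown outside Cascading with two pass-through generators. This is a conceptual sketch in plain Python; the function names and the `enabled` switch are hypothetical and do not mirror the Cascading API.

```python
def assert_stream(records, check, enabled=True):
    """Pass records through, raising on the first one that fails `check`."""
    for record in records:
        if enabled and not check(record):
            raise ValueError(f"stream assertion failed: {record!r}")
        yield record


def debug_stream(records, enabled=True):
    """Pass records through, printing each one to stdout when enabled."""
    for record in records:
        if enabled:
            print(record)
        yield record


def has_user_and_text(record):
    """Hypothetical check: a record is a pair of non-empty strings."""
    return len(record) == 2 and all(isinstance(f, str) and f for f in record)


good = [("alice", "Buying AAPL"), ("bob", "Selling GOOG")]
checked = list(debug_stream(assert_stream(good, has_user_and_text), enabled=False))
```

Passing `enabled=False` turns either stage into a no-op, which corresponds to switching the checks off for production runs.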

When you run this app, it generates a Cascading flow diagram in the file dot/similarity.dot which can be read by OmniGraffle or Visio. The diagram shows how the Cascading workflows will be run as job steps in Apache Hadoop. An annotated version is provided as docs/similarity.graffle which shows how the physical plan of the Cascading flows overlays onto "map" and "reduce" tasks.


Our intent is to show how such an app might integrate multiple data sources, and potentially where to integrate other systems outside of Apache Hadoop.

With the command line arguments shown above, the recommender results get stored in output/similarity/ as Apache Hadoop part files in TSV format. In practice, those would most likely get loaded into Redis or some other low-latency key/value store for use in a production system.
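
As a rough sketch of that loading step, the snippet below reads TSV lines into an in-memory dictionary that stands in for a key/value store. The column layout (user1, user2, similarity score, no header row) is an assumption made for illustration, so check it against a real part file before relying on it. The function name is hypothetical.

```python
import csv
import io
from collections import defaultdict


def load_recommendations(lines):
    """Build {user: [(other_user, score), ...]} sorted by descending score."""
    recs = defaultdict(list)
    for row in csv.reader(lines, delimiter="\t"):
        if len(row) != 3:
            continue  # skip blank or malformed lines
        user1, user2, score = row[0], row[1], float(row[2])
        # Similarity is symmetric, so index the pair under both users.
        recs[user1].append((user2, score))
        recs[user2].append((user1, score))
    for user in recs:
        recs[user].sort(key=lambda pair: pair[1], reverse=True)
    return dict(recs)


# Made-up rows in the assumed layout; a real run would open a part file.
sample = io.StringIO("alice\tbob\t0.50\nalice\tcarol\t0.25\n")
store = load_recommendations(sample)
```

A lookup such as `store["alice"]` then returns that user's recommendations with the best match first, which is the access pattern a low-latency store would serve.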

Also check the output results in output/token/ for QA on how well the text clean-up is working, which additional stop words need to be filtered, etc.

Sample App Diagram

To take this example a few steps further, one could use additional taps to include more use cases and potential integrations:


One nuance to note about this code: recommender systems often use algorithms which require a cartesian product. Those don't scale well when coded naively as MapReduce jobs. This example shows how to use the pattern of an inverted index + join. There is still a cartesian product calculated, but it is based on using a CoGroup so that it scales well for this kind of data. Also, the potential number of pairs gets filtered prior to where the cross-product gets performed, which reduces the required processing.
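
The inverted index + join pattern can be illustrated without Hadoop. The plain Python sketch below groups users by token and only pairs up users inside each group, so no pair is ever formed between users who share nothing. The `max_users_per_token` cutoff is a hypothetical example of filtering before the cross-product, not a setting taken from the app.

```python
from collections import defaultdict
from itertools import combinations


def shared_token_pairs(pairs, max_users_per_token=100):
    """Yield ((user1, user2), token) for users who share a token."""
    # Inverted index: token -> set of users who used it.
    index = defaultdict(set)
    for user, token in pairs:
        index[token].add(user)

    for token, users in index.items():
        # Filter before the cross-product: a token used by one user yields
        # no pairs, and a very common token yields too many weak ones.
        if len(users) < 2 or len(users) > max_users_per_token:
            continue
        for user1, user2 in combinations(sorted(users), 2):
            yield (user1, user2), token


pairs = [
    ("alice", "aapl"), ("bob", "aapl"),
    ("carol", "goog"), ("alice", "goog"),
    ("bob", "msft"),
]
neighbors = sorted(shared_token_pairs(pairs))
```

The cross-product still happens, but only within each token's group, and each group can be processed independently.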

For example, other recommenders might use a Cosine distance which implies use of a cartesian product. Instead this app calculates an Ochiai similarity metric. This shows one case of how to "unwind" a cartesian product into a more parallel, more efficient algorithm for use with MapReduce.
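
For reference, the Ochiai coefficient of two sets is the size of their intersection divided by the square root of the product of their sizes. A minimal plain Python sketch, with a hypothetical function name:

```python
import math


def ochiai(tokens_a, tokens_b):
    """Ochiai coefficient: |A & B| / sqrt(|A| * |B|), or 0.0 if either is empty."""
    a, b = set(tokens_a), set(tokens_b)
    if not a or not b:
        return 0.0
    return len(a & b) / math.sqrt(len(a) * len(b))


score = ochiai({"aapl", "goog"}, {"aapl", "msft"})  # 1 shared token out of 2 each
```

The numerator only needs the number of tokens a pair of users shares, and the denominator only needs each user's own token count. Both are simple counts over grouped data, which is why the metric fits a group-and-join workflow.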


An example R script shows analysis of the similarity metrics. This analysis can be used to decide how to tune the metric thresholds MAX_SIMILARITY and MIN_SIMILARITY, and for QA of the recommender in general.
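
To make the role of the two thresholds concrete, here is a small plain Python sketch of a threshold filter, plus a coarse histogram of scores of the kind one would inspect when tuning. The threshold values and the inclusive bounds are hypothetical; the app's actual values and comparison rules may differ.

```python
from collections import Counter

MIN_SIMILARITY = 0.1  # hypothetical value, for illustration only
MAX_SIMILARITY = 0.9  # hypothetical value, for illustration only


def within_thresholds(scored_pairs, low=MIN_SIMILARITY, high=MAX_SIMILARITY):
    """Keep ((user1, user2), score) entries with low <= score <= high."""
    return [(pair, s) for pair, s in scored_pairs if low <= s <= high]


def histogram(scores, buckets=10):
    """Count scores in [0, 1] per equal-width bucket; 1.0 goes in the last one."""
    return Counter(min(int(s * buckets), buckets - 1) for s in scores)


scored = [(("a", "b"), 0.05), (("a", "c"), 0.5), (("b", "c"), 0.95)]
kept = within_thresholds(scored)
dist = histogram([0.05, 0.5, 0.55, 0.95])
```

A lower bound drops pairs with too little in common, while an upper bound can drop near-duplicates. The histogram shows how many pairs each choice of cutoff would discard.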

The R script requires two libraries to be installed via the usual R mechanisms: plyr and ggplot2.

After running the sample app in Apache Hadoop, run the R script to analyze the distribution of similarity metrics in its results:

R --vanilla --args output/similarity/part-00000 < src/main/r/metric.R

That will produce two charts in a PDF file Rplots.pdf, which show the distribution of similarity metrics across Twitter user IDs in the input data set.

R analysis


This code was copied from the "logparser" example in cascading.samples: https://github.com/Cascading/cascading.samples/tree/master/logparser