Shadoop

A Hadoop DSL and lightweight wrapper for Scala

This fork of ScalaHadoop consists mostly of cherry-picked commits from the forks by @hito-asa, @ivmaykov and @oscarrenalis of the original work by @bsdfish, plus a few extra features and a cleaned-up Maven build.

This code provides syntactic sugar on top of Hadoop to make it more usable from Scala. Take a look at src/main/scala/net/renalias/scoop/examples/WordCount.scala for a complete example.

License

Apache License, Version 2.0

Usage

Basic Usage

A basic mapper looks like:

val mapper = new Mapper[LongWritable, Text, Text, LongWritable] {
    mapWith {
        (k, v) =>
            // split on spaces and tabs, emitting a (word, 1) pair for each token
            (v split " |\t").map(x => (new Text(x), new LongWritable(1L))).toList
    }
}
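
For example, the input line "hello world" yields List((hello, 1), (world, 1)).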

A reducer looks like this:

val reducer = new Reducer[Text, LongWritable, Text, LongWritable] {
    reduceWith {
        (k, v) =>
            // sum the counts for this key; (0L /: v) is a foldLeft over v starting at 0L
            List((k, (0L /: v)((total, next) => total + next)))
    }
}
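
For example, the key hello with counts [1, 1, 1] reduces to List((hello, 3)).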

And the pipeline that binds them together may look like this:

TextInput[LongWritable, Text]("/tmp/input.txt") -->
MapReduceTask(mapper, reducer, "Word Count")    -->
TextOutput[Text, LongWritable]("/tmp/output")   execute

The key difference from standard Hadoop mappers and reducers is that the map and reduce steps are written as side-effect-free functions that accept a key and a value and return an iterable of key/value pairs; code behind the scenes takes care of updating Hadoop's Context object.
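
To illustrate the mechanics, here is a minimal sketch (not the library's actual internals) of how such a pure function could be adapted to Hadoop's Mapper API, with the adapter performing the context.write calls on the function's behalf:

import org.apache.hadoop.mapreduce.{Mapper => HMapper}

// Hypothetical adapter: wraps a pure (key, value) => Iterable[(K2, V2)]
// function and writes each emitted pair to Hadoop's Context.
class FunctionMapper[K1, V1, K2, V2](f: (K1, V1) => Iterable[(K2, V2)])
    extends HMapper[K1, V1, K2, V2] {
  override def map(key: K1, value: V1,
                   context: HMapper[K1, V1, K2, V2]#Context): Unit =
    f(key, value).foreach { case (k2, v2) => context.write(k2, v2) }
}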

Some work still remains to polish the current interface, for example removing the need for .toList in the mapper and for explicitly creating Hadoop-specific Text and LongWritable objects.

Note that implicit conversions are used to convert between LongWritable and Long, as well as between Text and String. The types of the input and output parameters only need to be stated once, as the type parameters of the class being extended.
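
The conversions involved are of roughly this shape (an assumed sketch; the library's actual definitions may differ):

import org.apache.hadoop.io.{LongWritable, Text}

// Hypothetical implicit conversions between Hadoop's Writable types
// and plain Scala types, so keys and values can be handled naturally.
implicit def text2String(t: Text): String = t.toString
implicit def string2Text(s: String): Text = new Text(s)
implicit def longWritable2Long(l: LongWritable): Long = l.get
implicit def long2LongWritable(l: Long): LongWritable = new LongWritable(l)

With conversions like these in scope, the fold in the reducer above can add LongWritable values as if they were plain Longs.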

These mappers and reducers can be chained together with the --> operator:

object WordCount extends ScalaHadoop {
  def run(args: Array[String]) : Int = {
    TextInput[LongWritable, Text](args(0)) -->
    MapReduceTask(mapper, reducer, "Main task") -->
    TextOutput[Text, LongWritable](args(1)) execute

    0 //result code
  }
}
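
The ScalaHadoop trait supplies the program's entry point; a hypothetical sketch of the wiring (the actual trait may differ):

// Assumed shape of the ScalaHadoop trait: main delegates to run
// and uses its result as the process exit code.
trait ScalaHadoop {
  def run(args: Array[String]): Int
  def main(args: Array[String]): Unit = sys.exit(run(args))
}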

Multiple map/reduce

Multiple map/reduce runs may be chained together:

object WordsWithSameCount extends ScalaHadoop {
  def run(args: Array[String]) : Int = {
    TextInput[LongWritable, Text](args(0)) -->
    MapReduceTask(tokenizerMap1, sumReducer, "Sum") -->
    MapReduceTask(flipKeyValueMap, wordListReducer, "Reduce") -->
    TextOutput[LongWritable, Text](args(1)) execute

    0 //result code
  }
}
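
Here the second job groups words by their count: flipKeyValueMap swaps each (word, count) pair so the count becomes the key, and wordListReducer collects the words sharing a count. Hypothetical sketches consistent with the types above (the actual definitions live in the examples):

// Swap (word, count) into (count, word) so the next stage keys on counts.
val flipKeyValueMap = new Mapper[Text, LongWritable, LongWritable, Text] {
  mapWith { (k, v) => List((v, k)) }
}

// Join all words that occur with the same count into one comma-separated list.
val wordListReducer = new Reducer[LongWritable, Text, LongWritable, Text] {
  reduceWith { (k, v) => List((k, new Text(v.mkString(", ")))) }
}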

Contributors