Simple MapReduce implementation, written in JavaScript.
Via npm on Node:
npm install simplemapreduce
Reference in your program:
var simplemapreduce = require('simplemapreduce');
Synchronous run
simplemapreduce.mapReduceSync(items, mapfn, reducefn);
where
items
: to be processed. In the current version, it's an object with a `forEach`
function defined.
mapfn(key, value, ctx)
: given a key/value pair, it emits zero, one or more key/value pairs using `ctx`. See example below.
reducefn(key, values, ctx)
: given a key and its associated values, it emits zero, one or more key/value pairs using `ctx`.
Word count example
var result = simplemapreduce.mapReduceSync(
["A", "word", "is", "a", "word"], // items to process
function (key, value, ctx) { ctx.emit(value.toLowerCase(), 1); }, // map
function (key, values, ctx) { // reduce
var total = 0;
values.forEach(function (value) {
total += value;
});
ctx.emit(key, total);
}
);
Asynchronous run
simplemapreduce.mapReduce(items, mapfn, reducefn, callbackfn);
where
items
: to be processed. In the current version, it's an object with keys that can be obtained using `Object.keys(items)`.
mapfn(key, value, ctx, next)
: given a key/value pair, it emits zero, one or more key/value pairs using `ctx`, then calls `next()`. See example below.
reducefn(key, values, ctx, next)
: given a key and its associated values, it emits zero, one or more key/value pairs using `ctx`, then calls `next()`.
callbackfn(err, results)
: `results` is a dictionary with the key/value reduce outcome.
simplemapreduce.mapReduce(
["A", "word", "is", "a", "word"], // items to process
function (key, value, ctx, next) { ctx.emit(value.toLowerCase(), 1); next(); }, // map
function (key, values, ctx, next) { // reduce
var total = 0;
values.forEach(function (value) {
total += value;
});
ctx.emit(key, total);
next();
},
function (err, result) {
// result.a === 2
// result.word === 2
// result.is === 1
}
);
The example above is simple: its map/reduce functions make no asynchronous calls. However, you can call the `next` callback at any time, e.g. from inside the callback of an asynchronous operation.
Synchronous run
simplemapreduce.runSync(items, mapfn, newfn, processfn);
where
items
: to be processed. In the current version, it's an object with a `forEach`
function defined.
mapfn(key, value, ctx)
: given a key/value pair, it emits zero, one or more key/value pairs using `ctx`.
newfn(key)
: given a new key, it returns the new object to be associated with that key.
processfn(key, value, result)
: processes an item, usually modifying its associated result object. In addition, it can receive and use the associated key and the `map`, the dictionary that is being built by the process.
Example
var result = simplemapreduce.runSync(
["A", "word", "is", "a", "word"],
function (key, value, ctx) { ctx.emit(value.toLowerCase(), 1); },
function (key) { return { word: key, count: 0 }; },
function (key, value, result) { result.count += value; }
);
console.dir(result);
Output
{
  a: { word: 'a', count: 2 },
  word: { word: 'word', count: 2 },
  is: { word: 'is', count: 1 }
}
There is also an asynchronous `run`, which takes a callback:
simplemapreduce.run(items, mapfn, newfn, processfn, callbackfn);
Example; note the use of `next`, received as the last parameter by `mapfn`
and `processfn`:
simplemapreduce.run(
["A", "word", "is", "a", "word"],
function (key, value, context, next) { context.emit(value.toLowerCase(), 1); next(); },
function (key) { return { word: key, count: 0 }; },
function (key, value, result, next) { result.count += value; next(); },
function (err, result) {
if (err)
console.log(err);
else
console.dir(result);
}
);
Alternatively, you can define a task, an object with functions:
getItems()
: returns the items to be processed.
map(key, value, ctx)
: given a key/value pair, it emits zero, one or more key/value pairs using `ctx`.
newResult(key)
: creates a new object/value to be associated with the key/item. Usually it's used to accumulate results.
process(key, value, result)
: processes a key/value pair, usually updating the associated result object.
Example:
var task = {
items: ["A", "word", "is", "a", "word"],
getItems: function () { return this.items; },
map: function (key, value, context) { context.emit(value.toLowerCase(), 1); },
newResult: function (key) { return { count: 0 }; },
process: function (key, value, result) { result.count += value; }
};
simplemapreduce.runTask(task, function (err, result) { console.dir(result); });
Notice that in this case, `getItems`
returns items defined in the same task. You can provide a more complex function, e.g.
one that reads a stream or file.
git clone git://github.com/ajlopez/SimpleMapReduce.git
cd SimpleMapReduce
npm install
npm test
Words Word Count sample with callback.
Words Sync Synchronous Word Count sample.
Task Run Task sample with callback.
Task Sync Synchronous Run Task sample.
Distributed Workers A server sending tasks to distributed workers.
- Improve async processing
- Distributed sample
- 0.0.1 : Published, run, runSync
- 0.0.2 : Published, runTask, runTaskSync, first samples
- 0.0.3 : In master, work in progress. Distributed sample.
Feel free to file issues and submit pull requests — contributions are welcome.
If you submit a pull request, please be sure to add or update corresponding
test cases, and ensure that `npm test`
continues to pass.