Skip to content

Commit

Permalink
feat: add subkey and dedupe
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Shepheard committed May 22, 2014
1 parent 3f67b9a commit 1f0e6a1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 43 deletions.
20 changes: 11 additions & 9 deletions README.md
Expand Up @@ -2,7 +2,7 @@

[![Build Status](https://travis-ci.org/coinative/level-merged-stream.svg?branch=master)](https://travis-ci.org/coinative/level-merged-stream) [![Coverage Status](https://img.shields.io/coveralls/coinative/level-merged-stream.svg)](https://coveralls.io/r/coinative/level-merged-stream?branch=master)

A [LevelUP](https://github.com/rvagg/node-levelup) plugin to merge multiple ranges into a single sorted stream. Useful when doing range queries against multiple key-based secondary indexes. Supports standard LevelUP stream options and naïve pagination via a `skip` option.
A [LevelUP](https://github.com/rvagg/node-levelup) plugin to merge multiple ranges into a single sorted stream using a subkey for ordering. Useful when doing range queries against multiple key-based secondary indexes. Supports standard LevelUP stream options and naïve pagination via a `skip` option.

## Install

Expand Down Expand Up @@ -34,11 +34,15 @@ db = mergedStream(db);

As per [`db.createReadStream()`](https://github.com/rvagg/node-levelup/blob/master/README.md#createReadStream) but with the following additional options:

* `'ranges'`: an array of `start`/`end` pairs as you'd use in `createReadStream`. Each range is streamed and merged in the order defined by `comparator`.
* `'ranges'` *(array)*: The `start`/`end` ranges as you'd provide to `createReadStream`. Each range is streamed and merged according to the value provided by `subkey`. Each range must be ordered by `subkey`, i.e. any range prefix must be constant for the entire range for overall ordering to be consistent.

* `'comparator'`: a key comparator function. This defines the sort order of the resulting merged stream. If not specified results are sorted via a basic comparator function (which significantly limits it's usefulness).
* `'subkey'` *(function, default: identity)*: Selects the subkey from the key that is used for ordering the results.

* `'skip'` *(number, default: `0`)*: the number of results to skip in the merged stream. Useful for pagination when also using `limit`.
* `'comparator'` *(function, default: primitive comparison)*: A subkey comparator function if primitive comparison is insufficient, e.g. for Buffers or custom keys.

* `'dedupe'` *(boolean, default: `false`)*: If `true`, when two or more subkeys are considered equal, only the first result will be streamed. The key and/or value returned could come from any underlying range stream.

* `'skip'` *(number, default: `0`)*: The number of results to skip in the merged stream. Useful for pagination when also using `limit`.

--------------------------------------------------------
<a name="createMergedKeyStream"></a>
Expand All @@ -54,7 +58,7 @@ As per [`db.createValueStream()`](https://github.com/rvagg/node-levelup/blob/mas

--------------------------------------------------------

## Examaple
## Example

`examples/readme.js`:
```js
Expand Down Expand Up @@ -83,10 +87,8 @@ db.batch()
{ start: 'c', end: 'd' }
],
// Ignore the first character for sorting
comparator: function (x, y) {
x = x.slice(1);
y = y.slice(1);
return x > y ? 1 : x < y ? -1 : 0;
subkey: function (key) {
return key.slice(1);
},
skip: 1,
limit: 2
Expand Down
6 changes: 2 additions & 4 deletions examples/readme.js
Expand Up @@ -23,10 +23,8 @@ db.batch()
{ start: 'c', end: 'd' }
],
// Ignore the first character for sorting
comparator: function (x, y) {
x = x.slice(1);
y = y.slice(1);
return x > y ? 1 : x < y ? -1 : 0;
subkey: function (key) {
return key.slice(1);
},
skip: 1,
limit: 2
Expand Down
44 changes: 38 additions & 6 deletions lib/index.js
@@ -1,25 +1,49 @@
'use strict';

var through = require('through');
var through2 = require('through2');
var merge = require('mergesort-stream');
var offset = require('offset-stream');
var limit = require('limit-stream');
var extend = require('xtend');

// Default subkey selector: the subkey is the entire key (identity).
function defaultSubkey(k) {
  return k;
}

// Default comparator: primitive < / > ordering, returning -1, 0 or 1.
function defaultComparator(a, b) {
  if (a < b) {
    return -1;
  }
  if (a > b) {
    return 1;
  }
  return 0;
}

// Build the comparator used to merge the range streams.
//
// Keys are first mapped through the `subkey` selector and the resulting
// subkeys are compared with `comparator` (both fall back to identity /
// primitive comparison). When values are being streamed
// (options.values), each datum is a { key, value } pair, so the key is
// pulled off the datum before the subkey is selected.
//
// NOTE: the source diff left two stale deleted lines in this function
// (an early `return comparator(x.key, y.key);` and `return comparator;`)
// which made the subkey logic unreachable; they are removed here.
function makeComparator(options) {
  var subkey = options.subkey || defaultSubkey;
  var comparator = options.comparator || defaultComparator;
  if (options.values) {
    return function (x, y) {
      return comparator(subkey(x.key), subkey(y.key));
    };
  }
  return function (x, y) {
    return comparator(subkey(x), subkey(y));
  };
}

// Build a transform stream that drops duplicate results.
//
// The merged stream is already ordered by subkey, so duplicate detection
// only needs to compare each datum against the immediately preceding
// one. When two or more results compare equal, only the first is
// emitted; which underlying range stream it came from is unspecified.
function makeDedupe(comparator) {
  var prev;
  return through2.obj(function (data, enc, callback) {
    // Emit only the first of a run of equal results.
    if (!prev || comparator(prev, data) !== 0) {
      this.push(data);
    }
    prev = data;
    callback();
  });
}

function createMergedReadStream(db, options) {
options = extend({ keys: true, values: true }, options);

Expand All @@ -28,23 +52,31 @@ function createMergedReadStream(db, options) {
if (options.limit && options.skip) {
overrides.limit = options.limit + options.skip;
}
var comparator = makeComparator(options);

if (options.ranges) {
// Force keys for comparator
overrides.keys = true;
sources = options.ranges.map(function (range) {
return db.createReadStream(extend(options, range, overrides));
});
stream = merge(makeComparator(options), sources);
stream = merge(comparator, sources);
if (options.dedupe) {
stream = stream.pipe(makeDedupe(comparator));
}
// If we forced keys for the comparator, drop them now
if (!options.keys && options.values) {
stream = stream.pipe(through(function (data) {
this.queue(data.value);
stream = stream.pipe(through2.obj(function (data, enc, callback) {
this.push(data.value);
callback();
}));
}
} else {
stream = db.createReadStream(extend(options, overrides));
sources = [stream];
if (options.dedupe) {
stream = stream.pipe(makeDedupe(comparator));
}
}
if (options.skip) {
stream = stream.pipe(offset(options.skip));
Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -28,7 +28,7 @@
"limit-stream": "^2.0.0",
"mergesort-stream": "0.0.3",
"offset-stream": "0.0.0",
"through": "^2.3.4",
"through2": "^0.4.2",
"xtend": "^3.0.0"
}
}
100 changes: 77 additions & 23 deletions test/test.js
Expand Up @@ -36,19 +36,17 @@ describe('level-merged-stream', function () {
db.close(done);
});

// Sort from the second character
function comparator(x, y) {
x = x.slice(1);
y = y.slice(1);
return x > y ? 1 : x < y ? -1 : 0;
// Ignore the first character for sorting
function subkey(key) {
return key.slice(1);
}

describe('createMergedRangeStream', function () {
it('should \'merge\' 1 stream', function (done) {
var results = [];
db.mergedReadStream({
ranges: [ { start: 'c', end: 'd' }],
comparator: comparator
subkey: subkey
})
.on('data', function (data) {
results.push(data.value);
Expand All @@ -63,7 +61,7 @@ describe('level-merged-stream', function () {
var results = [];
db.mergedReadStream({
ranges: [ { start: 'a', end: 'b' }, { start: 'c', end: 'd' }],
comparator: comparator
subkey: subkey
})
.on('data', function (data) {
results.push(data.value);
Expand All @@ -78,7 +76,7 @@ describe('level-merged-stream', function () {
var results = [];
db.mergedReadStream({
ranges: ranges,
comparator: comparator
subkey: subkey
})
.on('data', function (data) {
results.push(data.value);
Expand All @@ -93,7 +91,7 @@ describe('level-merged-stream', function () {
var results = [];
var stream = db.mergedReadStream({
ranges: ranges,
comparator: comparator
subkey: subkey
});
stream
.on('data', function (data) {
Expand All @@ -112,7 +110,7 @@ describe('level-merged-stream', function () {
var results = [];
var stream = db.mergedReadStream({
ranges: ranges,
comparator: comparator,
subkey: subkey,
limit: 4
});
stream
Expand All @@ -129,7 +127,7 @@ describe('level-merged-stream', function () {
var results = [];
var stream = db.mergedReadStream({
ranges: ranges,
comparator: comparator,
subkey: subkey,
skip: 3,
limit: 3
});
Expand Down Expand Up @@ -167,8 +165,8 @@ describe('level-merged-stream', function () {
var results = [];
var stream = db.mergedReadStream({
ranges: [{ start: { prefix: 'a' }, end: { prefix: 'a', value: '4' } }, { start: { prefix: 'c' } }],
comparator: function (x, y) {
return x.value > y.value ? 1 : x.value < y.value ? -1 : 0;
subkey: function (key) {
return key.value;
},
skip: 1,
limit: 3,
Expand Down Expand Up @@ -214,27 +212,27 @@ describe('level-merged-stream', function () {
});
});

it('should return comparator duplicates (unordered)', function (done) {
it('should return subkey duplicates (unordered)', function (done) {
db.put('b0', '9', function () {
var results = [];
db.mergedReadStream({
ranges: ranges,
comparator: comparator,
subkey: subkey,
limit: 4
})
.on('data', function (data) {
results.push(data.value);
results.push(data.key);
})
.on('end', function () {
// Since both a0 and b0 will be equal, we aren't guaranteed of their
// ordering, so only assert on their presence
expect(results.sort()).to.deep.equal(['0', '1', '2', '9']);
expect(results.sort()).to.deep.equal(['a0', 'b0', 'b2', 'c1']);
done();
});
});
});

it('should return range duplicates (unordered) for default comparator', function (done) {
it('should return range duplicates (unordered) for default subkey', function (done) {
db.put('b0', '9', function () {
var results = [];
db.mergedReadStream({
Expand All @@ -250,32 +248,88 @@ describe('level-merged-stream', function () {
});
});

it('should not return range duplicates when dedupe specified', function (done) {
db.put('b0', '9', function () {
var results = [];
db.mergedReadStream({
ranges: [{ start: 'a', end: 'b' }, { start: 'a', end: 'b' }],
dedupe: true
})
.on('data', function (data) {
results.push(data.key);
})
.on('end', function () {
expect(results).to.deep.equal(['a0', 'a4', 'a5']);
done();
});
});
});

it('should not return duplicates when dedupe specified for single stream', function (done) {
db.put('d6', '6', function () {
var results = [];
db.mergedReadStream({
start: 'c6',
subkey: subkey,
dedupe: true
})
.on('data', function (data) {
results.push(data.key);
})
.on('end', function () {
// We should be missing 'd6' as its subkey is equal to that of 'c6'
expect(results).to.deep.equal(['c6']);
done();
});
});
});

it('should provide enough source stream results to meet skip & limit', function (done) {
var results = [];
db.mergedReadStream({
ranges: ranges,
comparator: comparator,
subkey: subkey,
skip: 3,
limit: 1
})
.on('data', function (data) {
results.push(data.value);
})
.on('end', function () {
// If createReadStream limit was set to 1 and not 3 + 1, we'd get []
// and not ['3'] because the 'b' stream would already have ended
// If createReadStream limit was set to 1 the 'b' stream would end
// before we received 'b3'
expect(results).to.deep.equal(['3']);
done();
});
});

it('should allow custom comparator', function (done) {
var results = [];
db.mergedReadStream({
ranges: ranges,
subkey: function (key) {
return [key[0], parseInt(key.slice(1))];
},
comparator: function (x, y) {
return x[1] > y[1] ? 1 : x[1] < y[1] ? -1 : 0;
}
})
.on('data', function (data) {
results.push(data.value);
})
.on('end', function () {
expect(results).to.deep.equal(['0', '1', '2', '3', '4', '5', '6']);
done();
});
});
});

describe('createMergedKeyStream', function () {
it('should emit only keys', function (done) {
var results = [];
db.mergedKeyStream({
ranges: ranges,
comparator: comparator
subkey: subkey
})
.on('data', function (data) {
results.push(data);
Expand All @@ -292,7 +346,7 @@ describe('level-merged-stream', function () {
var results = [];
db.mergedValueStream({
ranges: ranges,
comparator: comparator
subkey: subkey
})
.on('data', function (data) {
results.push(data);
Expand Down

0 comments on commit 1f0e6a1

Please sign in to comment.