Skip to content

Commit

Permalink
Add bulk() iteration.
Browse files Browse the repository at this point in the history
  • Loading branch information
satazor committed Mar 26, 2016
1 parent a43f3b2 commit 1006a69
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 9 deletions.
58 changes: 55 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ A fast and easy to use CouchDB iterator for views and all documents.

## Usage

`.couchdbIterator(couchdbAddr, [view], iterator, [options])`
### .couchdbIterator(couchdbAddr, [view], iterator, [options])

Calls `iterator` for all rows of the database referenced by `couchdbAddr`.
Calls `iterator` for each row of the database referenced by `couchdbAddr`.
If a `view` is supplied, iterates only over that view's rows.

This library aims to be fast, therefore iteration happens concurrently. The iterator function can be async but beware that order
Expand All @@ -38,7 +38,7 @@ Examples:
const couchdbIterator = require('couchdb-iterator');

// Iterate over all rows of a database
couchdbIterator('http://localhost:5984/my-db', 'my-design-doc/my-view', (row, index) => {
couchdbIterator('http://localhost:5984/my-db', (row, index) => {
console.log(index, row.id, row.key, row.value);
// Do something with `row`; you may return a promise here
})
Expand Down Expand Up @@ -71,11 +71,63 @@ Available options:
All querying options have no default value, except for `limit` which is `500`. Also, `stale` is automatically set to `ok` after the first iteration to further improve performance.


### .couchdbIterator.bulk(couchdbAddr, [view], iterator, [options])

Calls `iterator` with each bulk of rows of the database referenced by `couchdbAddr`.
If a `view` is supplied, iterates only over that view's rows.

This method is similar to `couchdbIterator()` but iterates in bulks and respects the order of rows. The order is respected because each bulk is ordered and the next bulk is only delivered after the current bulk has been handled.

Examples:

```js
const couchdbIterator = require('couchdb-iterator');

// Iterate over all rows of a database
couchdbIterator.bulk('http://localhost:5984/my-db', (rows) => {
rows.forEach((row) => {
console.log(row.index, row.id, row.key, row.value);
});
// Do something with `rows`; you may return a promise here
})
.then((rowsCount) => {
console.log(`Iteration completed! ${rowsCount}`);
}, (err) => {
console.log('Iteration failed', err);
});

// Iterate over all rows of a view
couchdbIterator.bulk('http://localhost:5984/my-db', 'my-design-doc/my-view', (rows) => {
rows.forEach((row) => {
console.log(row.index, row.id, row.key, row.value);
});
// Do something with `rows`; you may return a promise here
})
.then((rowsCount) => {
console.log(`Iteration completed! ${rowsCount}`);
}, (err) => {
console.log('Iteration failed', err);
});
```

The `couchdbAddr` argument must be a connection string with protocol, host, port and database path (e.g.: http://localhost:5984/my-db) or a [nano](https://www.npmjs.com/package/nano) instance. The `view` argument is a string in the form of `design-doc/view-name` (e.g.: app/byUser).

Available options:

- `bulkSize`: The number of rows passed to `iterator` on each call (the last bulk may be smaller), defaults to `50`.
- `nano`: Custom options to be used when creating the [nano](https://www.npmjs.com/package/nano) instance, defaults to `null`.
- The following [querying options](https://wiki.apache.org/couchdb/HTTP_view_API) are available: `limit`, `skip`, `stale`, `descending`, `startkey`, `startkey_docid`, `endkey`, `endkey_docid`, `include_docs` and `inclusive_end` (can be camelCased).

All querying options have no default value, except for `limit` which is `500`. Also, `stale` is automatically set to `ok` after the first iteration to further improve performance.


## Tests

`$ npm test`
`$ npm run test-cov` to get coverage report

The tests expect a running CouchDB at `http://localhost:5984` but you may specify a different address with the `COUCHDB` environment variable, e.g.: `$ COUCHDB=http://admin:admin@localhost:5984 npm test`.


## License

Expand Down
35 changes: 34 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const snakeCase = require('lodash.snakecase');
const mapKeys = require('lodash.mapkeys');
const rowsReader = require('./lib/rowsReader');
const iteratorCaller = require('./lib/iteratorCaller');
const iteratorCallerBulk = require('./lib/iteratorCallerBulk');

const allowedQueryOptions = [
'limit', 'skip', 'stale', 'descending', 'startkey', 'startkey_docid',
Expand Down Expand Up @@ -58,7 +59,7 @@ function getQueryFn(couchdb, view) {
}

function getQueryOptions(options) {
const queryOptions = mapKeys(omit(options, ['nano', 'concurrency']), (value, key) => snakeCase(key));
const queryOptions = mapKeys(omit(options, ['nano', 'concurrency', 'bulkSize']), (value, key) => snakeCase(key));
const invalidQueryOption = find(Object.keys(queryOptions), (queryOption) => allowedQueryOptions.indexOf(queryOption) === -1);

if (invalidQueryOption) {
Expand All @@ -78,6 +79,7 @@ function couchdbIterator(couchdbAddr, view, iterator, options) {
}

options = Object.assign({ limit: 500, concurrency: 50 }, options);

return new Promise((resolve, reject) => {
const couchdb = getCouchDb(couchdbAddr, options);
const queryFn = getQueryFn(couchdb, view);
Expand All @@ -99,4 +101,35 @@ function couchdbIterator(couchdbAddr, view, iterator, options) {
});
}

/**
 * Iterates over the rows of a database (or of one of its views) in bulks,
 * calling `iterator` with an array of rows each time.
 *
 * @param {string|Object} couchdbAddr - Connection string or nano instance, forwarded to `getCouchDb()`.
 * @param {string} [view] - View to iterate, in the form `design-doc/view-name`; may be omitted.
 * @param {Function} iterator - Called with each bulk of rows; may return a promise.
 * @param {Object} [options] - `bulkSize`, `nano` and querying options (`limit` defaults to `500`, `bulkSize` to `50`).
 * @returns {Promise<number>} Resolves with the count reported by the bulk stream once it ends;
 *                            rejects with the first error emitted by either stream.
 */
function couchdbIteratorBulk(couchdbAddr, view, iterator, options) {
    // Support the short signature: (couchdbAddr, iterator, [options])
    if (typeof view === 'function') {
        options = iterator;
        iterator = view;
        view = null;
    }

    options = Object.assign({ limit: 500, bulkSize: 50 }, options);

    return new Promise((resolve, reject) => {
        const queryFn = getQueryFn(getCouchDb(couchdbAddr, options), view);
        const source = rowsReader(queryFn, getQueryOptions(options));
        const bulkCaller = iteratorCallerBulk(iterator, options.bulkSize);

        // Start the iteration: rows flow from the reader into the bulk caller
        source.on('error', reject);

        source
        .pipe(bulkCaller)
        .on('error', reject)
        .on('end', () => resolve(bulkCaller.getCount()));

        // Nobody consumes the rows coming out of the bulk caller, so keep
        // discarding them; otherwise the stream would never reach its end
        bulkCaller.on('readable', () => {
            let row = bulkCaller.read();

            while (row !== null) {
                row = bulkCaller.read();
            }
        });
    });
}

module.exports = couchdbIterator;
module.exports.bulk = couchdbIteratorBulk;
6 changes: 2 additions & 4 deletions lib/iteratorCaller.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ class IteratorCallerStream extends SwiftTransformStream {
_transform(data, encoding, callback) {
// Execute the iterator
// Note that the iterator can throw synchronously as well as return non-promise values
return new Promise((resolve, reject) => {
Promise.resolve(this._iterator(data, this._count++)) // eslint-disable-line no-plusplus
.then(resolve, reject);
})
return Promise.resolve()
.then(() => this._iterator(data, this._count++)) // eslint-disable-line no-plusplus
.then(() => callback(null, data), (err) => callback(err))
// Equivalent to .done()
.catch((err) => {
Expand Down
65 changes: 65 additions & 0 deletions lib/iteratorCallerBulk.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';

const TransformStream = require('stream').Transform;

/**
 * Object-mode transform stream that buffers incoming rows and hands them to
 * the iterator in bulks, while letting every row pass through to the readable side.
 *
 * A bulk is delivered as soon as the buffer holds `bulkSize` rows; whatever is
 * still buffered when the stream is flushed is delivered as a final bulk.
 */
class IteratorCallerBulkStream extends TransformStream {
    /**
     * @param {Function} iterator - Called with an array of rows; may throw or return a promise.
     * @param {number} bulkSize - Amount of buffered rows that triggers a call to `iterator`.
     */
    constructor(iterator, bulkSize) {
        super({ objectMode: true });

        this._count = 0;
        this._buffer = [];
        this._bulkSize = bulkSize;
        this._iterator = iterator;
    }

    /**
     * @returns {number} How many rows went through `_transform` so far.
     */
    getCount() {
        return this._count;
    }

    // -------------------------------------------------

    _transform(data, encoding, callback) {
        // Tag the row with its position in the iteration before buffering it
        data.index = this._count;
        this._count += 1;
        this._buffer.push(data);

        // Bulk is not complete yet: let the row through and wait for more
        if (this._buffer.length < this._bulkSize) {
            return callback(null, data);
        }

        this._flushBufferAndNotify(() => callback(null, data), (err) => callback(err));
    }

    _flush(callback) {
        // Deliver the rows left over from the last, incomplete, bulk
        this._flushBufferAndNotify(() => callback(), (err) => callback(err));
    }

    /**
     * Flushes the buffer and reports the outcome through the given handlers.
     * Errors thrown by the handlers themselves are rethrown asynchronously so
     * that they are not swallowed by the promise chain.
     */
    _flushBufferAndNotify(onFlushed, onFailed) {
        this._flushBuffer()
        .then(onFlushed, onFailed)
        // Equivalent to .done()
        .catch((err) => {
            /* istanbul ignore next */
            setImmediate(() => { throw err; });
        });
    }

    /**
     * Calls the iterator with the buffered rows and starts a fresh buffer once it fulfills.
     *
     * @returns {Promise} Settles according to the iterator; resolves right away if nothing is buffered.
     */
    _flushBuffer() {
        if (this._buffer.length === 0) {
            return Promise.resolve();
        }

        // Running the iterator inside a `then` turns synchronous throws into
        // rejections and accepts non-promise return values alike
        return Promise.resolve()
        .then(() => this._iterator(this._buffer))
        .then(() => { this._buffer = []; });
    }
}

module.exports = function (iterator, bulkSize) {
return new IteratorCallerBulkStream(iterator, bulkSize);
};
152 changes: 151 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const prepare = require('./util/prepare');
// global.Promise = Promise;
// Promise.config({ longStackTraces: true });

describe('couchdb-iterator', () => {
describe('couchdbIterator()', () => {
let prepared;

before(() => {
Expand Down Expand Up @@ -153,3 +153,153 @@ describe('couchdb-iterator', () => {
});
});
});

// Test suite for couchdbIterator.bulk().
// `prepare()` provides the fixture used below: `couchdbAddr`, `couchdb`,
// `documents` and a `destroy()` function that is called once the suite is done.
describe('couchdbIterator.bulk()', () => {
let prepared;

before(() => {
return prepare()
.then((prepared_) => { prepared = prepared_; });
});

after(() => prepared.destroy());

it('should iterate all rows, respecting the options.bulkSize', () => {
let count = 0;

return couchdbIterator.bulk(prepared.couchdbAddr, (rows) => {
// Only the trailing bulk is allowed to hold less than `bulkSize` rows
count < prepared.documents.length - 5 && expect(rows).to.have.length(5);
count += rows.length;

rows.forEach((row) => {
expect(row).to.have.all.keys('index', 'id', 'key', 'value');
expect(row.id).to.be.a('string');
expect(row.key).to.be.a('string');
expect(row.value).to.be.an('object');
expect(row.key).to.equal(row.id);
});
}, { bulkSize: 5 })
.then((rowsCount) => {
expect(count).to.equal(prepared.documents.length);
expect(rowsCount).to.equal(count);
});
});

it('should iterate all rows from a view, respecting the options.bulkSize', () => {
let count = 0;

return couchdbIterator.bulk(prepared.couchdbAddr, 'test/view-1', (rows) => {
// Only the trailing bulk is allowed to hold less than `bulkSize` rows
count < prepared.documents.length - 5 && expect(rows).to.have.length(5);
count += rows.length;

rows.forEach((row) => {
expect(row).to.have.all.keys('index', 'id', 'key', 'value');
expect(row.id).to.be.a('string');
expect(row.key).to.be.a('array');
expect(row.value).to.be.an('object');
expect(row.key).to.eql([row.value.$type, row.id]);
});
}, { bulkSize: 5 })
.then((rowsCount) => {
expect(count).to.equal(prepared.documents.length - 1); // Design doc doesn't count
expect(rowsCount).to.equal(count);
});
});

it('should wait for iterator to fulfill', () => {
let count = 0;
let waited = 0;
const userDocuments = prepared.documents.filter((doc) => doc.$type === 'user');

// With a bulk size of 1 the iterator runs once per row, so both counters
// must end up matching the number of user documents
return couchdbIterator.bulk(prepared.couchdbAddr, 'test/view-1', () => {
count += 1;

return Promise.delay(50)
.then(() => { waited += 1; });
}, {
// Key range meant to select only the rows whose key starts with 'user'
startkey: ['user', ''],
endkey: ['user', '\ufff0'],
bulkSize: 1,
})
.then((rowsCount) => {
expect(waited).to.equal(userDocuments.length);
expect(count).to.equal(userDocuments.length);
expect(rowsCount).to.equal(count);
});
});

it('should fail if the iterator fails', () => {
return couchdbIterator.bulk(prepared.couchdbAddr, () => {
throw new Error('foo');
})
.then(() => {
throw new Error('Expected to fail');
}, (err) => {
// The error thrown by the iterator must be the rejection reason
expect(err).to.be.an.instanceOf(Error);
expect(err.message).to.equal('foo');
});
});

describe('arguments', () => {
it('should fail if couchdb does not point to a db', () => {
// The address below has no database path
return couchdbIterator.bulk('http://localhost:5984', () => {})
.then(() => {
throw new Error('Expected to fail');
}, (err) => {
expect(err).to.be.an.instanceOf(Error);
expect(err.message).to.match(/no database is selected/i);
});
});

it('should accept a nano instance', () => {
return couchdbIterator.bulk(prepared.couchdb, () => {})
.then((rowsCount) => {
expect(rowsCount).to.equal(prepared.documents.length);
});
});

it('should use options as view params', () => {
let count = 0;
const userDocuments = prepared.documents.filter((doc) => doc.$type === 'user');

return couchdbIterator.bulk(prepared.couchdbAddr, 'test/view-1', (rows) => {
count += rows.length;
}, {
startkey: ['user', ''],
endkey: ['user', '\ufff0'],
})
.then((rowsCount) => {
expect(count).to.equal(userDocuments.length);
expect(rowsCount).to.equal(count);
});
});

it('should snakeCase view params', () => {
let count = 0;
const userDocuments = prepared.documents.filter((doc) => doc.$type === 'user');

return couchdbIterator.bulk(prepared.couchdbAddr, 'test/view-1', (rows) => {
count += rows.length;
// Rows only carry `doc` if the camelCased `includeDocs` option was honoured
rows.forEach((row) => expect(row.doc).to.be.an('object'));
}, {
startkey: ['user', ''],
endkey: ['user', '\ufff0'],
includeDocs: true,
})
.then((rowsCount) => {
expect(count).to.equal(userDocuments.length);
expect(rowsCount).to.equal(count);
});
});

it('should fail on invalid options', () => {
// `some` is not an allowed querying option
return couchdbIterator.bulk(prepared.couchdbAddr, () => {}, { some: 'option' })
.then(() => {
throw new Error('Expected to fail');
}, (err) => {
expect(err).to.be.an.instanceOf(Error);
expect(err.message).to.match(/option `some` is not allowed/i);
});
});
});
});

0 comments on commit 1006a69

Please sign in to comment.