(#3424) - implement bulkGet()
Related to pouchdb/express-pouchdb#196 and
apache/couchdb-chttpd#33

This adds a `bulkGet()` function to all PouchDB
objects, which in the case of local databases runs
a shim that simply collates a bunch of `get()` requests.
This logic is in `bulk-get-shim.js`.
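For illustration, here's a minimal usage sketch of the new API (the db name, doc IDs, and revs are made up):

var db = new PouchDB('mydb');
db.bulkGet({
  docs: [
    {id: 'doc1', rev: '1-abc'},
    {id: 'doc2', rev: '3-def'}
  ],
  revs: true,        // include revision histories, as replicate.js does
  attachments: true  // inline attachment bodies
}, function (err, res) {
  // res.results has one entry per requested rev:
  // {id: ..., docs: [{ok: doc}]} on success, {id: ..., docs: [{error: err}]} otherwise
});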

In the case of the http adapter, the client first checks whether the
server responds to `_bulk_get` with a 40x error. If no error is
returned, the server is assumed to implement `_bulk_get` and that
API is used; otherwise the shim is used.
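
For reference, the wire format the http adapter probes for looks roughly like this (shape inferred from the shim and the chttpd proposal; the IDs and revs are made up):

POST /mydb/_bulk_get
{"docs": [{"id": "doc1", "rev": "1-abc"}, {"id": "doc2", "rev": "3-def"}]}

200 OK
{"results": [
  {"id": "doc1", "docs": [{"ok": { /* doc1 at rev 1-abc */ }}]},
  {"id": "doc2", "docs": [{"ok": { /* doc2 at rev 3-def */ }}]}
]}

A server that doesn't implement `_bulk_get` answers the POST with a 40x status instead, which is what the adapter detects.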

I have tested this against CouchDB 1.6 and PouchDB Server
(82b297e) with 100% success in both. Manual testing also
showed that the `_bulk_get` API was used for PouchDB Server
but not for CouchDB.

Three cheers for bulk replication!
nolanlawson committed May 2, 2015
1 parent ce8b25a commit a0ef9b8
Showing 5 changed files with 194 additions and 27 deletions.
13 changes: 13 additions & 0 deletions lib/adapter.js
@@ -6,6 +6,7 @@ var errors = require('./deps/errors');
var EventEmitter = require('events').EventEmitter;
var upsert = require('./deps/upsert');
var Changes = require('./changes');
var bulkGetShim = require('./deps/bulk-get-shim');
var Promise = utils.Promise;

@@ -323,6 +324,18 @@ AbstractPouchDB.prototype.revsDiff =
}, this);
});

// _bulk_get API for faster replication, as described in
// https://github.com/apache/couchdb-chttpd/pull/33
// At the "abstract" level, it will just run multiple get()s in
// parallel, because this isn't much of a performance cost
// for local databases (except the cost of multiple transactions, which is
// small). The http adapter overrides this in order
// to do a more efficient single HTTP request.
AbstractPouchDB.prototype.bulkGet =
utils.adapterFun('bulkGet', function (opts, callback) {
bulkGetShim(this, opts, callback);
});

// compact one document and fire callback
// by compacting we mean removing all revisions which
// are further from the leaf in revision tree than max_height
65 changes: 65 additions & 0 deletions lib/adapters/http/http.js
@@ -1,6 +1,7 @@
"use strict";

var CHANGES_BATCH_SIZE = 25;
var MAX_SIMULTANEOUS_REVS = 50;

// according to http://stackoverflow.com/a/417184/680742,
// the de facto URL length limit is 2000 characters.
@@ -14,6 +15,7 @@ var errors = require('../../deps/errors');
var log = require('debug')('pouchdb:http');
var isBrowser = typeof process === 'undefined' || process.browser;
var buffer = require('../../deps/buffer');
var bulkGetShim = require('../../deps/bulk-get-shim');

function encodeDocId(id) {
if (/^_design/.test(id)) {
@@ -240,6 +242,69 @@ function HttpPouch(opts, callback) {
});
});

api.bulkGet = utils.adapterFun('bulkGet', function (opts, callback) {
var self = this;

function doBulkGet(cb) {
ajax({
headers: host.headers,
url: genDBUrl(host, '_bulk_get'),
method: 'POST',
body: opts
}, cb);
}

function doBulkGetShim() {
if (!opts.docs.length) {
return callback(null, {results: []});
}

// avoid a "URL too long" error by splitting into multiple requests
var batchSize = MAX_SIMULTANEOUS_REVS;
var numBatches = Math.ceil(opts.docs.length / batchSize);
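// e.g. (hypothetical numbers) 120 requested revs => Math.ceil(120 / 50) = 3
// shim batches, each producing get() calls whose open_revs lists stay short
// enough to keep URLs under the ~2000-character limit noted above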
var numDone = 0;
var results = new Array(numBatches);

function onResult(batchNum) {
return function (err, res) {
// err is impossible here; the shim reports per-doc errors in its results list
results[batchNum] = res.results;
if (++numDone === numBatches) {
callback(null, {results: utils.flatten(results)});
}
};
}

for (var i = 0; i < numBatches; i++) {
var subOpts = utils.pick(opts, ['revs']);
subOpts.docs = opts.docs.slice(i * batchSize,
Math.min(opts.docs.length, (i + 1) * batchSize));
bulkGetShim(self, subOpts, onResult(i));
}
}

if (typeof self._supports_bulk_get !== 'boolean') {
// check if this database supports _bulk_get
doBulkGet(function (err, res) {
if (err) {
if (Math.floor(err.status / 100) === 4) { // 40x
self._supports_bulk_get = false;
doBulkGetShim();
} else {
callback(err);
}
} else {
self._supports_bulk_get = true;
callback(null, res);
}
});
} else if (self._supports_bulk_get) {
doBulkGet(callback);
} else {
doBulkGetShim();
}
});

// Calls GET on the host, which gets back a JSON string containing
// couchdb: A welcome string
// version: The version of CouchDB it is running
76 changes: 76 additions & 0 deletions lib/deps/bulk-get-shim.js
@@ -0,0 +1,76 @@
'use strict';

var utils = require('../utils');

// shim for P/CouchDB adapters that don't directly implement
// _bulk_get
function bulkGet(db, opts, callback) {
var requests = Array.isArray(opts) ? opts : opts.docs;
if (!requests.length) {
return callback(null, {results: []});
}

// consolidate into one request per doc if possible
var requestsById = {};
requests.forEach(function (request) {
if (request.id in requestsById) {
requestsById[request.id].push(request);
} else {
requestsById[request.id] = [request];
}
});
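// e.g. (hypothetical) [{id: 'a', rev: '1-x'}, {id: 'a', rev: '2-y'}, {id: 'b', rev: '1-z'}]
// collapses to {a: [<both 'a' requests>], b: [<the 'b' request>]}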

var numDocs = Object.keys(requestsById).length;
var numDone = 0;
var perDocResults = new Array(numDocs);

function collapseResults() {
var results = [];
perDocResults.forEach(function (res) {
res.docs.forEach(function (info) {
results.push({
id: res.id,
docs: [info]
});
});
});
callback(null, {results: results});
}

function checkDone() {
if (++numDone === numDocs) {
collapseResults();
}
}

function gotResult(i, id, docs) {
perDocResults[i] = {id: id, docs: docs};
checkDone();
}

Object.keys(requestsById).forEach(function (docId, i) {

var docRequests = requestsById[docId];

// just use the first request as the "template"
// TODO: yeah... this API allows more subtle use cases than this,
// but for now there's only one real client (replicate.js) since users
// shouldn't need to touch this. so let's optimize for that case.
var docOpts = utils.pick(docRequests[0], ['atts_since', 'attachments']);
docOpts.open_revs = docRequests.map(function (request) {
// rev is required, open_revs disallowed
return request.rev;
});
// globally-supplied options
['revs', 'attachments', 'atts_since'].forEach(function (param) {
if (param in opts) {
docOpts[param] = opts[param];
}
});
db.get(docId, docOpts, function (err, res) {
gotResult(i, docId, err ? [{error: err}] : res);
});
});
}

module.exports = bulkGet;
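
To make the collation concrete, a hedged sketch of the shim in action (the db, IDs, and revs are made up):

var bulkGetShim = require('./lib/deps/bulk-get-shim');

bulkGetShim(db, {
  docs: [
    {id: 'a', rev: '1-x'},
    {id: 'a', rev: '2-y'},
    {id: 'b', rev: '1-z'}
  ]
}, function (err, res) {
  // internally this issued two get() calls:
  //   db.get('a', {open_revs: ['1-x', '2-y']}, ...)
  //   db.get('b', {open_revs: ['1-z']}, ...)
  // and then re-expanded the per-doc results, so res.results
  // contains three entries, one per requested rev
});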
54 changes: 27 additions & 27 deletions lib/replicate.js
@@ -4,7 +4,6 @@ var utils = require('./utils');
var EE = require('events').EventEmitter;
var Checkpointer = require('./checkpointer');

-var MAX_SIMULTANEOUS_REVS = 50;
var RETRY_DEFAULT = false;

function randomNumber(min, max) {
@@ -212,41 +211,42 @@ function replicate(repId, src, target, opts, returnValue, result) {
});
}

-function processDiffDoc(id) {
+function getAllDocs() {
var diffs = currentBatch.diffs;
-var allMissing = diffs[id].missing;
-// avoid url too long error by batching
-var missingBatches = [];
-for (var i = 0; i < allMissing.length; i += MAX_SIMULTANEOUS_REVS) {
-missingBatches.push(allMissing.slice(i, Math.min(allMissing.length,
-i + MAX_SIMULTANEOUS_REVS)));
-}

-return utils.Promise.all(missingBatches.map(function (missing) {
-var opts = {
-revs: true,
-open_revs: missing,
-attachments: true
-};
-return src.get(id, opts).then(function (docs) {
-docs.forEach(function (doc) {
-if (state.cancelled) {
-return completeReplication();
-}
+var requests = [];
+Object.keys(diffs).forEach(function (id) {
+var missingRevs = diffs[id].missing;
+missingRevs.forEach(function (missingRev) {
+requests.push({
+id: id,
+rev: missingRev
+});
+});
+});
+
+var bulkGetOpts = {
+docs: requests,
+revs: true,
+attachments: true
+};
+
+return src.bulkGet(bulkGetOpts).then(function (bulkGetResponse) {
+if (state.cancelled) {
+return completeReplication();
+}
+bulkGetResponse.results.forEach(function (bulkGetInfo) {
+bulkGetInfo.docs.forEach(function (doc) {
if (doc.ok) {
result.docs_read++;
currentBatch.pendingRevs++;
currentBatch.docs.push(doc.ok);
}
+// TODO: should we be reporting doc.error?
});
-delete diffs[id];
+delete diffs[bulkGetInfo.id];
});
-}));
-}
-
-function getAllDocs() {
-var diffKeys = Object.keys(currentBatch.diffs);
-return utils.Promise.all(diffKeys.map(processDiffDoc));
+});
}
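
To illustrate the reshaping the new getAllDocs performs (revs made up): a diffs map such as

{doc1: {missing: ['2-a', '2-b']}, doc2: {missing: ['1-c']}}

is flattened into the request list

[{id: 'doc1', rev: '2-a'}, {id: 'doc1', rev: '2-b'}, {id: 'doc2', rev: '1-c'}]

and handed to src.bulkGet() in a single call, instead of one get() per document as before.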


13 changes: 13 additions & 0 deletions lib/utils.js
@@ -40,6 +40,19 @@ exports.pick = function (obj, arr) {
return res;
};

// flatten an array of arrays, with optional non-arrays inside
exports.flatten = function (arrays) {
var res = [];
arrays.forEach(function (array) {
if (Array.isArray(array)) {
res = res.concat(array);
} else {
res.push(array);
}
});
return res;
};
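// a quick sanity check of the expected behavior (hypothetical values):
//   exports.flatten([[1, 2], 3, [4]]) => [1, 2, 3, 4]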

exports.inherits = require('inherits');

function isChromeApp() {
