diff --git a/README.md b/README.md index b5fd8fc9..043aab21 100644 --- a/README.md +++ b/README.md @@ -357,7 +357,7 @@ nano.db.changes('alice', function(err, body) { ### nano.db.follow(name, [params], [callback]) -Uses [Follow] to create a solid changes feed. Please consult `follow` documentation for more information as this is a very complete API on it's own: +Creates a solid changes feed. Please consult [follow documentation](follow.md) for more information as this is a very complete API on its own: ``` js var feed = db.follow({since: "now"}); ``` @@ -449,9 +449,9 @@ Listen to db updates, the available `params` are: ** changed in version 6 ** -Use [Follow](https://github.com/jhs/follow) to create a solid +Creates a solid [`_db_updates`](http://docs.couchdb.org/en/latest/api/server/common.html?highlight=db_updates#get--_db_updates) feed. -Please consult follow documentation for more information as this is a very complete api on it's own +Please consult [follow](follow.md) documentation for more information as this is a very complete API on its own ```js var feed = nano.followUpdates({since: "now"}); @@ -1003,7 +1003,6 @@ where `list_doc_params` is the test name. [2]: http://github.com/apache/couchdb-nano/issues [4]: https://github.com/apache/couchdb-nano/blob/master/cfg/couch.example.js [8]: http://webchat.freenode.net?channels=%23couchdb-dev -[follow]: https://github.com/jhs/follow [request]: https://github.com/request/request ## License diff --git a/follow.md b/follow.md new file mode 100644 index 00000000..e553dde8 --- /dev/null +++ b/follow.md @@ -0,0 +1,210 @@ +# Follow: CouchDB changes and db updates notifier for NodeJS + +[![Build Status](https://travis-ci.org/cloudant-labs/cloudant-follow.svg?branch=master)](https://travis-ci.org/cloudant-labs/cloudant-follow) + +--- + +This is a fork of the Iris Couch [follow](https://github.com/iriscouch/follow) project. 
+ +This fork is maintained by IBM Cloudant in order to provide fixes for the [nodejs-cloudant](https://github.com/cloudant/nodejs-cloudant) library. + +--- + +## Example + +This looks much like the [request][req] API. + +```javascript +var follow = require('cloudant-follow'); +follow("https://example.iriscouch.com/boogie", function(error, change) { + if(!error) { + console.log("Got change number " + change.seq + ": " + change.id); + } +}) +``` + +The *error* parameter to the callback will basically always be `null`. + +## Objective + +The API must be very simple: notify me every time a change happens in the DB. Also, never fail. + +If an error occurs, Follow will internally retry without notifying your code. + +Specifically, this should be possible: + +1. Begin a changes feed. Get a couple of change callbacks +2. Shut down CouchDB +3. Go home. Have a nice weekend. Come back on Monday. +4. Start CouchDB with a different IP address +5. Make a couple of changes +6. Update DNS so the domain points to the new IP +7. Once DNS propagates, get a couple more change callbacks + +## Failure Mode + +If CouchDB permanently crashes, there is a choice of failure modes: + +* **Default:** Simply never call back with a change again +* **Optional:** Specify an *inactivity* timeout. If no changes happen by the timeout, Follow will signal an error. + +## DB Updates + +If the db url ends with `/_db_updates`, Follow will provide a +[_db_updates](http://docs.couchdb.org/en/latest/api/server/common.html?highlight=db_updates#get--_db_updates) feed. + +For each change, Follow will emit a `change` event containing: + +* `type`: `created`, `updated` or `deleted`. +* `db_name`: Name of the database where the change occurred. +* `ok`: Event operation status (boolean). + +Note that this feature is available as of CouchDB 1.4. + +### Simple API: follow(options, callback) + +The first argument is an options object. The only required option is `db`. 
Instead of an object, you can use a string to indicate the `db` value. + +```javascript +follow({db:"https://example.iriscouch.com/boogie", include_docs:true}, function(error, change) { + if(!error) { + console.log("Change " + change.seq + " has " + Object.keys(change.doc).length + " fields"); + } +}) +``` + + +All of the CouchDB _changes options are allowed. See http://guide.couchdb.org/draft/notifications.html. + +* `db` | Fully-qualified URL of a couch database. (Basic auth URLs are ok.) +* `since` | The sequence number to start from. Use `"now"` to start from the latest change in the DB. +* `heartbeat` | Milliseconds within which CouchDB must respond (default: **30000** or 30 seconds) +* `feed` | **Optional but only "continuous" is allowed** +* `filter` | + * **Either** a path to design document filter, e.g. `app/important` + * **Or** a Javascript `function(doc, req) { ... }` which should return true or false +* `query_params` | **Optional** for use with `filter` functions, passed as `req.query` to the filter function + +Besides the CouchDB options, more are available: + +* `headers` | Object with HTTP headers to add to the request +* `inactivity_ms` | Maximum time to wait between **changes**. Omitting this means no maximum. +* `max_retry_seconds` | Maximum time to wait between retries (default: 360 seconds) +* `initial_retry_delay` | Time to wait before the first retry, in milliseconds (default 1000 milliseconds) +* `response_grace_time` | Extra time to wait before timing out, in milliseconds (default 5000 milliseconds) + +## Object API + +The main API is a thin wrapper around the EventEmitter API. + +```javascript +var follow = require('cloudant-follow'); + +var opts = {}; // Same options parameters as before +var feed = new follow.Feed(opts); + +// You can also set values directly. 
+feed.db = "http://example.iriscouch.com/boogie"; +feed.since = 3; +feed.heartbeat = 30 * 1000 +feed.inactivity_ms = 86400 * 1000; + +feed.filter = function(doc, req) { + // req.query is the parameters from the _changes request and also feed.query_params. + console.log('Filtering for query: ' + JSON.stringify(req.query)); + + if(doc.stinky || doc.ugly) + return false; + return true; +} + +feed.on('change', function(change) { + console.log('Doc ' + change.id + ' in change ' + change.seq + ' is neither stinky nor ugly.'); +}) + +feed.on('error', function(er) { + console.error('Since Follow always retries on errors, this must be serious'); + throw er; +}) + +feed.follow(); +``` + + +## Pause and Resume + +A Follow feed is a Node.js stream. If you get lots of changes and processing them takes a while, use `.pause()` and `.resume()` as needed. Pausing guarantees that no new events will fire. Resuming guarantees you'll pick up where you left off. + +```javascript +follow("https://example.iriscouch.com/boogie", function(error, change) { + var feed = this + + if(change.seq == 1) { + console.log('Uh oh. The first change takes 30 hours to process. Better pause.') + feed.pause() + setTimeout(function() { feed.resume() }, 30 * 60 * 60 * 1000) + } + + // ... 30 hours with no events ... + + else + console.log('No need to pause for normal change: ' + change.id) +}) +``` + + +## Events + +The feed object is an EventEmitter. There are a few ways to get a feed object: + +* Use the object API above +* Use the return value of `follow()` +* In the callback to `follow()`, the *this* variable is bound to the feed object. 
+ +Once you've got one, you can subscribe to these events: + +* **start** | Before any i/o occurs +* **confirm_request** | `function(req)` | The database confirmation request is sent; passed the `request` object +* **confirm** | `function(db_obj)` | The database is confirmed; passed the couch database object +* **change** | `function(change)` | A change occurred; passed the change object from CouchDB +* **catchup** | `function(seq_id)` | The feed has caught up to the *update_seq* from the confirm step. Assuming no subsequent changes, you have seen all the data. +* **wait** | Follow is idle, waiting for the next data chunk from CouchDB +* **timeout** | `function(info)` | Follow did not receive a heartbeat from couch in time. The passed object has `.elapsed_ms` set to the elapsed time +* **retry** | `function(info)` | A retry is scheduled (usually after a timeout or disconnection). The passed object has + * `.since` the current sequence id + * `.after` the milliseconds to wait before the request occurs (on an exponential fallback schedule) + * `.db` the database url (scrubbed of basic auth credentials) +* **stop** | The feed is stopping, because of an error, or because you called `feed.stop()` +* **error** | `function(err)` | An error occurs + +## Error conditions + +Follow is happy to retry over and over, for all eternity. It will only emit an error if it thinks your whole application might be in trouble. + +* *DB confirmation* failed: Follow confirms the DB with a preliminary query, which must reply properly. +* *DB is deleted*: Even if it retried, subsequent sequence numbers would be meaningless to your code. +* *Your inactivity timer* expired: This is a last-ditch way to detect possible errors. What if couch is sending heartbeats just fine, but nothing has changed for 24 hours? You know that for your app, 24 hours with no change is impossible. Maybe your filter has a bug? Maybe you queried the wrong DB? Whatever the reason, Follow will emit an error. 
+* JSON parse error, which should be impossible from CouchDB +* Invalid change object format, which should be impossible from CouchDB +* Internal error, if the internal state seems wrong, e.g. cancelling a timeout that already expired, etc. Follow tries to fail early. + +## Tests + +Follow uses [node-tap][tap]. If you clone this Git repository, tap is included. + + $ ./node_modules/.bin/tap test/*.js test/issues/*.js + ok test/couch.js ...................................... 11/11 + ok test/follow.js ..................................... 69/69 + ok test/issues.js ..................................... 44/44 + ok test/stream.js ................................... 300/300 + ok test/issues/10.js .................................. 11/11 + total ............................................... 435/435 + + ok + +## License + +Apache 2.0 + +[req]: https://github.com/mikeal/request +[tap]: https://github.com/isaacs/node-tap diff --git a/follow_tests/couch.js b/follow_tests/couch.js new file mode 100644 index 00000000..69ee0eca --- /dev/null +++ b/follow_tests/couch.js @@ -0,0 +1,176 @@ +// CouchDB tests +// +// This module is also a library for other test modules. +// +// Copyright © 2017 IBM Corp. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +var tap = require('tap') + , util = require('util') + , assert = require('assert') + , request = require('request') + +var follow = require('../lib/follow/api.js') + , DB = process.env.db || 'http://localhost:5984/follow_test' + , DB_UPDATES = process.env.db_updates || 'http://localhost:5984/_db_updates' + , RTT = null + + +module.exports = { 'DB': DB + , 'DB_UPDATES': DB_UPDATES + , 'rtt' : get_rtt + , 'redo': redo_couch + , 'setup': setup_test + , 'make_data': make_data + , 'delete_db': delete_db + } + + +function get_rtt() { + if(!RTT) + throw new Error('RTT was not set. Use setup(test) or redo(callback)') + return RTT +} + + +// Basically a redo but testing along the way. +function setup_test(test_func) { + assert.equal(typeof test_func, 'function', 'Please provide tap.test function') + + test_func('Initialize CouchDB', function(t) { + init_db(t, function(er, rtt) { + RTT = rtt + t.end() + }) + }) +} + +function redo_couch(callback) { + function noop() {} + var t = { 'ok':noop, 'false':noop, 'equal':noop, 'end':noop } + init_db(t, function(er, rtt) { + if(rtt) + RTT = rtt + return callback(er) + }) +} + +function init_db(t, callback) { + var create_begin = new Date + +console.log('HERE!'); + request.del({uri:DB, json:true}, function(er, res) { + console.log('RESPONSE',er, res); + t.false(er, 'Clear old test DB: ' + DB) + t.ok(!res.body.error || res.body.error == 'not_found', 'Couch cleared old test DB: ' + DB) + + request.put({uri:DB, json:true}, function(er, res) { + t.false(er, 'Create new test DB: ' + DB) + t.false(res.body.error, 'Couch created new test DB: ' + DB) + + var values = ['first', 'second', 'third'] + , stores = 0 + values.forEach(function(val) { + var doc = { _id:'doc_'+val, value:val } + + request.post({uri:DB, json:doc}, function(er, res) { + t.false(er, 'POST document') + t.equal(res.statusCode, 201, 'Couch stored test document') + + stores += 1 + if(stores == values.length) { + var rtt = (new Date) - create_begin + callback(null, rtt) + 
//request.post({uri:DB, json:{_id:'_local/rtt', ms:(new Date)-begin}}, function(er, res) { + // t.false(er, 'Store RTT value') + // t.equal(res.statusCode, 201, 'Couch stored RTT value') + // t.end() + //}) + } + }) + }) + }) + }) +} + +function delete_db(t, callback) { + request.del({uri: DB, json: true}, function (er, res) { + t.false(er, 'Clear old test DB: ' + DB) + t.ok(!res.body.error) + callback() + }) +} + +function make_data(minimum_size, callback) { + var payload = {'docs':[]} + , size = 0 + + // TODO: Make document number 20 really large, at least over 9kb. + while(size < minimum_size) { + var doc = {} + , key_count = rndint(0, 25) + + while(key_count-- > 0) + doc[rndstr(8)] = rndstr(20) + + // The 20th document has one really large string value. + if(payload.docs.length == 19) { + var big_str = rndstr(9000, 15000) + doc.big = {'length':big_str.length, 'value':big_str} + } + + size += JSON.stringify(doc).length // This is an underestimate because an _id and _rev will be added. 
+ payload.docs.push(doc) + } + + request.post({'uri':DB+'/_bulk_docs', 'json':payload}, function(er, res, body) { + if(er) throw er + + if(res.statusCode != 201) + throw new Error('Bad bulk_docs update: ' + util.inspect(res.body)) + + if(res.body.length != payload.docs.length) + throw new Error('Should have results for '+payload.docs.length+' doc insertions') + + body.forEach(function(result) { + if(!result || !result.id || !result.rev) + throw new Error('Bad bulk_docs response: ' + util.inspect(result)) + }) + + return callback(payload.docs.length) + }) + + function rndstr(minlen, maxlen) { + if(!maxlen) { + maxlen = minlen + minlen = 1 + } + + var str = "" + , length = rndint(minlen, maxlen) + + while(length-- > 0) + str += String.fromCharCode(rndint(97, 122)) + + return str + } + + function rndint(min, max) { + return min + Math.floor(Math.random() * (max - min + 1)) + } +} + + +if(require.main === module) + setup_test(tap.test) diff --git a/follow_tests/follow.js b/follow_tests/follow.js new file mode 100644 index 00000000..af438cad --- /dev/null +++ b/follow_tests/follow.js @@ -0,0 +1,258 @@ +// Copyright © 2017 IBM Corp. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +var tap = require('tap') + , test = tap.test + , util = require('util') + , request = require('request') + +var couch = require('./couch') + , follow = require('../lib/follow/api.js') + + +couch.setup(test) + +test('Follow API', function(t) { + var i = 0 + , saw = {} + + var feed = follow(couch.DB, function(er, change) { + t.is(this, feed, 'Callback "this" value is the feed object') + + i += 1 + t.false(er, 'No error coming back from follow: ' + i) + t.equal(change.seq, i, 'Change #'+i+' should have seq_id='+i) + saw[change.id] = true + + if(i == 3) { + t.ok(saw.doc_first, 'Got the first document') + t.ok(saw.doc_second, 'Got the second document') + t.ok(saw.doc_third , 'Got the third document') + + t.doesNotThrow(function() { feed.stop() }, 'No problem calling stop()') + + t.end() + } + }) +}) + +test("Confirmation request behavior", function(t) { + var feed = follow(couch.DB, function() {}) + + var confirm_req = null + , follow_req = null + + feed.on('confirm_request', function(req) { confirm_req = req }) + feed.on('query', function(req) { follow_req = req }) + + setTimeout(check_req, couch.rtt() * 2) + function check_req() { + t.ok(confirm_req, 'The confirm_request event should have fired by now') + t.ok(confirm_req.agent, 'The confirm request has an agent') + + t.ok(follow_req, 'The follow_request event should have fired by now') + t.ok(follow_req.agent, 'The follow request has an agent') + + // Confirm that the changes follower is not still in the pool. + var host = 'localhost:5984' + var sockets = follow_req.req.agent.sockets[host] || [] + sockets.forEach(function(socket, i) { + t.isNot(socket, follow_req.req.connection, 'The changes follower is not socket '+i+' in the agent pool') + }) + + feed.stop() + t.end() + } +}) + +test('Heartbeats', function(t) { + t.ok(couch.rtt(), 'The couch RTT is known') + var check_time = couch.rtt() * 3.5 // Enough time for 3 heartbeats. 
+ + var beats = 0 + , retries = 0 + + var feed = follow(couch.DB, function() {}) + feed.heartbeat = couch.rtt() + feed.on('response', function() { feed.retry_delay = 1 }) + + feed.on('heartbeat', function() { beats += 1 }) + feed.on('retry', function() { retries += 1 }) + + feed.on('catchup', function() { + t.equal(beats, 0, 'Still 0 heartbeats after receiving changes') + t.equal(retries, 0, 'Still 0 retries after receiving changes') + + //console.error('Waiting ' + couch.rtt() + ' * 3 = ' + check_time + ' to check stuff') + setTimeout(check_counters, check_time) + function check_counters() { + t.equal(beats, 3, 'Three heartbeats ('+couch.rtt()+') fired after '+check_time+' ms') + t.equal(retries, 0, 'No retries after '+check_time+' ms') + + feed.stop() + t.end() + } + }) +}) + +test('Catchup events', function(t) { + t.ok(couch.rtt(), 'The couch RTT is known') + + var feed = follow(couch.DB, function() {}) + var last_seen = 0 + + feed.on('change', function(change) { last_seen = change.seq }) + feed.on('catchup', function(id) { + t.equal(last_seen, 3, 'The catchup event fires after the change event that triggered it') + t.equal(id , 3, 'The catchup event fires on the seq_id of the latest change') + feed.stop() + t.end() + }) +}) + +test('Data due on a paused feed', function(t) { + t.ok(couch.rtt(), 'The couch RTT is known') + var HB = couch.rtt() * 5 + , HB_DUE = HB * 1.25 // This comes from the hard-coded constant in feed.js. 
+ + var events = [] + , last_event = new Date + + function ev(type) { + var now = new Date + events.push({'elapsed':now - last_event, 'event':type}) + last_event = now + } + + var feed = follow(couch.DB, function() {}) + feed.heartbeat = HB + feed.on('response', function() { feed.retry_delay = 1 }) + // TODO + // feed.pause() + + feed.on('heartbeat', function() { ev('heartbeat') }) + feed.on('change' , function() { ev('change') }) + feed.on('timeout' , function() { ev('timeout') }) + feed.on('retry' , function() { ev('retry') }) + feed.on('pause' , function() { ev('pause') }) + feed.on('resume' , function() { ev('resume') }) + feed.on('stop' , function() { ev('stop') }) + + feed.on('change', function(change) { + if(change.seq == 1) { + feed.pause() + // Stay paused long enough for three heartbeats to be overdue. + setTimeout(function() { feed.resume() }, HB_DUE * 3 * 1.1) + } + + if(change.seq == 3) { + feed.stop() + setTimeout(check_results, couch.rtt()) + } + }) + + function check_results() { + console.error('rtt=%j HB=%j', couch.rtt(), HB) + events.forEach(function(event, i) { console.error('Event %d: %j', i, event) }) + + t.equal(events[0].event, 'change', 'First event was a change') + t.ok(events[0].elapsed < couch.rtt(), 'First change came quickly') + + t.equal(events[1].event, 'pause', 'Second event was a pause, reacting to the first change') + t.ok(events[1].elapsed < 10, 'Pause called/fired immediately after the change') + + t.equal(events[2].event, 'timeout', 'Third event was the first timeout') + t.ok(percent(events[2].elapsed, HB_DUE) > 95, 'First timeout fired when the heartbeat was due') + + t.equal(events[3].event, 'retry', 'Fourth event is a retry') + t.ok(events[3].elapsed < 10, 'First retry fires immediately after the first timeout') + + t.equal(events[4].event, 'timeout', 'Fifth event was the second timeout') + t.ok(percent(events[4].elapsed, HB_DUE) > 95, 'Second timeout fired when the heartbeat was due') + + t.equal(events[5].event, 'retry', 
'Sixth event is a retry') + t.ok(events[5].elapsed < 10, 'Second retry fires immediately after the second timeout') + + t.equal(events[6].event, 'timeout', 'Seventh event was the third timeout') + t.ok(percent(events[6].elapsed, HB_DUE) > 95, 'Third timeout fired when the heartbeat was due') + + t.equal(events[7].event, 'retry', 'Eighth event is a retry') + t.ok(events[7].elapsed < 10, 'Third retry fires immediately after the third timeout') + + t.equal(events[8].event, 'resume', 'Ninth event resumed the feed') + + t.equal(events[9].event, 'change', 'Tenth event was the second change') + t.ok(events[9].elapsed < 10, 'Second change came immediately after resume') + + t.equal(events[10].event, 'change', 'Eleventh event was the third change') + t.ok(events[10].elapsed < 10, 'Third change came immediately after the second change') + + t.equal(events[11].event, 'stop', 'Twelfth event was the feed stopping') + t.ok(events[11].elapsed < 10, 'Stop event came immediately in response to the third change') + + t.notOk(events[12], 'No thirteenth event') + + return t.end() + } + + function percent(a, b) { + var num = Math.min(a, b) + , denom = Math.max(a, b) + return 100 * num / denom + } +}) + +test('Events for DB confirmation and hitting the original seq', function(t) { + t.plan(7) + var feed = follow(couch.DB, on_change) + + var events = { 'confirm':null } + feed.on('confirm', function(db) { events.confirm = db }) + feed.on('catchup', caught_up) + + // This will run 3 times. 
+ function on_change(er, ch) { + t.false(er, 'No problem with the feed') + if(ch.seq == 3) { + t.ok(events.confirm, 'Confirm event fired') + t.equal(events.confirm && events.confirm.db_name, 'follow_test', 'Confirm event returned the Couch DB object') + t.equal(events.confirm && events.confirm.update_seq, 3, 'Confirm event got the update_seq right') + } + } + + function caught_up(seq) { + t.equal(seq, 3, 'Catchup event fired on update 3') + + feed.stop() + t.end() + } +}) + +test('Handle a deleted database', function(t) { + var feed = follow(couch.DB, function(er, change) { + if(er){ + t.equal(er.last_seq, 3, 'Got an error for the deletion event') + return t.end() + } + + if(change.seq < 3) + return + + t.equal(change.seq, 3, 'Got change number 3') + + couch.delete_db(t, function(er) { + t.false(er, 'No problem deleting database') + }) + }) +}) diff --git a/follow_tests/issues.js b/follow_tests/issues.js new file mode 100644 index 00000000..bf760d49 --- /dev/null +++ b/follow_tests/issues.js @@ -0,0 +1,92 @@ +// Copyright © 2017 IBM Corp. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +var tap = require('tap') + , test = tap.test + , util = require('util') + , request = require('request') + +var lib = require('../lib/follow/lib') + , couch = require('./couch') + , follow = require('../lib/follow/api.js') + +couch.setup(test) + +test('Issue #5', function(t) { + var saw = { loops:0, seqs:{} } + + var saw_change = false + // -2 means I want to see the last change. + var feed = follow({'db':couch.DB, since:-2}, function(er, change) { + t.equal(change.seq, 3, 'Got the latest change, 3') + t.false(saw_change, 'Only one callback run for since=-2 (assuming no subsequent change') + saw_change = true + + process.nextTick(function() { feed.stop() }) + feed.on('stop', function() { + // Test using since=-1 (AKA since="now"). + follow({'db':couch.DB, since:'now'}, function(er, change) { + t.equal(change.seq, 4, 'Only get changes made after starting the feed') + t.equal(change.id, "You're in now, now", 'Got the subsequent change') + + this.stop() + t.end() + }) + + // Let that follower settle in, then send it something + setTimeout(function() { + var doc = { _id:"You're in now, now", movie:"Spaceballs" } + request.post({uri:couch.DB, json:doc}, function(er) { + if(er) throw er + }) + }, couch.rtt()) + }) + }) +}) + +couch.setup(test) // Back to the expected documents + +test('Issue #6', function(t) { + // When we see change 1, delete the database. The rest should still come in, then the error indicating deletion. + var saw = { seqs:{}, redid:false, redo_err:null } + + follow(couch.DB, function(er, change) { + if(!er) { + saw.seqs[change.seq] = true + t.notOk(change.last_seq, 'Change '+change.seq+' ha no .last_seq') + if(change.seq == 1) { + couch.delete_db(t, function(er) { + saw.redid = true + saw.redo_err = er + }) + } + } + + else setTimeout(function() { + // Give the redo time to finish, then confirm that everything happened as expected. + // Hopefully this error indicates the database was deleted. 
+ t.ok(er.message.match(/deleted .* 3$/), 'Got delete error after change 3') + t.ok(er.deleted, 'Error object indicates database deletion') + t.equal(er.last_seq, 3, 'Error object indicates the last change number') + + t.ok(saw.seqs[1], 'Change 1 was processed') + t.ok(saw.seqs[2], 'Change 2 was processed') + t.ok(saw.seqs[3], 'Change 3 was processed') + t.ok(saw.redid, 'The redo function ran') + t.false(saw.redo_err, 'No problem redoing the database') + + return t.end() + }, couch.rtt() * 2) + }) +}) diff --git a/follow_tests/stream.js b/follow_tests/stream.js new file mode 100644 index 00000000..761379d5 --- /dev/null +++ b/follow_tests/stream.js @@ -0,0 +1,574 @@ +var tap = require('tap') + , test = tap.test + , util = require('util') + , request = require('request') + +var couch = require('./couch') + , follow = require('../lib/follow/api.js') + + +couch.setup(test) + +test('The Changes stream API', function(t) { + var feed = new follow.Changes + + t.type(feed.statusCode, 'null', 'Changes has a .statusCode (initially null)') + t.type(feed.setHeader, 'function', 'Changes has a .setHeader() method') + t.type(feed.headers, 'object', 'Changes has a .headers object') + t.same(feed.headers, {}, 'Changes headers are initially empty') + + t.end() +}) + +test('Readable Stream API', function(t) { + var feed = new follow.Changes + + t.is(feed.readable, true, 'Changes is a readable stream') + + t.type(feed.setEncoding, 'function', 'Changes has .setEncoding() method') + t.type(feed.pause, 'function', 'Changes has .pause() method') + t.type(feed.resume, 'function', 'Changes has .resume() method') + t.type(feed.destroy, 'function', 'Changes has .destroy() method') + t.type(feed.destroySoon, 'function', 'Changes has .destroySoon() method') + t.type(feed.pipe, 'function', 'Changes has .pipe() method') + + t.end() +}) + +test('Writatable Stream API', function(t) { + var feed = new follow.Changes + + t.is(feed.writable, true, 'Changes is a writable stream') + + t.type(feed.write, 
'function', 'Changes has .write() method') + t.type(feed.end, 'function', 'Changes has .end() method') + t.type(feed.destroy, 'function', 'Changes has .destroy() method') + t.type(feed.destroySoon, 'function', 'Changes has .destroySoon() method') + + t.end() +}) + +test('Error conditions', function(t) { + var feed = new follow.Changes + t.throws(write, 'Throw if the feed type is not defined') + + feed = new follow.Changes + feed.feed = 'neither longpoll nor continuous' + t.throws(write, 'Throw if the feed type is not longpoll nor continuous') + + feed = new follow.Changes({'feed':'longpoll'}) + t.throws(write('stuff'), 'Throw if the "results" line is not sent first') + + feed = new follow.Changes({'feed':'longpoll'}) + t.doesNotThrow(write('') , 'Empty string is fine waiting for "results"') + t.doesNotThrow(write('{') , 'This could be the "results" line') + t.doesNotThrow(write('"resu', 'Another part of the "results" line')) + t.doesNotThrow(write('') , 'Another empty string is still fine') + t.doesNotThrow(write('lts":', 'Final part of "results" line still good')) + t.throws(write(']'), 'First line was not {"results":[') + + feed = new follow.Changes + feed.feed = 'continuous' + t.doesNotThrow(write(''), 'Empty string is fine for a continuous feed') + t.throws(end('{"results":[\n'), 'Continuous stream does not want a header') + + feed = new follow.Changes({'feed':'continuous'}) + t.throws(write('hi\n'), 'Continuous stream wants objects') + + feed = new follow.Changes({'feed':'continuous'}) + t.throws(end('[]\n'), 'Continuous stream wants "real" objects, not Array') + + feed = new follow.Changes({'feed':'continuous'}) + t.throws(write('{"seq":1,"id":"hi","changes":[{"rev":"1-869df2efe56ff5228e613ceb4d561b35"}]},\n'), + 'Continuous stream does not want a comma') + + var types = ['longpoll', 'continuous'] + types.forEach(function(type) { + var bad_writes = [ {}, null, ['a string (array)'], {'an':'object'}] + bad_writes.forEach(function(obj) { + feed = new 
follow.Changes + feed.feed = type + + t.throws(write(obj), 'Throw for bad write to '+type+': ' + util.inspect(obj)) + }) + + feed = new follow.Changes + feed.feed = type + + var valid = (type == 'longpoll') + ? '{"results":[\n{}\n],\n"last_seq":1}' + : '{"seq":1,"id":"doc"}' + + t.throws(buf(valid, 'but_invalid_encoding'), 'Throw for buffer with bad encoding') + }) + + t.end() + + function buf(data, encoding) { + return write(new Buffer(data), encoding) + } + + function write(data, encoding) { + if(data === undefined) + return feed.write('blah') + return function() { feed.write(data, encoding) } + } + + function end(data, encoding) { + return function() { feed.end(data, encoding) } + } +}) + +test('Longpoll feed', function(t) { + for(var i = 0; i < 2; i++) { + var feed = new follow.Changes({'feed':'longpoll'}) + + var data = [] + feed.on('data', function(d) { data.push(d) }) + + function encode(data) { return (i == 0) ? data : new Buffer(data) } + function write(data) { return function() { feed.write(encode(data)) } } + function end(data) { return function() { feed.end(encode(data)) } } + + t.doesNotThrow(write('{"results":[') , 'Longpoll header') + t.doesNotThrow(write('{}') , 'Empty object') + t.doesNotThrow(write(',{"foo":"bar"},') , 'Comma prefix and suffix') + t.doesNotThrow(write('{"two":"bar"},') , 'Comma suffix') + t.doesNotThrow(write('{"three":3},{"four":4}'), 'Two objects on one line') + t.doesNotThrow(end('],\n"last_seq":3}\n') , 'Longpoll footer') + + t.equal(data.length, 5, 'Five data events fired') + t.equal(data[0], '{}', 'First object emitted') + t.equal(data[1], '{"foo":"bar"}', 'Second object emitted') + t.equal(data[2], '{"two":"bar"}', 'Third object emitted') + t.equal(data[3], '{"three":3}', 'Fourth object emitted') + t.equal(data[4], '{"four":4}', 'Fifth object emitted') + } + + t.end() +}) + +test('Longpoll pause', function(t) { + var feed = new follow.Changes({'feed':'longpoll'}) + , all = {'results':[{'change':1}, 
{'second':'change'},{'change':'#3'}], 'last_seq':3} + , start = new Date + + var events = [] + + feed.on('data', function(change) { + change = JSON.parse(change) + change.elapsed = new Date - start + events.push(change) + }) + + feed.once('data', function(data) { + t.equal(data, '{"change":1}', 'First data event was the first change') + feed.pause() + setTimeout(function() { feed.resume() }, 100) + }) + + feed.on('end', function() { + t.equal(feed.readable, false, 'Feed is no longer readable') + events.push('END') + }) + + setTimeout(check_events, 150) + feed.end(JSON.stringify(all)) + + function check_events() { + t.equal(events.length, 3+1, 'Three data events, plus the end event') + + t.ok(events[0].elapsed < 10, 'Immediate emit first data event') + t.ok(events[1].elapsed >= 100 && events[1].elapsed < 125, 'About 100ms delay until the second event') + t.ok(events[2].elapsed - events[1].elapsed < 10, 'Immediate emit of subsequent event after resume') + t.equal(events[3], 'END', 'End event was fired') + + t.end() + } +}) + +test('Continuous feed', function(t) { + for(var i = 0; i < 2; i++) { + var feed = new follow.Changes({'feed':'continuous'}) + + var data = [] + feed.on('data', function(d) { data.push(d) }) + feed.on('end', function() { data.push('END') }) + + var beats = 0 + feed.on('heartbeat', function() { beats += 1 }) + + function encode(data) { return (i == 0) ? data : new Buffer(data) } + function write(data) { return function() { feed.write(encode(data)) } } + function end(data) { return function() { feed.end(encode(data)) } } + + // This also tests whether the feed is compacting or tightening up the JSON. 
+ t.doesNotThrow(write('{ }\n') , 'Empty object') + t.doesNotThrow(write('\n') , 'Heartbeat') + t.doesNotThrow(write('{ "foo" : "bar" }\n') , 'One object') + t.doesNotThrow(write('{"three":3}\n{ "four": 4}\n'), 'Two objects sent in one chunk') + t.doesNotThrow(write('') , 'Empty string') + t.doesNotThrow(write('\n') , 'Another heartbeat') + t.doesNotThrow(write('') , 'Another empty string') + t.doesNotThrow(write('{ "end" ') , 'Partial object 1/4') + t.doesNotThrow(write(':') , 'Partial object 2/4') + t.doesNotThrow(write('tru') , 'Partial object 3/4') + t.doesNotThrow(end('e}\n') , 'Partial object 4/4') + + t.equal(data.length, 6, 'Five objects emitted, plus the end event') + t.equal(beats, 2, 'Two heartbeats emitted') + + t.equal(data[0], '{}', 'First object emitted') + t.equal(data[1], '{"foo":"bar"}', 'Second object emitted') + t.equal(data[2], '{"three":3}', 'Third object emitted') + t.equal(data[3], '{"four":4}', 'Fourth object emitted') + t.equal(data[4], '{"end":true}', 'Fifth object emitted') + t.equal(data[5], 'END', 'End event fired') + } + + t.end() +}) + +test('Continuous pause', function(t) { + var feed = new follow.Changes({'feed':'continuous'}) + , all = [{'change':1}, {'second':'change'},{'#3':'change'}] + , start = new Date + + var events = [] + + feed.on('end', function() { + t.equal(feed.readable, false, 'Feed is not readable after "end" event') + events.push('END') + }) + + feed.on('data', function(change) { + change = JSON.parse(change) + change.elapsed = new Date - start + events.push(change) + }) + + feed.once('data', function(data) { + t.equal(data, '{"change":1}', 'First data event was the first change') + t.equal(feed.readable, true, 'Feed is readable after first data event') + feed.pause() + t.equal(feed.readable, true, 'Feed is readable after pause()') + + setTimeout(unpause, 100) + function unpause() { + t.equal(feed.readable, true, 'Feed is readable just before resume()') + feed.resume() + } + }) + + setTimeout(check_events, 150) + 
all.forEach(function(obj) { + feed.write(JSON.stringify(obj)) + feed.write("\r\n") + }) + feed.end() + + function check_events() { + t.equal(events.length, 3+1, 'Three data events, plus the end event') + + t.ok(events[0].elapsed < 10, 'Immediate emit first data event') + t.ok(events[1].elapsed >= 100 && events[1].elapsed < 125, 'About 100ms delay until the second event') + t.ok(events[2].elapsed - events[1].elapsed < 10, 'Immediate emit of subsequent event after resume') + t.equal(events[3], 'END', 'End event was fired') + + t.end() + } +}) + +test('Paused while heartbeats are arriving', function(t) { + var feed = new follow.Changes({'feed':'continuous'}) + , start = new Date + + var events = [] + + feed.on('data', function(change) { + change = JSON.parse(change) + change.elapsed = new Date - start + events.push(change) + }) + + feed.on('heartbeat', function(hb) { + var hb = {'heartbeat':true, 'elapsed':(new Date) - start} + events.push(hb) + }) + + feed.once('data', function(data) { + t.equal(data, '{"change":1}', 'First data event was the first change') + feed.pause() + t.equal(feed.readable, true, 'Feed is readable after pause()') + + setTimeout(unpause, 350) + function unpause() { + t.equal(feed.readable, true, 'Feed is readable just before resume()') + feed.resume() + } + }) + + // Simulate a change, then a couple heartbeats every 100ms, then more changes. 
+ var writes = [] + function write(data) { + var result = feed.write(data) + writes.push(result) + } + + setTimeout(function() { write('{"change":1}\n') }, 0) + setTimeout(function() { write('\n') }, 100) + setTimeout(function() { write('\n') }, 200) + setTimeout(function() { write('\n') }, 300) + setTimeout(function() { write('\n') }, 400) + setTimeout(function() { write('{"change":2}\n') }, 415) + setTimeout(function() { write('{"change":3}\n') }, 430) + + setTimeout(check_events, 500) + function check_events() { + t.ok(events[0].change , 'First event was data (a change)') + t.ok(events[1].heartbeat, 'Second event was a heartbeat') + t.ok(events[2].heartbeat, 'Third event was a heartbeat') + t.ok(events[3].heartbeat, 'Fourth event was a heartbeat') + t.ok(events[4].heartbeat, 'Fifth event was a heartbeat') + t.ok(events[5].change , 'Sixth event was data') + t.ok(events[6].change , 'Seventh event was data') + + t.ok(events[0].elapsed < 10, 'Immediate emit first data event') + t.ok(events[1].elapsed > 350, 'First heartbeat comes after the pause') + t.ok(events[2].elapsed > 350, 'Second heartbeat comes after the pause') + t.ok(events[3].elapsed > 350, 'Third heartbeat comes after the pause') + t.ok(events[4].elapsed > 350, 'Fourth heartbeat comes after the pause') + + t.ok(events[3].elapsed < 360, 'Third heartbeat came right after the resume') + t.ok(events[4].elapsed >= 400, 'Fourth heartbeat came at the normal time') + + t.equal(writes[0], true , 'First write flushed') + t.equal(writes[1], false, 'Second write (first heartbeat) did not flush because the feed was paused') + t.equal(writes[2], false, 'Third write did not flush due to pause') + t.equal(writes[3], false, 'Fourth write did not flush due to pause') + t.equal(writes[4], true , 'Fifth write (fourth heartbeat) flushed due to resume()') + t.equal(writes[5], true , 'Sixth write flushed normally') + t.equal(writes[6], true , 'Seventh write flushed normally') + + feed.end() + t.end() + } +}) + +test('Feeds 
from couch', function(t) { + t.ok(couch.rtt(), 'RTT to couch is known') + + var did = 0 + function done() { + did += 1 + if(did == 2) + t.end() + } + + var types = [ 'longpoll', 'continuous' ] + types.forEach(function(type) { + var feed = new follow.Changes({'feed':type}) + setTimeout(check_changes, couch.rtt() * 2) + + var events = [] + feed.on('data', function(data) { events.push(JSON.parse(data)) }) + feed.on('end', function() { events.push('END') }) + + var uri = couch.DB + '/_changes?feed=' + type + var req = request({'uri':uri}) + + // Compatibility with the old onResponse option. + req.on('response', function(res) { on_response(null, res, res.body) }) + + // Disconnect the continuous feed after a while. + if(type == 'continuous') + setTimeout(function() { feed.destroy() }, couch.rtt() * 1) + + function on_response(er, res, body) { + t.false(er, 'No problem fetching '+type+' feed: ' + uri) + t.type(body, 'undefined', 'No data in '+type+' callback. This is an onResponse callback') + t.type(res.body, 'undefined', 'No response body in '+type+' callback. 
This is an onResponse callback') + t.ok(req.response, 'The request object has its '+type+' response by now') + + req.pipe(feed) + + t.equal(feed.statusCode, 200, 'Upon piping from request, the statusCode is set') + t.ok('content-type' in feed.headers, 'Upon piping from request, feed has headers set') + } + + function check_changes() { + var expected_count = 3 + if(type == 'longpoll') + expected_count += 1 // For the "end" event + + t.equal(events.length, expected_count, 'Change event count for ' + type) + + t.equal(events[0].seq, 1, 'First '+type+' update sequence id') + t.equal(events[1].seq, 2, 'Second '+type+' update sequence id') + t.equal(events[2].seq, 3, 'Third '+type+' update sequence id') + + t.equal(good_id(events[0]), true, 'First '+type+' update is a good doc id: ' + events[0].id) + t.equal(good_id(events[1]), true, 'Second '+type+' update is a good doc id: ' + events[1].id) + t.equal(good_id(events[2]), true, 'Third '+type+' update is a good doc id: ' + events[2].id) + + if(type == 'longpoll') + t.equal(events[3], 'END', 'End event fired for '+type) + else + t.type(events[3], 'undefined', 'No end event for a destroyed continuous feed') + + done() + } + + var good_ids = {'doc_first':true, 'doc_second':true, 'doc_third':true} + function good_id(event) { + var is_good = good_ids[event.id] + delete good_ids[event.id] + return is_good + } + }) +}) + +test('Pausing and destroying a feed mid-stream', function(t) { + t.ok(couch.rtt(), 'RTT to couch is known') + var IMMEDIATE = 20 + , FIRST_PAUSE = couch.rtt() * 8 + , SECOND_PAUSE = couch.rtt() * 12 + + // To be really, really sure that backpressure goes all the way to couch, create more + // documents than could possibly be buffered. Linux and OSX seem to have a 16k MTU for + // the local interface, so a few hundred kb worth of document data should cover it. 
+ couch.make_data(512 * 1024, check) + + var types = ['longpoll', 'continuous'] + function check(bulk_docs_count) { + var type = types.shift() + if(!type) + return t.end() + + var feed = new follow.Changes + feed.feed = type + + var destroys = 0 + function destroy() { + destroys += 1 + feed.destroy() + + // Give one more RTT time for everything to wind down before checking how it all went. + if(destroys == 1) + setTimeout(check_events, couch.rtt()) + } + + var events = {'feed':[], 'http':[], 'request':[]} + , firsts = {'feed':null, 'http':null, 'request':null} + + function ev(type, value) { + var now = new Date + firsts[type] = firsts[type] || now + events[type].push({'elapsed':now - firsts[type], 'at':now, 'val':value, 'before_destroy':(destroys == 0)}) + } + + feed.on('heartbeat', function() { ev('feed', 'heartbeat') }) + feed.on('error', function(er) { ev('feed', er) }) + feed.on('close', function() { ev('feed', 'close') }) + feed.on('data', function(data) { ev('feed', data) }) + feed.on('end', function() { ev('feed', 'end') }) + + var data_count = 0 + feed.on('data', function() { + data_count += 1 + if(data_count == 4) { + feed.pause() + setTimeout(function() { feed.resume() }, FIRST_PAUSE) + } + + if(data_count == 7) { + feed.pause() + setTimeout(function() { feed.resume() }, SECOND_PAUSE - FIRST_PAUSE) + } + + if(data_count >= 10) + destroy() + }) + + var uri = couch.DB + '/_changes?include_docs=true&feed=' + type + if(type == 'continuous') + uri += '&heartbeat=' + Math.floor(couch.rtt()) + + var req = request({'uri':uri}) + req.on('response', function(res) { feed_response(null, res, res.body) }) + req.on('error', function(er) { ev('request', er) }) + req.on('close', function() { ev('request', 'close') }) + req.on('data', function(d) { ev('request', d) }) + req.on('end', function() { ev('request', 'end') }) + req.pipe(feed) + + function feed_response(er, res) { + if(er) throw er + + res.on('error', function(er) { ev('http', er) }) + res.on('close', function() 
{ ev('http', 'close') }) + res.on('data', function(d) { ev('http', d) }) + res.on('end', function() { ev('http', 'end') }) + + t.equal(events.feed.length, 0, 'No feed events yet: ' + type) + t.equal(events.http.length, 0, 'No http events yet: ' + type) + t.equal(events.request.length, 0, 'No request events yet: ' + type) + } + + function check_events() { + t.equal(destroys, 1, 'Only necessary to call destroy once: ' + type) + t.equal(events.feed.length, 10, 'Ten '+type+' data events fired') + if(events.feed.length != 10) + events.feed.forEach(function(e, i) { + console.error((i+1) + ') ' + util.inspect(e)) + }) + + events.feed.forEach(function(event, i) { + var label = type + ' event #' + (i+1) + ' at ' + event.elapsed + ' ms' + + t.type(event.val, 'string', label+' was a data string') + t.equal(event.before_destroy, true, label+' fired before the destroy') + + var change = null + t.doesNotThrow(function() { change = JSON.parse(event.val) }, label+' was JSON: ' + type) + t.ok(change && change.seq > 0 && change.id, label+' was change data: ' + type) + + // The first batch of events should have fired quickly (IMMEDIATE), then silence. Then another batch + // of events at the FIRST_PAUSE mark. Then more silence. Then a final batch at the SECOND_PAUSE mark. 
+ if(i < 4) + t.ok(event.elapsed < IMMEDIATE, label+' was immediate (within '+IMMEDIATE+' ms)') + else if(i < 7) + t.ok(is_almost(event.elapsed, FIRST_PAUSE), label+' was after the first pause (about '+FIRST_PAUSE+' ms)') + else + t.ok(is_almost(event.elapsed, SECOND_PAUSE), label+' was after the second pause (about '+SECOND_PAUSE+' ms)') + }) + + if(type == 'continuous') { + t.ok(events.http.length >= 6, 'Should have at least seven '+type+' HTTP events') + t.ok(events.request.length >= 6, 'Should have at least seven '+type+' request events') + + t.ok(events.http.length < 200, type+' HTTP events ('+events.http.length+') stop before 200') + t.ok(events.request.length < 200, type+' request events ('+events.request.length+') stop before 200') + + var frac = events.http.length / bulk_docs_count + t.ok(frac < 0.10, 'Percent of http events received ('+frac.toFixed(1)+'%) is less than 10% of the data') + + frac = events.request.length / bulk_docs_count + t.ok(frac < 0.10, type+' request events received ('+frac.toFixed(1)+'%) is less than 10% of the data') + } + + return check(bulk_docs_count) + } + } +}) + +// +// Utilities +// + +function is_almost(actual, expected) { + var tolerance = 0.10 // 10% + , diff = Math.abs(actual - expected) + , fraction = diff / expected + return fraction < tolerance +} diff --git a/lib/follow/api.js b/lib/follow/api.js new file mode 100644 index 00000000..d7964dba --- /dev/null +++ b/lib/follow/api.js @@ -0,0 +1,35 @@ +// The changes_couchdb API +// +// Copyright 2011 Jason Smith, Jarrett Cruger and contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +var feed = require('./lib/feed') + , stream = require('./lib/stream') + +function follow_feed(opts, cb) { + var ch_feed = new feed.Feed(opts); + ch_feed.on('error' , function(er) { return cb && cb.call(ch_feed, er) }); + ch_feed.on('change', function(ch) { return cb && cb.call(ch_feed, null, ch) }); + + // Give the caller a chance to hook into any events. + process.nextTick(function() { + ch_feed.follow(); + }) + + return ch_feed; +} + +module.exports = follow_feed; +module.exports.Feed = feed.Feed; +module.exports.Changes = stream.Changes diff --git a/lib/follow/lib/feed.js b/lib/follow/lib/feed.js new file mode 100644 index 00000000..e6803bfb --- /dev/null +++ b/lib/follow/lib/feed.js @@ -0,0 +1,694 @@ +// Core routines for event emitters +// +// Copyright © 2017 IBM Corp. All rights reserved. +// +// Copyright 2011 Jason Smith, Jarrett Cruger and contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +var lib = require('../lib') + , util = require('util') + , events = require('events') + , request = require('request') + , Changes = require('./stream').Changes + , querystring = require('querystring') + +// Use the library timeout functions, primarily so the test suite can catch errors. +var setTimeout = lib.setTimeout + , clearTimeout = lib.clearTimeout + +var DEFAULT_HEARTBEAT = 30000; +var HEARTBEAT_TIMEOUT_COEFFICIENT = 1.25; // E.g. heartbeat 1000ms would trigger a timeout after 1250ms of no heartbeat. +var DEFAULT_MAX_RETRY_SECONDS = 60 * 60; +var INITIAL_RETRY_DELAY = 1000; +var RESPONSE_GRACE_TIME = 5000; + +var FEED_PARAMETERS = ['since', 'limit', 'feed', 'heartbeat', 'filter', 'include_docs', 'view', 'style', 'conflicts', 'attachments', 'att_encoding_info']; + +var EventEmitter = events.EventEmitter2 || events.EventEmitter; + + +util.inherits(Feed, EventEmitter); +function Feed (opts) { + var self = this; + EventEmitter.call(self); + + opts = opts || {} + + self.feed = 'continuous'; + self.heartbeat = opts.heartbeat || DEFAULT_HEARTBEAT; + self.max_retry_seconds = opts.max_retry_seconds || DEFAULT_MAX_RETRY_SECONDS; + self.inactivity_ms = null; + self.initial_retry_delay = opts.initial_retry_delay || INITIAL_RETRY_DELAY; + self.response_grace_time = opts.response_grace_time || RESPONSE_GRACE_TIME; + + self.headers = {}; + self.request = opts.request || {} // Extra options for potentially future versions of request. The caller can supply them. 
+ + self.since = 0; + self.is_paused = false + self.caught_up = false + self.retry_delay = self.initial_retry_delay; + + self.query_params = {}; // Extra `req.query` values for filter functions + + if(typeof opts === 'string') + opts = {'db': opts}; + + Object.keys(opts).forEach(function(key) { + if (typeof self[key] !== 'function') + self[key] = opts[key]; + }) + + self.pending = { request : null + , activity_at : null + }; +} // Feed + +Feed.prototype.start = +Feed.prototype.follow = function follow_feed() { + var self = this; + + self.db = self.db || self.url || self.uri + delete self.url + delete self.uri + + if(!self.db) + throw new Error('Database URL required'); + + if (self.db.match(/\/_db_updates$/)) + self.is_db_updates = true; + + if(self.is_db_updates) + delete self.since; + + if(self.feed !== 'continuous' && self.feed !== 'longpoll') + throw new Error('The only valid feed options are "continuous" and "longpoll"'); + + if(typeof self.heartbeat !== 'number') + throw new Error('Required "heartbeat" value'); + + self.log = lib.log4js.getLogger(self.db); + self.log.setLevel(process.env.follow_log_level || "info"); + + self.emit('start'); + return self.confirm(); +} + +Feed.prototype.confirm = function confirm_feed() { + var self = this; + + self.db_safe = lib.scrub_creds(self.db); + + var endpoint = self.is_db_updates ? 'server' : 'database'; + + self.log.debug('Checking ' + endpoint + ': ' + self.db_safe); + + var confirm_timeout = self.heartbeat * 3; // Give it time to look up the name, connect, etc. + var timeout_id = setTimeout(function() { + return self.die(new Error('Timeout confirming ' + endpoint + ': ' + self.db_safe)); + }, confirm_timeout); + + var headers = lib.JP(lib.JS(self.headers)); + headers.accept = 'application/json'; + + var uri = self.is_db_updates ? 
self.db.replace(/\/_db_updates$/, '') : self.db; + var req = {'uri':uri, 'headers':headers} + Object.keys(self.request).forEach(function(key) { + req[key] = self.request[key]; + }) + + req = request(req, db_response) + self.emit('confirm_request', req) + + function db_response(er, resp, body) { + clearTimeout(timeout_id); + + if(er) + return self.die(er); + + var db; + try { + db = JSON.parse(body) + } catch(json_er) { + return self.emit('error', json_er) + } + + if(!self.is_db_updates && !self.dead && (!db.db_name || !db.instance_start_time)) + return self.emit('error', new Error('Bad DB response: ' + body)); + + if(self.is_db_updates && !self.dead && !db.couchdb) + return self.emit('error', new Error('Bad server response: ' + body)); + + if (!self.is_db_updates) + self.original_db_seq = db.update_seq + + self.log.debug('Confirmed ' + endpoint + ': ' + self.db_safe); + self.emit('confirm', db); + + if(self.since == 'now') { + self.log.debug('Query since "now" is the same as query since -1') + self.since = -1 + } + + if(self.since == -1) { + self.log.debug('Query since '+self.since+' will start at ' + db.update_seq) + self.since = db.update_seq + } else if(self.since < 0) { + if(isNaN(db.update_seq)) + return self.emit('error', new Error('DB requires specific id in "since"')); + + self.log.debug('Query since '+self.since+' will start at ' + (db.update_seq + self.since + 1)) + self.since = db.update_seq + self.since + 1 + } + + // If the next change would come after the current update_seq, just fake a catchup event now. 
+ if(self.original_db_seq == self.since) { + self.caught_up = true + self.emit('catchup', db.update_seq) + } + + return self.query(); + } +} + +Feed.prototype.query = function query_feed() { + var self = this; + + var query_params = JSON.parse(JSON.stringify(self.query_params)); + + FEED_PARAMETERS.forEach(function(key) { + if(key in self) + query_params[key] = self[key]; + }) + + if(typeof query_params.filter !== 'string') + delete query_params.filter; + + if(typeof self.filter === 'function' && !query_params.include_docs) { + self.log.debug('Enabling include_docs for client-side filter'); + query_params.include_docs = true; + } + + // Limit the response size for longpoll. + var poll_size = 100; + if(query_params.feed == 'longpoll' && (!query_params.limit || query_params.limit > poll_size)) + query_params.limit = poll_size; + + var feed_url = self.db + (self.is_db_updates ? '' : '/_changes') + '?' + querystring.stringify(query_params); + + self.headers.accept = self.headers.accept || 'application/json'; + var req = { method : 'GET' + , uri : feed_url + , headers: self.headers + , encoding: 'utf-8' + } + + req.changes_query = query_params; + Object.keys(self.request).forEach(function(key) { + req[key] = self.request[key]; + }) + + var now = new Date + , feed_ts = lib.JDUP(now) + , feed_id = process.env.follow_debug ? feed_ts.match(/\.(\d\d\d)Z$/)[1] : feed_ts + + self.log.debug('Feed query ' + feed_id + ': ' + lib.scrub_creds(feed_url)) + var feed_request = request(req) + + feed_request.on('response', function(res) { + self.log.debug('Remove feed from agent pool: ' + feed_id) + feed_request.req.socket.emit('agentRemove') + + // Simulate the old onResponse option. + on_feed_response(null, res, res.body) + }) + + feed_request.on('error', on_feed_response) + + // The response headers must arrive within one heartbeat. 
+ var response_timer = setTimeout(response_timed_out, self.heartbeat + self.response_grace_time) + , timed_out = false + + return self.emit('query', feed_request) + + function response_timed_out() { + self.log.debug('Feed response timed out: ' + feed_id) + timed_out = true + return self.retry() + } + + function on_feed_response(er, resp, body) { + clearTimeout(response_timer) + + if((resp !== undefined && resp.body) || body) + return self.die(new Error('Cannot handle a body in the feed response: ' + lib.JS(resp.body || body))) + + if(timed_out) { + self.log.debug('Ignoring late response: ' + feed_id); + return destroy_response(resp); + } + + if(er) { + self.log.debug('Request error ' + feed_id + ': ' + er.stack); + destroy_response(resp); + return self.retry(); + } + + if(resp.statusCode == 404) { + destroy_response(resp) + self.log.warn('Database not found. Stopping changes feed after seq ' + self.since) + var del_er = new Error('Database deleted after change: ' + self.since) + del_er.deleted = true + del_er.last_seq = self.since + return self.die(del_er) + } + + if(resp.statusCode !== 200) { + self.log.debug('Bad changes response ' + feed_id + ': ' + resp.statusCode); + destroy_response(resp); + return self.retry(); + } + + self.log.debug('Good response: ' + feed_id); + self.retry_delay = self.initial_retry_delay; + + self.emit('response', resp); + + var changes_stream = new Changes + changes_stream.log = lib.log4js.getLogger('stream ' + self.db) + changes_stream.log.setLevel(self.log.level.levelStr) + changes_stream.feed = self.feed + feed_request.pipe(changes_stream) + + changes_stream.created_at = now + changes_stream.id = function() { return feed_id } + return self.prep(changes_stream) + } +} + +Feed.prototype.prep = function prep_request(changes_stream) { + var self = this; + + var now = new Date; + self.pending.request = changes_stream; + self.pending.activity_at = now; + self.pending.wait_timer = null; + + // Just re-run the pause or resume to do the 
needful on changes_stream (self.pending.request). + if(self.is_paused) + self.pause() + else + self.resume() + + // The inactivity timer is for time between *changes*, or time between the + // initial connection and the first change. Therefore it goes here. + self.change_at = now; + if(self.inactivity_ms) { + clearTimeout(self.inactivity_timer); + self.inactivity_timer = setTimeout(function() { self.on_inactivity() }, self.inactivity_ms); + } + + changes_stream.on('heartbeat', handler_for('heartbeat')) + changes_stream.on('error', handler_for('error')) + changes_stream.on('data', handler_for('data')) + changes_stream.on('end', handler_for('end')) + + return self.wait(); + + function handler_for(ev) { + var name = 'on_couch_' + ev; + var inner_handler = self[name]; + + return handle_confirmed_req_event; + function handle_confirmed_req_event() { + if(self.pending.request === changes_stream) + return inner_handler.apply(self, arguments); + + if(!changes_stream.created_at) + return self.die(new Error("Received data from unknown request")); // Pretty sure this is impossible. 
+ + var s_to_now = (new Date() - changes_stream.created_at) / 1000; + var s_to_req = '[no req]'; + if(self.pending.request) + s_to_req = (self.pending.request.created_at - changes_stream.created_at) / 1000; + + var msg = ': ' + changes_stream.id() + ' to_req=' + s_to_req + 's, to_now=' + s_to_now + 's'; + + if(ev == 'end' || ev == 'data' || ev == 'heartbeat') { + self.log.debug('Old "' + ev + '": ' + changes_stream.id()) + return destroy_req(changes_stream) + } + + self.log.warn('Old "'+ev+'"' + msg); + } + } +} + +Feed.prototype.wait = function wait_for_event() { + var self = this; + self.emit('wait'); + + if(self.pending.wait_timer) + return self.die(new Error('wait() called but there is already a wait_timer: ' + self.pending.wait_timer)); + + var timeout_ms = self.heartbeat * HEARTBEAT_TIMEOUT_COEFFICIENT; + var req_id = self.pending.request && self.pending.request.id() + var msg = 'Req ' + req_id + ' timeout=' + timeout_ms; + if(self.inactivity_ms) + msg += ', inactivity=' + self.inactivity_ms; + msg += ': ' + self.db_safe; + + self.log.debug(msg); + self.pending.wait_timer = setTimeout(function() { self.on_timeout() }, timeout_ms); +} + +Feed.prototype.got_activity = function() { + var self = this + + if (self.dead) + return + + // + // We may not have a wait_timer so just clear it and null it out if it does + // exist + // + clearTimeout(self.pending.wait_timer) + self.pending.wait_timer = null + self.pending.activity_at = new Date +} + + +Feed.prototype.pause = function() { + var self = this + , was_paused = self.is_paused + + // Emit pause after pausing the stream, to allow listeners to react. 
+ self.is_paused = true + if(self.pending && self.pending.request && self.pending.request.pause) + self.pending.request.pause() + else + self.log.warn('No pending request to pause') + + if(!was_paused) + self.emit('pause') +} + +Feed.prototype.resume = function() { + var self = this + , was_paused = self.is_paused + + // Emit resume before resuming the data feed, to allow listeners to prepare. + self.is_paused = false + if(was_paused) + self.emit('resume') + + if(self.pending && self.pending.request && self.pending.request.resume) + self.pending.request.resume() + else + self.log.warn('No pending request to resume') +} + + +Feed.prototype.on_couch_heartbeat = function on_couch_heartbeat() { + var self = this + + self.got_activity() + if(self.dead) + return self.log.debug('Skip heartbeat processing for dead feed') + + self.emit('heartbeat') + + if(self.dead) + return self.log.debug('No wait: heartbeat listener stopped this feed') + self.wait() +} + +Feed.prototype.on_couch_data = function on_couch_data(change) { + var self = this; + self.log.debug('Data from ' + self.pending.request.id()); + + self.got_activity() + if(self.dead) + return self.log.debug('Skip data processing for dead feed') + + // The changes stream guarantees that this data is valid JSON. + change = JSON.parse(change) + + //self.log.debug('Object:\n' + util.inspect(change)); + if('last_seq' in change) { + self.log.debug('Found last_seq in change. Retrying from seq ' + change.last_seq) + self.since = change.last_seq + return self.retry() + } + + if(!self.is_db_updates && !change.seq) + return self.die(new Error('Change has no .seq field: ' + JSON.stringify(change))) + + self.on_change(change) + + // on_change() might work its way all the way to a "change" event, and the listener + // might call .stop(), which means among other things that no more events are desired. + // The die() code sets a self.dead flag to indicate this. 
+ if(self.dead) + return self.log.debug('No wait: change listener stopped this feed') + self.wait() +} + +Feed.prototype.on_timeout = function on_timeout() { + var self = this; + if (self.dead) + return self.log.debug('No timeout: change listener stopped this feed'); + + self.log.debug('Timeout') + + var now = new Date; + var elapsed_ms = now - self.pending.activity_at; + + self.emit('timeout', {elapsed_ms:elapsed_ms, heartbeat:self.heartbeat, id:self.pending.request.id()}); + + /* + var msg = ' for timeout after ' + elapsed_ms + 'ms; heartbeat=' + self.heartbeat; + if(!self.pending.request.id) + self.log.warn('Closing req (no id) ' + msg + ' req=' + util.inspect(self.pending.request)); + else + self.log.warn('Closing req ' + self.pending.request.id() + msg); + */ + + destroy_req(self.pending.request); + self.retry() +} + +Feed.prototype.retry = function retry() { + var self = this; + + clearTimeout(self.pending.wait_timer); + self.pending.wait_timer = null; + + self.log.debug('Retry since=' + self.since + ' after ' + self.retry_delay + 'ms ') + self.emit('retry', {since:self.since, after:self.retry_delay, db:self.db_safe}); + + self.retry_timer = setTimeout(function() { self.query() }, self.retry_delay); + + var max_retry_ms = self.max_retry_seconds * 1000; + self.retry_delay *= 2; + if(self.retry_delay > max_retry_ms) + self.retry_delay = max_retry_ms; +} + +Feed.prototype.on_couch_end = function on_couch_end() { + var self = this; + + self.log.debug('Changes feed ended ' + self.pending.request.id()); + self.pending.request = null; + return self.retry(); +} + +Feed.prototype.on_couch_error = function on_couch_error(er) { + var self = this; + + self.log.debug('Changes query eror: ' + lib.JS(er.stack)); + return self.retry(); +} + +Feed.prototype.stop = function(val) { + var self = this + self.log.debug('Stop') + + // Die with no errors. 
+ self.die() + self.emit('stop', val); +} + +Feed.prototype.die = function(er) { + var self = this; + + if(er) + self.log.fatal('Fatal error: ' + er.stack); + + // Warn code executing later that death has occured. + self.dead = true + + clearTimeout(self.retry_timer) + clearTimeout(self.inactivity_timer) + clearTimeout(self.pending.wait_timer) + + self.inactivity_timer = null + self.pending.wait_timer = null + + var req = self.pending.request; + self.pending.request = null; + if(req) { + self.log.debug('Destroying req ' + req.id()); + destroy_req(req); + } + + if(er) + self.emit('error', er); +} + +Feed.prototype.on_change = function on_change(change) { + var self = this; + + if(!self.is_db_updates && !change.seq) + return self.die(new Error('No seq value in change: ' + lib.JS(change))); + + if(!self.is_db_updates && change.seq == self.since) { + self.log.debug('Bad seq value ' + change.seq + ' since=' + self.since); + return destroy_req(self.pending.request); + } + + if(typeof self.filter !== 'function') + return self.on_good_change(change); + + var req = lib.JDUP({'query': self.pending.request.changes_query}); + var filter_args; + + if (self.is_db_updates) { + if(!change.db_name || !change.type) + return self.die(new Error('Internal _db_updates filter needs .db_name and .type in change ', change)); + filter_args = [change.db_name, change.type, req]; + } else { + if(!change.doc) + return self.die(new Error('Internal filter needs .doc in change ' + change.seq)); + + // Don't let the filter mutate the real data. 
+ var doc = lib.JDUP(change.doc); + filter_args = [doc, req]; + } + var result = false; + try { + result = self.filter.apply(null, filter_args); + } catch (er) { + self.log.debug('Filter error', er); + } + + result = (result && true) || false; + if(result) { + self.log.debug('Builtin filter PASS for change: ' + change.seq); + return self.on_good_change(change); + } else { + self.log.debug('Builtin filter FAIL for change: ' + change.seq); + + // Even with a filtered change, a "catchup" event might still be appropriate. + self.check_for_catchup(change.seq) + } +} + +Feed.prototype.on_good_change = function on_good_change(change) { + var self = this; + + if(self.inactivity_ms && !self.inactivity_timer) + return self.die(new Error('Cannot find inactivity timer during change')); + + clearTimeout(self.inactivity_timer); + self.inactivity_timer = null; + if(self.inactivity_ms) + self.inactivity_timer = setTimeout(function() { self.on_inactivity() }, self.inactivity_ms); + + self.change_at = new Date; + + if(!self.is_db_updates) + self.since = change.seq; + + self.emit('change', change); + + self.check_for_catchup(change.seq) +} + +Feed.prototype.check_for_catchup = function check_for_catchup(seq) { + var self = this + + if (self.is_db_updates) + return + if(self.caught_up) + return + if(seq < self.original_db_seq) + return + + self.caught_up = true + self.emit('catchup', seq) +} + +Feed.prototype.on_inactivity = function on_inactivity() { + var self = this; + var now = new Date; + var elapsed_ms = now - self.change_at; + var elapsed_s = elapsed_ms / 1000; + + // + // Since this is actually not fatal, lets just totally reset and start a new + // request, JUST in case something was bad. 
+ // + self.log.debug('Req ' + self.pending.request.id() + ' made no changes for ' + elapsed_s + 's'); + return self.restart(); + +} + +Feed.prototype.restart = function restart() { + var self = this + + self.emit('restart') + + // Kill ourselves and then start up once again + self.stop() + self.dead = false + self.start() +} + +module.exports = { "Feed" : Feed + }; + + +/* + * Utilities + */ + +function destroy_req(req) { + if(req) + destroy_response(req.response) + + if(req && typeof req.destroy == 'function') + req.destroy() +} + +function destroy_response(response) { + if(!response) + return; + + if(typeof response.abort === 'function') + response.abort(); + + if(response.connection) + response.connection.destroy(); +} diff --git a/lib/follow/lib/index.js b/lib/follow/lib/index.js new file mode 100644 index 00000000..dc20f054 --- /dev/null +++ b/lib/follow/lib/index.js @@ -0,0 +1,51 @@ +// Copyright 2011 Jason Smith, Jarrett Cruger and contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +exports.scrub_creds = function scrub_creds(url) { + return url.replace(/^(https?:\/\/)[^:]+:[^@]+@(.*)$/, '$1$2'); // Scrub username and password +} + +exports.JP = JSON.parse; +exports.JS = JSON.stringify; +exports.JDUP = function(obj) { return JSON.parse(JSON.stringify(obj)) }; + +var timeouts = { 'setTimeout': setTimeout + , 'clearTimeout': clearTimeout + } + +exports.setTimeout = function() { return timeouts.setTimeout.apply(this, arguments) } +exports.clearTimeout = function() { return timeouts.clearTimeout.apply(this, arguments) } +exports.timeouts = function(set, clear) { + timeouts.setTimeout = set + timeouts.clearTimeout = clear +} + +var debug = require('debug') + +function getLogger(name) { + return { "trace": noop + , "debug": debug('follow:' + name + ':debug') + , "info" : debug('follow:' + name + ':info') + , "warn" : debug('follow:' + name + ':warn') + , "error": debug('follow:' + name + ':error') + , "fatal": debug('follow:' + name + ':fatal') + + , "level": {'level':0, 'levelStr':'noop'} + , "setLevel": noop + } +} + +function noop () {} + +exports.log4js = { 'getLogger': getLogger } diff --git a/lib/follow/lib/stream.js b/lib/follow/lib/stream.js new file mode 100644 index 00000000..6027942e --- /dev/null +++ b/lib/follow/lib/stream.js @@ -0,0 +1,312 @@ +// Changes stream +// +// Copyright 2011 Jason Smith, Jarrett Cruger and contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +var lib = require('../lib') + , util = require('util') + , stream = require('stream') + , request = require('request') + +// Use the library timeout functions, primarily so the test suite can catch errors. +var setTimeout = lib.setTimeout + , clearTimeout = lib.clearTimeout + + +var DEFS = + { 'longpoll_header': '{"results":[' + , 'log_level' : process.env.follow_log_level || 'info' + } + +module.exports = { 'Changes': Changes + } + + +util.inherits(Changes, stream) +function Changes (opts) { + var self = this + stream.call(self) + + self.readable = true + self.writable = true + + self.headers = {} + self.statusCode = null + + opts = opts || {} + self.feed = opts.feed || null // "continuous" or "longpoll" + self.encoding = opts.encoding || 'utf8' + + self.log = opts.log + if(!self.log) { + self.log = lib.log4js.getLogger('change_stream') + self.log.setLevel(DEFS.log_level) + } + + self.is_sending = true + self.is_ending = false + self.is_dead = false + + self.source = null + self.expect = null + self.buf = null + self.changes = [] + + self.on('pipe', function(src) { + if(!self.source) + self.source = src + else { + var er = new Error('Already have a pipe source') + er.source = self.source + self.error(er) + } + }) +} + + +Changes.prototype.setHeader = function(key, val) { + var self = this + self.headers[key] = val +} + +// +// Readable stream API +// + +Changes.prototype.setEncoding = function(encoding) { + var self = this + self.encoding = encoding // TODO +} + + +Changes.prototype.pause = function() { + var self = this + self.is_sending = false + + if(self.source && self.source.pause) + self.source.pause() +} + + +Changes.prototype.resume = function() { + var self = this + self.is_sending = true + if(self.source && self.source.resume) + self.source.resume() + self.emit_changes() +} + +// +// Writable stream API +// + +Changes.prototype.write = function(data, encoding) { + var self = this + + data = self.normalize_data(data, encoding) + if(typeof data != 
'string') + return // Looks like normalize_data emitted an error. + + if(self.feed === 'longpoll') + return self.write_longpoll(data) + else if(self.feed === 'continuous') + return self.write_continuous(data) +} + + +Changes.prototype.write_longpoll = function(data) { + var self = this + + if(self.buf === null) + self.buf = [] + + self.buf.push(data) + return true +} + + +Changes.prototype.write_continuous = function(data) { + var self = this + + var offset, json, change + , buf = (self.buf || "") + data + + self.log.debug('write: ' + util.inspect({'data':data, 'buf':buf})) + + // Buf could have 0, 1, or many JSON objects in it. + while((offset = buf.indexOf("\n")) >= 0) { + json = buf.substr(0, offset); + buf = buf.substr(offset + 1); + self.log.debug('JSON: ' + util.inspect(json)) + + // Heartbeats (empty strings) are fine, but otherwise confirm valid JSON. + if(json === "") + ; + + else if(json[0] !== '{') + return self.error(new Error('Non-object JSON data: ' + json)) + + else { + try { change = JSON.parse(json) } + catch (er) { return self.error(er) } + + self.log.debug('Object: ' + util.inspect(change)) + json = JSON.stringify(change) + } + + // Change (or heartbeat) looks good. + self.changes.push(json) + } + + // Remember the unused data and send all known good changes (or heartbeats). The data (or heartbeat) + // event listeners may call .pause() so remember the is_sending state now before calling them. + var was_sending = self.is_sending + self.buf = buf + self.emit_changes() + return was_sending +} + + +Changes.prototype.end = function(data, encoding) { + var self = this + + self.is_ending = true + self.writable = false + + // Always call write, even with no data, so it can fire the "end" event. 
+ self.write(data, encoding) + + if(self.feed === 'longpoll') { + var changes = [ DEFS.longpoll_header ].concat(self.buf).join('') + try { changes = JSON.parse(changes) || {} } + catch (er) { return self.error(er) } + + if(!Array.isArray(changes.results)) + return self.error(new Error('No "results" field in feed')) + if(self.changes.length !== 0) + return self.error(new Error('Changes are already queued: ' + JSON.stringify(self.changes))) + + self.changes = changes.results.map(function(change) { return JSON.stringify(change) }) + return self.emit_changes() + } + + else if(self.feed === 'continuous') { + if(self.buf !== "") + self.log.debug('Unprocessed data after "end" called: ' + util.inspect(self.buf)) + } +} + + +Changes.prototype.emit_changes = function() { + var self = this + + while(self.is_sending && self.changes.length > 0) { + var change = self.changes.shift() + if(change === "") { + self.log.debug('emit: heartbeat') + self.emit('heartbeat') + } + + else { + self.log.debug('emit: data') + self.emit('data', change) + } + } + + if(self.is_sending && self.is_ending && self.changes.length === 0) { + self.is_ending = false + self.readable = false + self.log.debug('emit: end') + self.emit('end') + } +} + +// +// Readable/writable stream API +// + +Changes.prototype.destroy = function() { + var self = this + self.log.debug('destroy') + + self.is_dead = true + self.is_ending = false + self.is_sending = false + + if(self.source && typeof self.source.abort == 'function') + return self.source.abort() + + if(self.source && typeof self.source.destroy === 'function') + self.source.destroy() + + // Often the source is from the request package, so destroy its response object. 
+ if(self.source && self.source.__isRequestRequest && self.source.response + && typeof self.source.response.destroy === 'function') + self.source.response.destroy() +} + + +Changes.prototype.destroySoon = function() { + var self = this + throw new Error('not implemented') + //return self.request.destroySoon() +} + +// +// Internal implementation +// + +Changes.prototype.normalize_data = function(data, encoding) { + var self = this + + if(data instanceof Buffer) + data = data.toString(encoding) + else if(typeof data === 'undefined' && typeof encoding === 'undefined') + data = "" + + if(typeof data != 'string') + return self.error(new Error('Not a string or Buffer: ' + util.inspect(data))) + + if(self.feed !== 'continuous' && self.feed !== 'longpoll') + return self.error(new Error('Must set .feed to "continuous" or "longpoll" before writing data')) + + if(self.expect === null) + self.expect = (self.feed == 'longpoll') + ? DEFS.longpoll_header + : "" + + var prefix = data.substr(0, self.expect.length) + data = data.substr(prefix.length) + + var expected_part = self.expect.substr(0, prefix.length) + , expected_remainder = self.expect.substr(expected_part.length) + + if(prefix !== expected_part) + return self.error(new Error('Prefix not expected '+util.inspect(expected_part)+': ' + util.inspect(prefix))) + + self.expect = expected_remainder + return data +} + + +Changes.prototype.error = function(er) { + var self = this + + self.readable = false + self.writable = false + self.emit('error', er) + + // The write() method sometimes returns this value, so if there was an error, make write() return false. 
+ return false +} diff --git a/lib/nano.js b/lib/nano.js index 19f61773..d990d1e1 100644 --- a/lib/nano.js +++ b/lib/nano.js @@ -18,7 +18,7 @@ var querystring = require('querystring'); var request = require('request'); var errs = require('errs'); var _ = require('underscore'); -var follow = require('follow'); +var follow = require('./follow/api.js'); var logger = require('./logger'); var nano; diff --git a/package.json b/package.json index b22e7dd4..7879a2f6 100644 --- a/package.json +++ b/package.json @@ -18,24 +18,26 @@ ], "dependencies": { "request": "^2.76.0", - "follow": "^0.12.1", "errs": "^0.3.2", "underscore": "^1.8.3", "debug": "^2.2.0" }, "devDependencies": { "async": "^2.1.2", - "tape": "^4.6.2", + "endswith": "^0.0.0", "istanbul": "^0.4.5", - "jshint": "^2.9.4", "jscs": "^3.0.7", + "jshint": "^2.9.4", "nock": "^9.0.0", - "endswith": "^0.0.0", + "tap": "~10.3.2", + "tape": "^4.6.2", "tape-it": "^0.3.1" }, "scripts": { - "test": "bash scripts/run_couchdb_on_travis.sh; npm run mocha; bash scripts/stop_couchdb_on_travis.sh", + "test": "npm run coretests; npm run followtests", + "coretests": "bash scripts/run_couchdb_on_travis.sh; npm run mocha; bash scripts/stop_couchdb_on_travis.sh", "mocha": "DEBUG=* NOCK_OFF=true istanbul cover tape tests/*/*/*.js", + "followtests": "bash scripts/run_couchdb16_on_travis.sh; tap follow_tests/*.js; bash scripts/stop_couchdb16_on_travis.sh", "unmocked": "NOCK_OFF=true tape tests/*/*/*.js", "mocked": "tape tests/*/*/*.js", "jshint": "jshint tests/*/*/*.js lib/*.js", diff --git a/scripts/run_couchdb16_on_travis.sh b/scripts/run_couchdb16_on_travis.sh new file mode 100755 index 00000000..0862e1fa --- /dev/null +++ b/scripts/run_couchdb16_on_travis.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +if [ ! 
-z $TRAVIS ]; then
  # Start CouchDB in Docker (NOTE(review): image "couchdb" is untagged/latest, not 1.6 — verify tag)
  echo "Starting CouchDB 1.6 Docker"
  docker run --ulimit nofile=2048:2048 -d -p 5984:5984 --name couchdb16 couchdb

  # wait for CouchDB to start
  while [ '200' != $(curl -s -o /dev/null -w %{http_code} http://127.0.0.1:5984) ]; do
    echo waiting for couch to load... ;
    sleep 1;
  done
fi
\ No newline at end of file
diff --git a/scripts/stop_couchdb16_on_travis.sh b/scripts/stop_couchdb16_on_travis.sh
new file mode 100755
index 00000000..199497b1
--- /dev/null
+++ b/scripts/stop_couchdb16_on_travis.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+if [ ! -z $TRAVIS ]; then
+  echo "Stopping CouchDB 1.6 Docker"
+  docker stop $(docker ps -a -q)
+  docker rm $(docker ps -a -q)
+fi
\ No newline at end of file