diff --git a/lib/read-stream-state.js b/lib/read-stream-state.js new file mode 100644 index 00000000..aebcd518 --- /dev/null +++ b/lib/read-stream-state.js @@ -0,0 +1,58 @@ +function State () { + this.ended = this._ready = this._reading = this._destroyed = this._paused = false +} + +State.prototype.end = function() { + this.ended = true + this._destroyed = false +} + +State.prototype.ready = function() { + this._ready = true +} + +State.prototype.destroy = function() { + this._destroyed = true +} + +State.prototype.pause = function() { + this._paused = true +} + +State.prototype.resume = function() { + this._paused = false +} + +State.prototype.read = function() { + this._reading = true +} + +State.prototype.endRead = function() { + this._reading = false +} + +State.prototype.canPause = function() { + return !this.ended && !this._paused +} + +State.prototype.canResume = function() { + return !this.ended && this._paused +} + +State.prototype.canRead = function() { + return !this.ended && !this._reading && !this._paused +} + +State.prototype.canCleanup = function() { + return !this.ended && !this._reading +} + +State.prototype.canEmitData = function() { + return !this.ended && !this._destroyed +} + +State.prototype.canEnd = function() { + return !this.ended +} + +module.exports = function () { return new State() } \ No newline at end of file diff --git a/lib/read-stream.js b/lib/read-stream.js index d937f018..8337a5f2 100644 --- a/lib/read-stream.js +++ b/lib/read-stream.js @@ -7,6 +7,7 @@ var Stream = require('stream').Stream , bufferStream = require('simple-bufferstream') , inherits = require('util').inherits , extend = require('util')._extend + , State = require('./read-stream-state') , toEncoding = require('./util').toEncoding , toSlice = require('./util').toSlice @@ -28,11 +29,11 @@ var Stream = require('stream').Stream } , makeNoData = function () { return null } - function ReadStream (options, db, iteratorFactory) { Stream.call(this) - this._status = 'ready' + this._state = State() + this._dataEvent = 'data' this.readable = true this.writable = false @@ -59,9 +60,10 @@ function ReadStream (options, db, iteratorFactory) { var ready = function () { - if (this._status == 'ended') + if (!this._state.canEmitData()) return + this._state.ready() this._iterator = iteratorFactory(this._options) this.emit('ready') this._read() @@ -76,21 +78,22 @@ function ReadStream (options, db, iteratorFactory) { inherits(ReadStream, Stream) ReadStream.prototype.destroy = function () { - this._status = 'destroyed' - this._cleanup() + this._state.destroy() + if (this._state.canCleanup()) + this._cleanup() } ReadStream.prototype.pause = function () { - if (this._status != 'ended' && !/\+paused$/.test(this._status)) { + if (this._state.canPause()) { + this._state.pause() this.emit('pause') - this._status += '+paused' // preserve existing status } } ReadStream.prototype.resume = function () { - if (this._status != 'ended') { + if (this._state.canResume()) { this.emit('resume') - this._status = this._status.replace(/\+paused$/, '') + this._state.resume() this._read() } } @@ -115,44 +118,39 @@ ReadStream.prototype.pipe = function (dest) { } ReadStream.prototype._read = function () { - if (this._status == 'ready') { - this._status = 'reading' + if (this._state.canRead()) { + this._state.read() this._iterator.next(this._onData.bind(this)) } } ReadStream.prototype._onData = function (err, key, value) { - if (err) + this._state.endRead() + if (err || !arguments.length /* end */ || !this._state.canEmitData()) return this._cleanup(err) - if (!arguments.length) // end - return this._cleanup() - if (this._status == 'ended') - return - if (/^reading/.test(this._status)) - this._status = this._status.replace(/^reading/, 'ready') - this._read() + this._read() // queue another read even tho we may not need it this.emit(this._dataEvent, this._makeData(key, value)) } ReadStream.prototype._cleanup = function (err) { - if (this._status == 'ended') - return err && this.emit('error', err) + if (err) + this.emit('error', err) + + if (!this._state.canEnd()) + return - var s = this._status - this._status = 'ended' + this._state.end() this.readable = false if (this._iterator) { this._iterator.end(function () { + this._iterator = null this.emit('close') }.bind(this)) } else this.emit('close') - if (err) - this.emit('error', err) - else (s != 'destroyed') - this.emit('end') + this.emit('end') } ReadStream.prototype.toString = function () { diff --git a/test/read-stream-test.js b/test/read-stream-test.js index 1c90f780..f491992f 100644 --- a/test/read-stream-test.js +++ b/test/read-stream-test.js @@ -655,4 +655,20 @@ buster.testCase('ReadStream', { }.bind(this)) }.bind(this)) } + + , 'test can only end once': function (done) { + this.openTestDatabase(function (db) { + db.batch(this.sourceData.slice(), function (err) { + refute(err) + + var rs = db.createReadStream() + .on('close', done) + + process.nextTick(function () { + rs.destroy() + }) + + }.bind(this)) + }.bind(this)) + } }) \ No newline at end of file