Skip to content
This repository was archived by the owner on Dec 1, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions lib/read-stream-state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
function State () {
this.ended = this._ready = this._reading = this._destroyed = this._paused = false
}

State.prototype.end = function() {
this.ended = true
this._destroyed = false
}

State.prototype.ready = function() {
this._ready = true
}

State.prototype.destroy = function() {
this._destroyed = true
}

State.prototype.pause = function() {
this._paused = true
}

State.prototype.resume = function() {
this._paused = false
}

State.prototype.read = function() {
this._reading = true
}

State.prototype.endRead = function() {
this._reading = false
}

State.prototype.canPause = function() {
return !this.ended && !this._paused
}

State.prototype.canResume = function() {
return !this.ended && this._paused
}

State.prototype.canRead = function() {
return !this.ended && !this._reading && !this._paused
}

State.prototype.canCleanup = function() {
return !this.ended && !this._reading
}

State.prototype.canEmitData = function() {
return !this.ended && !this._destroyed
}

State.prototype.canEnd = function() {
return !this.ended
}

module.exports = function () { return new State() }
52 changes: 25 additions & 27 deletions lib/read-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var Stream = require('stream').Stream
, bufferStream = require('simple-bufferstream')
, inherits = require('util').inherits
, extend = require('util')._extend
, State = require('./read-stream-state')

, toEncoding = require('./util').toEncoding
, toSlice = require('./util').toSlice
Expand All @@ -28,11 +29,11 @@ var Stream = require('stream').Stream
}
, makeNoData = function () { return null }


function ReadStream (options, db, iteratorFactory) {
Stream.call(this)

this._status = 'ready'
this._state = State()

this._dataEvent = 'data'
this.readable = true
this.writable = false
Expand All @@ -59,9 +60,10 @@ function ReadStream (options, db, iteratorFactory) {


var ready = function () {
if (this._status == 'ended')
if (!this._state.canEmitData())
return

this._state.ready()
this._iterator = iteratorFactory(this._options)
this.emit('ready')
this._read()
Expand All @@ -76,21 +78,22 @@ function ReadStream (options, db, iteratorFactory) {
inherits(ReadStream, Stream)

ReadStream.prototype.destroy = function () {
this._status = 'destroyed'
this._cleanup()
this._state.destroy()
if (this._state.canCleanup())
this._cleanup()
}

ReadStream.prototype.pause = function () {
if (this._status != 'ended' && !/\+paused$/.test(this._status)) {
if (this._state.canPause()) {
this._state.pause()
this.emit('pause')
this._status += '+paused' // preserve existing status
}
}

ReadStream.prototype.resume = function () {
if (this._status != 'ended') {
if (this._state.canResume()) {
this.emit('resume')
this._status = this._status.replace(/\+paused$/, '')
this._state.resume()
this._read()
}
}
Expand All @@ -115,44 +118,39 @@ ReadStream.prototype.pipe = function (dest) {
}

ReadStream.prototype._read = function () {
if (this._status == 'ready') {
this._status = 'reading'
if (this._state.canRead()) {
this._state.read()
this._iterator.next(this._onData.bind(this))
}
}

ReadStream.prototype._onData = function (err, key, value) {
if (err)
this._state.endRead()
if (err || !arguments.length /* end */ || !this._state.canEmitData())
return this._cleanup(err)
if (!arguments.length) // end
return this._cleanup()
if (this._status == 'ended')
return
if (/^reading/.test(this._status))
this._status = this._status.replace(/^reading/, 'ready')
this._read()
this._read() // queue another read even tho we may not need it
this.emit(this._dataEvent, this._makeData(key, value))
}

ReadStream.prototype._cleanup = function (err) {
if (this._status == 'ended')
return err && this.emit('error', err)
if (err)
this.emit('error', err)

if (!this._state.canEnd())
return

var s = this._status
this._status = 'ended'
this._state.end()
this.readable = false

if (this._iterator) {
this._iterator.end(function () {
this._iterator = null
this.emit('close')
}.bind(this))
} else
this.emit('close')

if (err)
this.emit('error', err)
else (s != 'destroyed')
this.emit('end')
this.emit('end')
}

ReadStream.prototype.toString = function () {
Expand Down
16 changes: 16 additions & 0 deletions test/read-stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -655,4 +655,20 @@ buster.testCase('ReadStream', {
}.bind(this))
}.bind(this))
}

, 'test can only end once': function (done) {
this.openTestDatabase(function (db) {
db.batch(this.sourceData.slice(), function (err) {
refute(err)

var rs = db.createReadStream()
.on('close', done)

process.nextTick(function () {
rs.destroy()
})

}.bind(this))
}.bind(this))
}
})