Skip to content

Commit

Permalink
Breaking: use classes, requiring new
Browse files Browse the repository at this point in the history
  • Loading branch information
vweevers committed Jul 1, 2022
1 parent 817c30c commit dafafb2
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 90 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -22,7 +22,7 @@ const { Level } = require('level')
const WriteStream = require('level-ws')

const db = new Level('./db', { valueEncoding: 'json' })
const ws = WriteStream(db)
const ws = new WriteStream(db)

ws.on('close', function () {
console.log('Done!')
Expand All @@ -40,7 +40,7 @@ ws.end()

## API

### `ws = WriteStream(db[, options])`
### `ws = new WriteStream(db[, options])`

Create a [writable stream](https://nodejs.org/dist/latest-v8.x/docs/api/stream.html#stream_class_stream_writable) that operates in object mode, accepting batch operations to be committed with `db.batch()` on each tick of the Node.js event loop. The optional `options` argument may contain:

Expand Down
130 changes: 62 additions & 68 deletions level-ws.js
@@ -1,90 +1,84 @@
'use strict'

const Writable = require('readable-stream').Writable
const inherits = require('inherits')

const defaultOptions = { type: 'put' }
class WriteStream extends Writable {
constructor (db, options) {
options = Object.assign({ type: 'put' }, options)

function WriteStream (db, options) {
if (!(this instanceof WriteStream)) {
return new WriteStream(db, options)
}

options = Object.assign({}, defaultOptions, options)

Writable.call(this, {
objectMode: true,
highWaterMark: options.highWaterMark || 16
})
super({
objectMode: true,
highWaterMark: options.highWaterMark || 16
})

this._options = options
this._db = db
this._buffer = []
this._flushing = false
this._maxBufferLength = options.maxBufferLength || Infinity
this._options = options
this._db = db
this._buffer = []
this._flushing = false
this._maxBufferLength = options.maxBufferLength || Infinity
this._flush = this._flush.bind(this)

this.on('finish', () => {
this.emit('close')
})
}
this.on('finish', () => {
this.emit('close')
})
}

inherits(WriteStream, Writable)
_write (data, enc, next) {
if (this.destroyed) return

WriteStream.prototype._write = function (data, enc, next) {
if (this.destroyed) return
if (!this._flushing) {
this._flushing = true
process.nextTick(this._flush)
}

if (!this._flushing) {
this._flushing = true
process.nextTick(() => { this._flush() })
if (this._buffer.length >= this._maxBufferLength) {
this.once('_flush', (err) => {
if (err) return this.destroy(err)
this._write(data, enc, next)
})
} else {
this._buffer.push(Object.assign({ type: this._options.type }, data))
next()
}
}

if (this._buffer.length >= this._maxBufferLength) {
this.once('_flush', (err) => {
if (err) return this.destroy(err)
this._write(data, enc, next)
})
} else {
this._buffer.push(Object.assign({ type: this._options.type }, data))
next()
}
}
_flush () {
const buffer = this._buffer

WriteStream.prototype._flush = function () {
const buffer = this._buffer
if (this.destroyed) return

if (this.destroyed) return
this._buffer = []
this._db.batch(buffer, (err) => {
this._flushing = false

this._buffer = []
this._db.batch(buffer, (err) => {
this._flushing = false
if (!this.emit('_flush', err) && err) {
// There was no _flush listener.
this.destroy(err)
}
})
}

if (!this.emit('_flush', err) && err) {
// There was no _flush listener.
this.destroy(err)
_final (cb) {
if (this._flushing) {
// Wait for scheduled or in-progress _flush()
this.once('_flush', (err) => {
if (err) return cb(err)

// There could be additional buffered writes
this._final(cb)
})
} else if (this._buffer && this._buffer.length) {
this.once('_flush', cb)
this._flush()
} else {
cb()
}
})
}

WriteStream.prototype._final = function (cb) {
if (this._flushing) {
// Wait for scheduled or in-progress _flush()
this.once('_flush', (err) => {
if (err) return cb(err)

// There could be additional buffered writes
this._final(cb)
})
} else if (this._buffer && this._buffer.length) {
this.once('_flush', cb)
this._flush()
} else {
cb()
}
}

WriteStream.prototype._destroy = function (err, cb) {
this._buffer = null
cb(err)
_destroy (err, cb) {
this._buffer = null
cb(err)
}
}

module.exports = WriteStream
1 change: 0 additions & 1 deletion package.json
Expand Up @@ -14,7 +14,6 @@
"UPGRADING.md"
],
"dependencies": {
"inherits": "^2.0.3",
"readable-stream": "^3.1.0"
},
"devDependencies": {
Expand Down
38 changes: 19 additions & 19 deletions test.js
Expand Up @@ -77,7 +77,7 @@ function test (label, options, fn) {
// TODO: test various encodings

test('test simple WriteStream', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -89,7 +89,7 @@ test('test simple WriteStream', function (t, ctx, done) {
})

test('test WriteStream with async writes', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
const sourceData = ctx.sourceData
let i = -1

Expand Down Expand Up @@ -121,7 +121,7 @@ test('race condition between batch callback and close event', function (t, ctx,
// Delaying the batch should not be a problem
slowdown(ctx.db)

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
let i = 0

ws.on('error', function (err) {
Expand All @@ -141,7 +141,7 @@ test('race condition between batch callback and close event', function (t, ctx,
test('race condition between two flushes', function (t, ctx, done) {
slowdown(ctx.db)

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
const order = monitor(ws)

ws.on('close', function () {
Expand All @@ -166,7 +166,7 @@ test('race condition between two flushes', function (t, ctx, done) {
})

test('test end accepts data', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
let i = 0

ws.on('error', function (err) {
Expand All @@ -184,7 +184,7 @@ test('test end accepts data', function (t, ctx, done) {
})

test('test destroy()', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)

const verify = function () {
ctx.db.iterator().all(function (err, result) {
Expand All @@ -205,7 +205,7 @@ test('test destroy()', function (t, ctx, done) {
})

test('test destroy(err)', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
const order = monitor(ws)

ws.on('error', function (err) {
Expand Down Expand Up @@ -242,7 +242,7 @@ test('test json encoding', { keyEncoding: 'utf8', valueEncoding: 'json' }, funct
{ key: 'cc', value: { c: 'w00t', d: { e: [0, 10, 20, 30], f: 1, g: 'wow' } } }
]

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -267,7 +267,7 @@ test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEnc
]

function del () {
const delStream = WriteStream(ctx.db)
const delStream = new WriteStream(ctx.db)
delStream.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -290,7 +290,7 @@ test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEnc
})
}

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -317,7 +317,7 @@ test('test del capabilities as constructor option', { keyEncoding: 'utf8', value
]

function del () {
const delStream = WriteStream(ctx.db, { type: 'del' })
const delStream = new WriteStream(ctx.db, { type: 'del' })
delStream.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -339,7 +339,7 @@ test('test del capabilities as constructor option', { keyEncoding: 'utf8', value
})
}

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand Down Expand Up @@ -369,7 +369,7 @@ test('test type at key/value level must take precedence on the constructor', { k
exception.type = 'put'

function del () {
const delStream = WriteStream(ctx.db, { type: 'del' })
const delStream = new WriteStream(ctx.db, { type: 'del' })
delStream.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -392,7 +392,7 @@ test('test type at key/value level must take precedence on the constructor', { k
})
}

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
Expand All @@ -419,7 +419,7 @@ test('test that missing type errors', function (t, ctx, done) {
})
}

const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
ws.on('error', function (err) {
t.equal(err.message, 'A batch operation must have a type property that is \'put\' or \'del\'', 'should error')
errored = true
Expand All @@ -432,7 +432,7 @@ test('test that missing type errors', function (t, ctx, done) {
})

test('test limbo batch error', function (t, ctx, done) {
const ws = WriteStream(ctx.db)
const ws = new WriteStream(ctx.db)
const order = monitor(ws)

monkeyBatch(ctx.db, function (original, ops, options, cb) {
Expand All @@ -454,7 +454,7 @@ test('test limbo batch error', function (t, ctx, done) {
})

test('test batch error when buffer is full', function (t, ctx, done) {
const ws = WriteStream(ctx.db, { maxBufferLength: 1 })
const ws = new WriteStream(ctx.db, { maxBufferLength: 1 })
const order = monitor(ws)

monkeyBatch(ctx.db, function (original, ops, options, cb) {
Expand All @@ -477,7 +477,7 @@ test('test batch error when buffer is full', function (t, ctx, done) {
})

test('test destroy while waiting to drain', function (t, ctx, done) {
const ws = WriteStream(ctx.db, { maxBufferLength: 1 })
const ws = new WriteStream(ctx.db, { maxBufferLength: 1 })
const order = monitor(ws)

ws.on('error', function (err) {
Expand Down Expand Up @@ -506,7 +506,7 @@ test('test destroy while waiting to drain', function (t, ctx, done) {

function testMaxBuffer (max, randomize) {
return function (t, ctx, done) {
const ws = WriteStream(ctx.db, { maxBufferLength: max })
const ws = new WriteStream(ctx.db, { maxBufferLength: max })
const sourceData = []
const batches = []

Expand Down

0 comments on commit dafafb2

Please sign in to comment.