/
level-ws.js
107 lines (85 loc) · 2.26 KB
/
level-ws.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
var extend = require('xtend')
// Defaults layered beneath the caller's options by the constructor. Unless
// the caller overrides it, `type: 'put'` is the operation type used for
// written entries that do not carry a `type` of their own.
var defaultOptions = { type: 'put' }
/**
 * Object-mode writable stream that collects written operations and hands
 * them to `db.batch()`.
 *
 * May be invoked with or without `new`.
 *
 * @param {Object} db - Store exposing `batch(operations, callback)`.
 * @param {Object} [options] - Merged over `defaultOptions`. The stream reads
 *   `type`, `keyEncoding`, `valueEncoding` and `maxBufferLength` from it.
 */
function WriteStream (db, options) {
  var stream = this
  var calledWithNew = stream instanceof WriteStream

  // Support plain function-call construction.
  if (!calledWithNew) return new WriteStream(db, options)

  Writable.call(stream, { objectMode: true })

  stream._options = extend(defaultOptions, options)
  stream._db = db
  stream._buffer = [] // operations waiting for the next batch
  stream._flushing = false // set while a flush is scheduled or running

  // Announce 'close' as soon as the stream has finished.
  stream.on('finish', function () {
    stream.emit('close')
  })
}

inherits(WriteStream, Writable)
/**
 * Writable `_write` hook: buffer one operation and make sure a flush is
 * scheduled for it.
 *
 * When `options.maxBufferLength` is set and the buffer already holds more
 * entries than that, the write is parked until the next '_flush' event and
 * then retried, so the entry is never dropped. Because the check happens
 * before the push, the buffer can hold up to `maxBufferLength + 1` entries.
 *
 * @param {Object} d - Operation to buffer.
 * @param {string} enc - Not inspected; only forwarded when the write is retried.
 * @param {Function} next - Called with no arguments once `d` is buffered, or
 *   with the error of the flush this write was waiting on.
 */
WriteStream.prototype._write = function (d, enc, next) {
  var self = this
  if (self.destroyed) return

  if (self._options.maxBufferLength &&
      self._buffer.length > self._options.maxBufferLength) {
    // Buffer is over the limit: wait for a batch to complete, then retry.
    // Simply calling next() here would acknowledge the write without ever
    // buffering `d`, silently losing it.
    self.once('_flush', function (err) {
      if (err) return next(err)
      self._write(d, enc, next)
    })
  } else {
    if (self._buffer.length === 0) {
      // First entry of a new batch: schedule the flush that will pick it up.
      self._flushing = true
      process.nextTick(function () { self._flush() })
    }
    self._buffer.push(d)
    next()
  }
}
/**
 * Send everything currently buffered to `db.batch()` as one batch.
 *
 * Does nothing when the stream is destroyed or the buffer has been released.
 * Each entry becomes an operation whose `type`, `keyEncoding` and
 * `valueEncoding` fall back to the stream options (`encoding` on the entry is
 * accepted as an alias for `valueEncoding`).
 *
 * When the batch completes, a '_flush' event is emitted with the batch error
 * (if any). If nobody listens for it and there was an error, the stream is
 * destroyed with that error.
 */
WriteStream.prototype._flush = function () {
  var self = this
  var buffer = self._buffer

  if (self.destroyed || !buffer) return

  // Start a fresh buffer so writes arriving during the batch are kept apart.
  self._buffer = []

  // `_write` schedules a new flush whenever it sees an empty buffer, which
  // can happen while an earlier batch is still in flight. Count outstanding
  // batches so `_flushing` is only cleared once the last one has completed;
  // otherwise `_final` could finish the stream with a batch still pending.
  self._pendingBatches = (self._pendingBatches || 0) + 1

  self._db.batch(buffer.map(function (d) {
    return {
      type: d.type || self._options.type,
      key: d.key,
      value: d.value,
      keyEncoding: d.keyEncoding || self._options.keyEncoding,
      valueEncoding: (d.valueEncoding || d.encoding ||
        self._options.valueEncoding)
    }
  }), cb)

  function cb (err) {
    self._pendingBatches--
    self._flushing = self._pendingBatches > 0

    if (!self.emit('_flush', err) && err) {
      // There was no _flush listener.
      self.destroy(err)
    }
  }
}
/**
 * Human-readable tag for this stream type.
 *
 * @returns {string} Always 'LevelUP.WriteStream'.
 */
WriteStream.prototype.toString = function () {
  var label = 'LevelUP.WriteStream'
  return label
}
/**
 * Stream `_final` hook: make sure buffered operations are flushed before
 * signalling completion through `cb`.
 *
 * - While a flush is scheduled or running, wait for its '_flush' event; on
 *   error pass it to `cb`, otherwise re-check (more writes may have been
 *   buffered in the meantime).
 * - If entries are buffered but no flush is pending, flush now and let the
 *   resulting '_flush' event call `cb`.
 * - Otherwise there is nothing left to do and `cb` is called right away.
 *
 * @param {Function} cb - Completion callback, optionally given an error.
 */
WriteStream.prototype._final = function (cb) {
  var stream = this

  if (stream._flushing) {
    stream.once('_flush', function (err) {
      if (err) {
        cb(err)
        return
      }
      stream._final(cb)
    })
    return
  }

  var pending = stream._buffer
  var nothingBuffered = !(pending && pending.length)
  if (nothingBuffered) {
    cb()
    return
  }

  stream.once('_flush', cb)
  stream._flush()
}
/**
 * Stream `_destroy` hook: release the buffer, emit 'close', then report back.
 *
 * Setting `_buffer` to null drops any operations that were still waiting and
 * makes a later `_flush()` a no-op.
 *
 * @param {?Error} err - Error the stream is being destroyed with, if any.
 * @param {Function} cb - Invoked with `err` after 'close' has been emitted.
 */
WriteStream.prototype._destroy = function (err, cb) {
  var stream = this

  stream._buffer = null
  stream.emit('close')

  cb(err)
}
/**
 * Alias that simply ends the stream by calling `end()` with no arguments.
 */
WriteStream.prototype.destroySoon = function () {
  var stream = this
  stream.end()
}
// Export the constructor itself; it may be called with or without `new`.
module.exports = WriteStream