-
Notifications
You must be signed in to change notification settings - Fork 2
/
stream.js
174 lines (157 loc) · 4.97 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
var ltgt = require('ltgt')
var frame = require('./frame')
module.exports = Stream
function Stream (blocks, opts) {
opts = opts || {}
this.reverse = !!opts.reverse
this.live = !!opts.live
this.blocks = blocks
this.cursor = -1 //this.start = this.end = -1
this.seqs = opts.seqs !== false
this.values = opts.values !== false
this.limit = opts.limit || 0
this.count = 0
this.min = this.max = this.min_inclusive = this.max_inclusive = null
var self = this
this.opts = opts
this.blocks.onReady(this._ready.bind(this))
}
Stream.prototype._ready = function () {
this.min = ltgt.lowerBound(this.opts, null)
if(ltgt.lowerBoundInclusive(this.opts))
this.min_inclusive = this.min
this.max = ltgt.upperBound(this.opts, null)
if(ltgt.upperBoundInclusive(this.opts))
this.max_inclusive = this.max
//note: cursor has default of the current length or zero.
if(this.reverse)
this.cursor = ltgt.upperBound(this.opts, this.blocks.length)
else
this.cursor = ltgt.lowerBound(this.opts, 0)
if(this.cursor < 0) this.cursor = 0
if(!this.live && this.cursor === 0 && this.blocks.length == 0) {
this.ended = true
return this.resume()
}
var self = this
this.blocks.getBlock(~~(this.cursor/self.blocks.block), function (err, buffer) {
self._buffer = buffer
//reversing cursor starts at length, which won't be a thing.
self.resume()
})
}
Stream.prototype._next = function () {
if(!this._buffer || this.cursor === -1 || this.isAtEnd()) {
this._at_end = true
return
}
var block = this.blocks.block
var next_block
if(!this.reverse) {
var result = frame.getRecord(block, this._buffer, this.cursor)
if(result) {
this.cursor += result.length + 4
return result
} else {
//move to start of next block
this.cursor = (this.cursor - (this.cursor%block))+block
if(this.cursor < this.blocks.length) {
//sometimes this is sync, which means we can actually return instead of cb
//if we always cb, we can get two resume loops going, which is weird.
next_block = ~~(this.cursor/block)
}
else
return
}
}
else {
if(this.cursor % block) {
//get the previous record, unless this is the first item
//in a lte stream.
if(!(this.count === 0 && this.max_inclusive === this.cursor)) {
this.cursor = frame.getPreviousRecord(block, this._buffer, this.cursor)
}
return frame.getRecord(block, this._buffer, this.cursor)
}
else {
var current_block = ~~(this.cursor/block)
next_block = ~~(this.cursor/block)-1
if(current_block === next_block)
throw new Error('failed to decrement block')
}
}
var self = this, async = false, returned = false
if(next_block >= 0) {
this.blocks.getBlock(next_block, function (err, buffer) {
if(err) return console.error(err)
//if(err) return self.abort(err)
self._buffer = buffer
returned = true
if(self.reverse) {
//point to the end of the blocks, in the newly retrived block
self.cursor = next_block*block + buffer.readUInt32LE(block - 4)
}
if(async) self.resume()
})
async = true
}
if(returned) return self._next()
}
Stream.prototype.isAtEnd = function () {
return this.reverse ? this.cursor <= 0 : this.cursor >= this.blocks.length
}
Stream.prototype._format = function (result) {
if(this.values) {
var data = this._buffer.slice(result.start, result.start + result.length)
if (!data.every(x => x === 0)) // skip deleted
{
var value = this.blocks.codec.decode(data)
if(this.seqs) this.sink.write({seq: result.offset, value: value})
else this.sink.write(value)
}
}
else
this.sink.write(result.offset)
}
Stream.prototype.resume = function () {
if(!this.sink || this.sink.paused) return
this._at_end = false
if(this.ended && !this.sink.ended) {
return this.sink.end(this.ended === true ? null : this.ended)
}
while(this.sink && !this.sink.paused/* && !this.ended*/) {
var result = this._next()
if(result && result.length) {
var o = result.offset
this.count++
if(
(this.min === null || this.min < o || this.min_inclusive === o) &&
(this.max === null || this.max > o || this.max_inclusive === o)
) {
this._format(result)
}
else {
if(this.limit > 0 && this.count >= this.limit) {
this.abort(); this.sink.end()
}
}
}
else if(!this.live && (result ? result.length == 0 : this.isAtEnd())) {
if(this.ended) throw new Error('already ended')
this.abort()
return
}
else
return
}
}
Stream.prototype.abort = function (err) {
//only thing to do is unsubscribe from live stream.
//but append isn't implemented yet...
this.ended = err || true
var i = this.blocks.streams.indexOf(this)
if(~i) this.blocks.streams.splice(i, 1)
if(!this.sink.ended)
this.sink.end(err === true ? null : err)
}
Stream.prototype.pipe = require('push-stream/pipe')