-
Notifications
You must be signed in to change notification settings - Fork 0
/
parse.js
97 lines (88 loc) · 2.81 KB
/
parse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);
var debug = require('debug')('jipe.parse');
var _ = require('underscore');
var PAIRS = {
'[': ']',
'{': '}'
};
function JSONParseStream(options) {
if (!(this instanceof JSONParseStream)) {
return new JSONParseStream(options);
}
var conf = _.extend({
skip: 0
}, options);
Transform.call(this, options);
this._writableState.objectMode = false;
this._readableState.objectMode = true;
this._buffer = '';
this._state = 'DONE'; // PARSING, COLLECTING
this._decoder = new StringDecoder('utf8');
this._to_skip = conf.skip;
}
JSONParseStream.prototype._transform = function(chunk, encoding, done) {
debug('entering transform [%s] %s', this._state, this._buffer);
var buffer = this._decoder.write(chunk);
if (this._to_skip > 0) {
buffer = buffer.substr(this._to_skip);
this._to_skip = 0;
}
while (buffer.length > 0 || this._state == 'PARSING') {
debug('state: %s', this._state);
debug('buffer: %s', buffer);
debug('parse buffer: %s', this._buffer);
if (this._state == 'DONE') {
// eat bullshit between objects (usually whitespace) until [ or {
var s0 = buffer.indexOf('{');
var s1 = buffer.indexOf('[');
var start = 0;
if (s0 == -1 && s1 == -1) {
// object doesn't start in this chunk, so just drop it
return done();
} else if ([s0, s1].indexOf(-1) >= 0) {
// only one start symbol was found
start = Math.max(s0, s1);
} else {
// both were found, use the first one
start = Math.min(s0, s1);
}
// get first char in so we know what to look for
this._buffer = buffer.substr(start, 1);
buffer = buffer.substr(start + 1);
this._state = 'COLLECTING';
} else if (this._state == 'COLLECTING') {
// collect until matching pair terminator
var terminator = PAIRS[this._buffer[0]];
debug('searching for %s', terminator);
var index = buffer.indexOf(terminator);
if (index >= 0) {
this._buffer += buffer.substr(0, index + 1);
buffer = buffer.substr(index + 1);
this._state = 'PARSING';
} else {
this._buffer += buffer;
buffer = "";
}
} else if (this._state == 'PARSING') {
try {
var obj = JSON.parse(this._buffer);
try {
this.push(obj);
} catch(e) {
debug('failed to push %j', obj);
}
this._buffer = '';
this._state = 'DONE';
} catch(e) {
debug('didn\'t parse: %s', this._buffer);
this._state = 'COLLECTING';
}
}
}
debug('exiting transform [%s] %s', this._state, this._buffer);
done();
};
module.exports = JSONParseStream;