Browse files

issue #34: streaming with an event emitter

  • Loading branch information...
1 parent 6912b25 commit df2ff4bdeb0f394e1fb03bb2717a0d39d51ca2f4 @fkuo committed Oct 15, 2012
Showing with 203 additions and 121 deletions.
  1. +203 −121 lib/jsontool.js
View
324 lib/jsontool.js
@@ -5,13 +5,14 @@
// See <https://github.com/trentm/json>.
//
-var VERSION = "4.0.1";
+var VERSION = "5.0.1";
var util = require('util');
var pathlib = require('path');
var vm = require('vm');
var fs = require('fs');
var warn = console.warn;
+var EventEmitter = require('events').EventEmitter;
@@ -78,7 +79,7 @@ if (util.format) {
if (typeof f !== 'string') {
var objects = [];
for (var i = 0; i < arguments.length; i++) {
- objects.push(inspect(arguments[i]));
+ objects.push(util.inspect(arguments[i]));
}
return objects.join(' ');
}
@@ -100,7 +101,7 @@ if (util.format) {
if (x === null || typeof x !== 'object') {
str += ' ' + x;
} else {
- str += ' ' + inspect(x);
+ str += ' ' + util.inspect(x);
}
}
return str;
@@ -395,144 +396,181 @@ function parseArgv(argv) {
}
+
/**
- * Get input from either given file paths or stdin.
+ * Streams chunks from given file paths or stdin.
*
* @param opts {Object} Parsed options.
- * @param callback {Function} `function (err, chunk)` where err is an
- * error string if there was a problem. This is called once for each
- * "chunk". XXX START HERE: splain chunks
+ * @returns {Object} An emitter that emits 'chunk', 'error', and 'end'.
+ * emit('chunk', chunk) where chunk is complete block of JSON ready to parse
+ * emit('error', error) when an underlying stream emits an error
+ * emit('end') when all streams are done
*/
-function jsonChunksFromInput(opts, callback) {
- // If returned from `stripHeaders()` and `finishedHeaders` is still false,
- // then we've process a chunk with an incomplete set of headers:
- // `stripHeaders()` should be called again with the next chunk.
- var finishedHeaders = false;
- function stripHeaders(s) {
- // Take off a leading HTTP header if any and pass it through.
- while (true) {
- if (s.slice(0,5) === "HTTP/") {
- var index = s.indexOf('\r\n\r\n');
- var sepLen = 4;
- if (index == -1) {
- index = s.indexOf('\n\n');
- sepLen = 2;
- }
- if (index != -1) {
- if (! opts.dropHeaders) {
- emit(s.slice(0, index+sepLen));
- }
- var is100Continue = (s.slice(0, 21) === "HTTP/1.1 100 Continue");
- s = s.slice(index+sepLen);
- if (is100Continue) {
- continue;
+function chunkEmitter(opts) {
+ var emitter = new EventEmitter();
+ var streaming = true;
+ var chunks = [];
+ var leftover = '';
+ var splitter = /(})(\s*\n\s*)?({\s*")/;
+ // FIX(review): the refactor dropped the `finishedHeaders` declaration that the
+ // old jsonChunksFromInput() had; without it, L107/L110/L118 create/read an
+ // implicit global shared across invocations. Declare it locally.
+ var finishedHeaders = false;
+ function stripHeaders (stream) {
+ var headerLeftover = '';
+ stream.on('data', function onBlock(block) {
+ var s = headerLeftover + block;
+ // Take off a leading HTTP header if any and pass it through.
+ while (true) {
+ if (s.slice(0,5) === "HTTP/") {
+ var index = s.indexOf('\r\n\r\n');
+ var sepLen = 4;
+ if (index == -1) {
+ index = s.indexOf('\n\n');
+ sepLen = 2;
+ }
+ if (index != -1) {
+ if (! opts.dropHeaders) {
+ emit(s.slice(0, index+sepLen));
+ }
+ var is100Continue = (s.slice(0, 21) === "HTTP/1.1 100 Continue");
+ s = s.slice(index+sepLen);
+ if (is100Continue) {
+ continue;
+ }
+ finishedHeaders = true;
+ }
+ } else {
+ finishedHeaders = true;
}
- finishedHeaders = true;
+ headerLeftover = s;
+ break;
}
- } else {
- finishedHeaders = true;
+ if (finishedHeaders) {
+ stream.removeListener('data', onBlock);
+ return headerLeftover;
}
- break;
- }
- //console.warn("XXX stripHeaders done, finishedHeaders=%s", finishedHeaders)
- return s;
+ });
}
-
- if (opts.inputFiles.length > 0) {
- // Read input files.
- // TODO: Improve streaming here: read files async in chunks and stream
- // as above if `-ga`.
- var i = 0;
- var chunks = [];
- try {
- var first = fs.readFileSync(opts.inputFiles[i], 'utf8');
- first = stripHeaders(first);
- callback(null, first);
- for (i++; i < opts.inputFiles.length; i++) {
- callback(null, fs.readFileSync(opts.inputFiles[i], 'utf8'));
- }
- } catch (e) {
- return callback(
- format('could not read "%s": %s', opts.inputFiles[i], e));
+ function emitChunks (block, emitter) {
+ if (block[0] !== '{') { // Only support streaming consecutive *objects*.
+ streaming = false;
+ chunks.push(block);
+ return '';
}
- } else if (opts.group && opts.array && opts.outputMode !== OM_JSON) {
- // Streaming from stdin.
- //console.warn("XXX streaming");
- var streaming = true;
- var leftover = '';
- var chunks = [];
- var splitter = /(})(\s*\n\s*)?({\s*")/;
- function callbackJsonChunks(chunk) {
- if (chunk[0] !== '{') { // Only support streaming consecutive *objects*.
- streaming = false;
- chunks.push(chunk);
- return;
- }
- /* Example:
- * > '{"a":"b"}\n{"a":"b"}\n{"a":"b"}'.split(/(})(\s*\n\s*)?({\s*")/)
- * [ '{"a":"b"',
- * '}',
- * '\n',
- * '{"',
- * 'a":"b"',
- * '}',
- * '\n',
- * '{"',
- * 'a":"b"}' ]
- */
- var bits = chunk.split(splitter);
- //console.warn("XXX bits: ", bits)
- if (bits.length === 1) {
- leftover = chunk;
- } else {
- var n = bits.length - 2;
- callback(null, bits[0] + bits[1]);
- for (var i = 3; i < n; i += 4) {
- callback(null, bits[i] + bits[i+1] + bits[i+2]);
- }
- leftover = bits[n] + bits[n+1];
+ /* Example:
+ * > '{"a":"b"}\n{"a":"b"}\n{"a":"b"}'.split(/(})(\s*\n\s*)?({\s*")/)
+ * [ '{"a":"b"',
+ * '}',
+ * '\n',
+ * '{"',
+ * 'a":"b"',
+ * '}',
+ * '\n',
+ * '{"',
+ * 'a":"b"}' ]
+ */
+ var bits = block.split(splitter);
+ //console.warn("XXX bits: ", bits)
+ if (bits.length === 1) {
+ return block;
+ } else {
+ var n = bits.length - 2;
+ emitter.emit('chunk', bits[0] + bits[1]);
+ for (var i = 3; i < n; i += 4) {
+ emitter.emit('chunk', bits[i] + bits[i+1] + bits[i+2]);
}
+ return bits[n] + bits[n+1];
}
-
- var stdin = process.openStdin();
- stdin.setEncoding('utf8');
- stdin.on('data', function (chunk) {
- //console.warn("XXX process chunk: %s", JSON.stringify(chunk))
+ }
+ function addListeners(stream) {
+ stream.on('data', function (chunk) {
if (!streaming) {
chunks.push(chunk);
return;
}
var s = leftover + chunk;
- if (!finishedHeaders) {
- s = stripHeaders(s);
- }
- if (!finishedHeaders) {
- leftover = s;
- } else {
- callbackJsonChunks(s);
- }
+ leftover = emitChunks(s, emitter);
});
- stdin.on('end', function () {
+ stream.on('end', function () {
if (!streaming) {
- callback(null, chunks.join(''));
+ emitter.emit('chunk', chunks.join(''));
} else if (leftover) {
- callbackJsonChunks(leftover);
- callback(null, leftover);
+ leftover = emitChunks(leftover, emitter);
+ emitter.emit('chunk', leftover);
}
});
+ }
+ if (opts.inputFiles.length > 0) {
+ // Stream each file in order. Strips headers from the first file.
+ var i = 0;
+ function addEndListener (file) {
+ file.on('end', function () {
+ // FIX(review): guard must account for the pre-increment below; with
+ // `i < length` the last file's 'end' passes the guard, `++i` runs past the
+ // array, and createReadStream(undefined) throws instead of emitting 'end'.
+ if (i + 1 < opts.inputFiles.length) {
+ var next = opts.inputFiles[++i];
+ var nextFile = fs.createReadStream(next, {encoding: 'utf8'});
+ addListeners(nextFile);
+ addEndListener(nextFile);
+ } else {
+ emitter.emit('end');
+ }
+ });
+ }
+ try {
+ var first = fs.createReadStream(opts.inputFiles[i], {encoding: 'utf8'});
+ // FIXME(review): stripHeaders() attaches a 'data' listener and its `return`
+ // exits that listener, not stripHeaders itself — this call always yields
+ // undefined, so `leftover` is always ''. Header stripping for the streaming
+ // path needs rework (e.g. have stripHeaders feed leftover via a callback).
+ leftover = stripHeaders(first) || '';
+ addEndListener(first);
+ addListeners(first);
+ } catch (e) {
+ emitter.emit(
+ 'error',
+ format('could not read "%s": %s', opts.inputFiles[i], e)
+ );
+ }
} else {
- // Read stdin in one big chunk.
+ // Streaming from stdin.
var stdin = process.openStdin();
stdin.setEncoding('utf8');
+ // FIXME(review): same flaw as the file path — stripHeaders() returns from
+ // its 'data' handler, so this is always ''.
+ leftover = stripHeaders(stdin) || '';
+ //console.warn("XXX 525 leftover: " + leftover);
+ stdin.on('end', function () {
+ emitter.emit('end');
+ });
+ addListeners(stdin);
+ }
+ return emitter;
+}
+
+/**
+ * Get input from either given file paths or stdin.
+ *
+ * @param opts {Object} Parsed options.
+ * @param callback {Function} `function (err, callback)` where err is an
+ * error string if there was a problem.
+ */
+function getInput(opts, callback) {
+ if (opts.inputFiles.length === 0) {
+ // Read from stdin.
var chunks = [];
+
+ var stdin = process.openStdin();
+ stdin.setEncoding('utf8');
stdin.on('data', function (chunk) {
chunks.push(chunk);
});
+
stdin.on('end', function () {
- var chunk = chunks.join('');
- chunk = stripHeaders(chunk);
callback(null, chunks.join(''));
});
+ } else {
+ // Read input files.
+ var i = 0;
+ var chunks = [];
+ try {
+ for (; i < opts.inputFiles.length; i++) {
+ chunks.push(fs.readFileSync(opts.inputFiles[i], 'utf8'));
+ }
+ } catch (e) {
+ return callback(
+ format('could not read "%s": %s', opts.inputFiles[i], e));
+ }
+ callback(null, chunks.join(''));
}
}
@@ -815,7 +853,7 @@ function printDatum(datum, opts, sep, alwaysPrintSep) {
var output = null;
switch (opts.outputMode) {
case OM_INSPECT:
- output = util.inspect(datum, false, Infinity, true);
+ output = util.inspect(datum, false, Infinity, process.stdout.isTTY);
break;
case OM_JSON:
if (typeof datum !== 'undefined') {
@@ -925,14 +963,48 @@ function main(argv) {
}
var lookupStrs = opts.args;
- jsonChunksFromInput(opts, function (err, chunk) {
- //console.warn("XXX chunk: '%s'", chunk)
- if (err) {
- warn("json: error: %s", err)
- return drainStdoutAndExit(1);
- }
-
- // Expect the remainder to be JSON.
+ if (opts.group && opts.array && opts.outputMode !== OM_JSON) {
+ // streaming
+ var chunker = chunkEmitter(opts);
+ chunker.on('error', function(error) {
+ warn("json: error: %s", error)
+ return drainStdoutAndExit(1);
+ });
+ chunker.on('chunk', parseChunk);
+ } else {
+ // not streaming
+ getInput(opts, function (err, buffer) {
+ if (err) {
+ warn("json: error: %s", err)
+ return drainStdoutAndExit(1);
+ }
+ // Take off a leading HTTP header if any and pass it through.
+ while (true) {
+ if (buffer.slice(0,5) === "HTTP/") {
+ var index = buffer.indexOf('\r\n\r\n');
+ var sepLen = 4;
+ if (index == -1) {
+ index = buffer.indexOf('\n\n');
+ sepLen = 2;
+ }
+ if (index != -1) {
+ if (! opts.dropHeaders) {
+ emit(buffer.slice(0, index+sepLen));
+ }
+ var is100Continue = (buffer.slice(0, 21) === "HTTP/1.1 100 Continue");
+ buffer = buffer.slice(index+sepLen);
+ if (is100Continue) {
+ continue;
+ }
+ }
+ }
+ break;
+ }
+ parseChunk(buffer);
+ });
+ }
+ function parseChunk(chunk) {
+ // Expect the chunk to be JSON.
if (! chunk.length) {
return;
}
@@ -1111,7 +1183,17 @@ function main(argv) {
// Output
if (opts.outputMode === OM_JSON) {
- if (lookupsAreIndeces) {
+ if (lookups.length === 1 && !opts.array) {
+ // Special case: For JSON output of a *single* lookup, *don't* use
+ // the full table structure, else there is no way to get string
+ // quoting for a single value:
+ // $ echo '{"a": [], "b": "[]"}' | json -j a
+ // []
+ // $ echo '{"a": [], "b": "[]"}' | json -j b
+ // "[]"
+ // See <https://github.com/trentm/json/issues/35> for why.
+ data = data[lookups[0].join('.')];
+ } else if (lookupsAreIndeces) {
// Special case: Lookups that are all indeces into an input array
// are more likely to be wanted as an array of selected items rather
// than a "JSON table" thing that we use otherwise.
@@ -1151,7 +1233,7 @@ function main(argv) {
// Output `data` as is.
printDatum(data, opts, '\n', false);
}
- });
+ }
}
if (require.main === module) {

0 comments on commit df2ff4b

Please sign in to comment.