Skip to content

Commit

Permalink
Add end end callback
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Jun 8, 2014
1 parent d9a72c6 commit 04e72d0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 23 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ var parser = new NginxParser('$remote_addr - $remote_user [$time_local] '

parser.read(path, function (row) {
console.log(row);
}, function (err) {
if (err) throw err;
console.log('Done!')
});
```

Expand Down
65 changes: 42 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,81 +37,100 @@ var Parser = module.exports = function (format) {
*
* @param {String} path
* @param {Object} options (optional)
* @param {Function} callback
* @param {Function} iterator - called for each line
* @param {Function} callback (optional) - called at the end
*/

Parser.prototype.read = function (path, options, callback) {
Parser.prototype.read = function (path, options, iterator, callback) {
if (typeof options === 'function') {
callback = options;
callback = iterator;
iterator = options;
}
if (!path || path === '-') {
return this.stdin(callback);
return this.stdin(iterator, callback);
} else if (options.tail) {
return this.tail(path, callback);
return this.tail(path, iterator, callback);
}
return this.stream(fs.createReadStream(path), callback);
return this.stream(fs.createReadStream(path), iterator, callback);
};

/**
* Parse a log file and watch it for changes.
*
* @param {String} path
* @param {Function} callback
* @param {Function} iterator - called for each line
* @param {Function} callback (optional) - called at the end
*/

Parser.prototype.tail = function (path, callback) {
Parser.prototype.tail = function (path, iterator, callback) {
var stream = spawn('tail', [ '-F', '-c', '+0', path]).stdout;
return this.stream(stream, callback);
return this.stream(stream, iterator, callback);
};

/**
* Parse a log stream from STDIN.
*
* @param {Function} callback
* @param {Function} iterator - called for each line
* @param {Function} callback (optional) - called at the end
*/

Parser.prototype.stdin = function (callback) {
return this.stream(process.stdin, callback);
Parser.prototype.stdin = function (iterator, callback) {
return this.stream(process.stdin, iterator, callback);
};

/**
* Parse a log stream.
*
* @param {ReadableStream} stream
* @param {Function} callback
* @param {Function} iterator - called for each line
* @param {Function} callback (optional) - called at the end
*/

Parser.prototype.stream = function (stream, callback) {
var self = this, overflow = new Buffer(0);
Parser.prototype.stream = function (stream, iterator, callback) {
var self = this, overflow = new Buffer(0), complete = false;
stream.on('data', function (data) {
var buffer = buffertools.concat(overflow, data), newline = 0;
for (var i = 0, len = buffer.length; i < len; i++) {
if (buffer[i] === 10) {
self.parseLine(buffer.slice(newline, i), callback);
self.parseLine(buffer.slice(newline, i), iterator);
newline = i + 1;
}
}
overflow = buffer.slice(newline);
});
if (callback) {
stream.on('error', function (err) {
if (complete) return;
complete = true;
callback(err);
});
}
stream.on('end', function () {
if (overflow.length) {
self.parseLine(overflow, callback);
self.parseLine(overflow, iterator);
}
if (complete) return;
complete = true;
if (callback) {
callback();
}
});
process.nextTick(function () {
stream.resume();
});
if (stream.resume) {
process.nextTick(function () {
stream.resume();
});
}
return stream;
};

/**
* Parse a log line.
*
* @param {Buffer|String} line
* @param {Function} callback
* @param {Function} iterator
*/

Parser.prototype.parseLine = function (line, callback) {
Parser.prototype.parseLine = function (line, iterator) {
var match = line.toString().match(this.parser);
if (!match) {
return;
Expand Down Expand Up @@ -172,7 +191,7 @@ Parser.prototype.parseLine = function (line, callback) {
Number(ip[3]);
}

callback(row);
iterator(row);
};

/**
Expand Down

0 comments on commit 04e72d0

Please sign in to comment.