Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add udp transport, and add real life test with logrotate, upgrade log…

…4node
  • Loading branch information...
commit 332ebfd60803ac56c23ba9afe9b191fca70f0b04 1 parent 9d7969b
Bertrand Paquet authored
View
4 bin/node-logstash-agent
@@ -33,10 +33,12 @@ process.on('uncaughtException', function (err) {
});
if (argv.log_file) {
- logger.reconfigure(argv.log_level, argv.log_file);
+ logger.info('Log to file', argv.log_file, ', log_level', argv.log_level);
+ logger.reconfigure({log_level: argv.log_level, file: argv.log_file});
}
else {
if (argv.log_level) {
+ logger.info('Changing log_level', argv.log_level);
logger.setLogLevel(argv.log_level);
}
}
View
54 lib/inputs/input_udp.js
@@ -0,0 +1,54 @@
+var base_input = require('../lib/base_input'),
+ dgram = require('dgram'),
+ util = require('util'),
+ logger = require('log4node');
+
+function InputUdp() {
+ base_input.BaseInput.call(this);
+ this.config = {
+ name: 'Udp',
+ host_field: 'host',
+ port_field: 'port',
+ optional_params: ['type'],
+ }
+}
+
+util.inherits(InputUdp, base_input.BaseInput);
+
+InputUdp.prototype.afterLoadConfig = function(callback) {
+ logger.info('Start listening on udp', this.host + ':' + this.port);
+
+ this.server = dgram.createSocket('udp4');
+
+ this.server.on('message', function(data) {
+ try {
+ var parsed = JSON.parse(data);
+ this.emit('data', parsed);
+ }
+ catch(e) {
+ this.emit('data', {
+ '@message': data.toString().trim(),
+ '@source': 'udp_' + this.host + '_' + this.port,
+ '@type': this.type,
+ });
+ }
+ }.bind(this));
+
+ this.server.on('error', function(err) {
+ this.emit('init_error', err);
+ }.bind(this));
+
+ this.server.once('listening', callback);
+
+ this.server.bind(this.port, this.host);
+}
+
+InputUdp.prototype.close = function(callback) {
+ logger.info('Closing listening udp', this.host + ':' + this.port);
+ this.server.close();
+ callback();
+}
+
+exports.create = function() {
+ return new InputUdp();
+}
View
22 lib/lib/monitor_file.js
@@ -65,6 +65,7 @@ MonitoredFile.prototype.start = function(start_index) {
this.oldFdTailer = undefined;
}.bind(this), this.wait_delay_after_renaming);
this.fdTailer = undefined;
+ file_status[this.filename] = undefined;
this.restart(0);
}
else {
@@ -76,8 +77,22 @@ MonitoredFile.prototype.start = function(start_index) {
if (this.fdTailer) {
logger.debug('File', this.filename, 'changed');
this.fdTailer.read(function() {
- logger.info('File', this.filename, 'has changed, but no data read. Restart.');
- this.restart(0);
+ var last_data = this.fdTailer.last_data;
+ if (last_data) {
+ var buffer = new Buffer(last_data.length);
+ fs.read(this.fdTailer.fd, buffer, 0, last_data.length, this.fdTailer.current_index - last_data.length, function(err, bytesRead, buffer) {
+ if (err) {
+ return this.emit('error', err);
+ }
+ if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'ascii', 0, last_data.length)) {
+ logger.info('Event changed received, but no data change and last data match', this.filename, 'fd', this.fdTailer.fd);
+ }
+ else {
+ logger.info('Event changed received, but no data change and last data does not match.', 'Restarting reading', this.filename, 'at 0 fd', this.fdTailer.fd);
+ this.restart(0);
+ }
+ }.bind(this));
+ }
}.bind(this));
}
});
@@ -114,8 +129,10 @@ MonitoredFile.prototype.restart = function(start_index) {
if (err) {
return this.emit('error', err);
}
+ // Some data about file in db_file ?
if (file_status[this.filename] && file_status[this.filename].last_data && file_status[this.filename].index) {
var last_data = file_status[this.filename].last_data;
+ // Enough data to check last_data ?
if (file_status[this.filename].index >= last_data.length && stats.size > last_data.length) {
var buffer = new Buffer(last_data.length);
fs.read(fd, buffer, 0, last_data.length, file_status[this.filename].index - last_data.length, function(err, bytesRead, buffer) {
@@ -140,6 +157,7 @@ MonitoredFile.prototype.restart = function(start_index) {
this.fdTailer.read();
}
}
+ // No data about file, starting normally
else {
if (start_index === undefined) {
logger.info('Start reading', this.filename, 'at end', 'fd', fd);
View
49 lib/outputs/output_udp.js
@@ -0,0 +1,49 @@
+var base_output = require('../lib/base_output'),
+ util = require('util'),
+ dgram = require('dgram'),
+ logger = require('log4node'),
+ error_buffer = require('../lib/error_buffer');
+
+function OutputUdp() {
+ base_output.BaseOutput.call(this);
+ this.config = {
+ name: 'Udp',
+ host_field: 'host',
+ port_field: 'port',
+ optional_params: ['error_buffer_delay'],
+ default_values: {
+ 'error_buffer_delay': 2000,
+ }
+ }
+}
+
+util.inherits(OutputUdp, base_output.BaseOutput);
+
+OutputUdp.prototype.afterLoadConfig = function(callback) {
+ logger.info('Start output to udp', this.host + ':' + this.port);
+
+ this.socket = dgram.createSocket('udp4');
+
+ this.error_buffer = error_buffer.create('output udp to ' + this.host + ':' + this.port, this.error_buffer_delay, this);
+
+ callback();
+}
+
+OutputUdp.prototype.process = function(data) {
+ var message = new Buffer(JSON.stringify(data));
+ this.socket.send(message, 0, message.length, this.port, this.host, function(err, bytes) {
+ if (err || bytes != message.length) {
+ this.error_buffer.emit('error', new Error('Error while send data to ' + this.host + ':' + this.port + ':' + err));
+ }
+ }.bind(this));
+}
+
+OutputUdp.prototype.close = function(callback) {
+ logger.info('Closing output to udp', this.host + ':' + this.port);
+ this.socket.close();
+ callback();
+}
+
+exports.create = function() {
+ return new OutputUdp();
+}
View
5 package.json
@@ -17,13 +17,14 @@
}
],
"devDependencies": {
- "vows": "0.6.x"
+ "vows": "0.6.x",
+ "whereis": "0.0.2"
},
"scripts": {
"test": "./test-runner.sh"
},
"dependencies": {
- "log4node": "0.0.5",
+ "log4node": "0.1.0",
"zmq": "2.1.0",
"optimist": "0.3.4",
"moment": "1.7.0"
View
2  test-runner.sh
@@ -10,6 +10,6 @@ fi
for test in $TEST; do
echo "Launching test : $test"
- TZ=Etc/GMT NODE_PATH=../lib:../lib/lib vows $test --spec
+ PATH=/usr/sbin:$PATH TZ=Etc/GMT NODE_PATH=../lib:../lib/lib vows $test --spec
echo ""
done
View
7 test/50_real_life/logrotate.conf
@@ -0,0 +1,7 @@
+"output.txt" {
+ rotate 5
+ weekly
+ postrotate
+ kill -USR2 `cat process.pid`
+ endscript
+}
View
19 test/50_real_life/run.js
@@ -0,0 +1,19 @@
+var argv = require('optimist').argv;
+ log = require('log4node').reconfigure({file: argv.file, prefix: 'a '});
+
+console.log('Starting loop, count', argv.count, 'period', argv.period);
+
+var count = 0;
+
+function toto() {
+ if (count >= argv.count) {
+ console.log('Bye.');
+ clearInterval(toto);
+ process.exit(0);
+ return;
+ }
+ log.info('Line ' + count);
+ count ++;
+}
+
+setInterval(toto, parseInt(argv.period));
View
2  test/test_40_integration.js
@@ -380,4 +380,6 @@ vows.describe('Integration :').addBatch({
'zeromq transport': file2x2x2file(['output://zeromq://tcp://localhost:5567'], ['input://zeromq://tcp://*:5567']),
}).addBatch({
'unix socket transport': file2x2x2file(['output://unix:///tmp/test_socket'], ['input://unix:///tmp/test_socket']),
+}).addBatch({
+ 'udp transport': file2x2x2file(['output://udp://localhost:17880'], ['input://udp://127.0.0.1:17880']),
}).export(module);
View
136 test/test_50_real_life.js
@@ -0,0 +1,136 @@
+var vows = require('vows'),
+ assert = require('assert'),
+ fs = require('fs'),
+ agent = require('agent'),
+ spawn = require('child_process').spawn,
+ dgram = require('dgram'),
+ log = require('log4node'),
+ whereis = require('whereis');
+
+function createAgent(urls, callback, error_callback) {
+ var a = agent.create();
+ error_callback = error_callback || function(error) {
+ assert.ifError(error);
+ }
+ a.on('init_error', function(module_name, error) {
+ console.log("Init error agent detected, " + module_name + " : " + error);
+ error_callback(error);
+ });
+ a.on('error', function(module_name, error) {
+ console.log("Error agent detected, " + module_name + " : " + error);
+ error_callback(error);
+ });
+ a.loadUrls(['filter://add_source_host://', 'filter://add_timestamp://'].concat(urls), function(error) {
+ assert.ifError(error);
+ callback(a);
+ }, 200);
+}
+
+function run(command, args, pid_file, callback) {
+ log.info('Starting sub process');
+ var child = spawn(command, args);
+ if (pid_file) {
+ fs.writeFile(pid_file, child.pid, function(err) {
+ if (err) {
+ console.log(err);
+ }
+ });
+ }
+ child.stdout.on('data', function(data) {
+ process.stdout.write('STDOUT ' + data.toString());
+ });
+ child.stderr.on('data', function(data) {
+ process.stdout.write('STDERR ' + data.toString());
+ });
+ child.on('exit', function(exitCode) {
+ log.info('End of sub process', exitCode);
+ callback(exitCode);
+ });
+}
+
+vows.describe('Real life :').addBatch({
+ 'simple test': {
+ topic: function() {
+ var callback = this.callback;
+ var socket = dgram.createSocket('udp4');
+ socket.bind(17881);
+ var datas = [];
+ socket.on('message', function(data) {
+ datas.push(data);
+ });
+ createAgent([
+ 'input://file://output.txt',
+ 'output://udp://localhost:17881',
+ ], function(agent) {
+ run('node', ['50_real_life/run.js', '--file=output.txt', '--count=500', '--period=1'], undefined, function(exitCode) {
+ setTimeout(function() {
+ socket.close();
+ agent.close(function() {
+ callback(undefined, exitCode, datas);
+ });
+ }, 100);
+ });
+ });
+ },
+
+ check: function(err, exitCode, datas) {
+ assert.ifError(err);
+ fs.unlinkSync('output.txt');
+ assert.equal(0, exitCode);
+ assert.equal(500, datas.length);
+ }
+ },
+}).addBatch({
+ 'logrotate test': {
+ topic: function() {
+ var callback = this.callback;
+ var socket = dgram.createSocket('udp4');
+ socket.bind(17882);
+ var datas = [];
+ socket.on('message', function(data) {
+ datas.push(data.toString());
+ });
+ createAgent([
+ 'input://file://output.txt',
+ 'output://udp://localhost:17882',
+ ], function(agent) {
+ run('node', ['50_real_life/run.js', '--file=output.txt', '--count=500', '--period=1'], 'process.pid', function(exitCode) {
+ setTimeout(function() {
+ socket.close();
+ agent.close(function() {
+ callback(undefined, exitCode, datas);
+ });
+ }, 100);
+ });
+ whereis('logrotate', function(err, logrotate) {
+ if (err) {
+ return console.log(err);
+ }
+ setTimeout(function() {
+ console.log(logrotate);
+ run(logrotate, ['-f', '50_real_life/logrotate.conf', '-s', '/tmp/toto'], undefined, function(exitCode) {
+ console.log('Logrotate exit code', exitCode);
+ assert.equal(0, exitCode);
+ });
+ }, 500);
+ setTimeout(function() {
+ console.log(logrotate);
+ run(logrotate, ['-f', '50_real_life/logrotate.conf', '-s', '/tmp/toto'], undefined, function(exitCode) {
+ console.log('Logrotate exit code', exitCode);
+ assert.equal(0, exitCode);
+ });
+ }, 1000);
+ });
+ });
+ },
+
+ check: function(err, exitCode, datas) {
+ assert.ifError(err);
+ fs.unlinkSync('output.txt');
+ fs.unlinkSync('output.txt.1');
+ fs.unlinkSync('process.pid');
+ assert.equal(0, exitCode);
+ assert.equal(500, datas.length);
+ }
+ },
+}).export(module);
Please sign in to comment.
Something went wrong with that request. Please try again.