Permalink
Browse files

Improve Gelf output plugin and add tests

  • Loading branch information...
1 parent 85006ee commit de4cc1e1958938525832c3cfe7a3b5220667246a Bertrand Paquet committed Oct 15, 2012
Showing with 69 additions and 7 deletions.
  1. +5 −5 lib/outputs/output_gelf.js
  2. +2 −2 test/test_22_filter_regex.js
  3. +62 −0 test/test_40_integration.js
View
@@ -16,7 +16,7 @@ function OutputGelf() {
'error_buffer_delay': 2000,
'version': '1.0',
'message': '#{@message}',
- 'facility': 'Node-logstash GELF',
+ 'facility': '#{@type}',
'level': '6',
}
}
@@ -38,18 +38,18 @@ OutputGelf.prototype.process = function(data) {
var m = {
version: this.version,
short_message: this.replaceByFields(data, this.message),
- facility: this.replaceByFields(data, this.facility),
- host: data['@source_host'],
+ facility: this.replaceByFields(data, this.facility) || 'no_facility',
level: this.replaceByFields(data, this.level),
- timestamp: (new Date(data['@timestamp'])).getTime() / 1000 >> 0,
+ host: data['@source_host'],
+ timestamp: (new Date(data['@timestamp'])).getTime() / 1000,
};
if (!m.short_message) {
return;
}
logger.debug('Sending GELF', m);
zlib.deflate(new Buffer(JSON.stringify(m)), function(err, message) {
if (err) {
- return this.error_buffer.emit('error', new Error('Error while compressing data:' + err));
+ return this.emit('error', new Error('Error while compressing data:' + err));
}
this.socket.send(message, 0, message.length, this.port, this.host, function(err, bytes) {
if (err || bytes != message.length) {
@@ -64,11 +64,11 @@ vows.describe('Filter regex ').addBatch({
{'@message': 'abcd efgh ijk', '@fields': {fa: 'abcd'}},
]),
'date parsing': filter_helper.create('regex', '?regex=^(.*)$&fields=timestamp&date_format=DD/MMMM/YYYY:HH:mm:ss ZZ', [
- {'@message': '31/Jul/2012:18:02:28 +0200}'},
+ {'@message': '31/Jul/2012:18:02:28 +0200'},
{'@message': '31/Jul/2012'},
{'@message': 'toto'},
], [
- {'@message': '31/Jul/2012:18:02:28 +0200}', '@fields': {}, '@timestamp': '2012-07-31T16:02:28+00:00'},
+ {'@message': '31/Jul/2012:18:02:28 +0200', '@fields': {}, '@timestamp': '2012-07-31T16:02:28+00:00'},
{'@message': '31/Jul/2012', '@fields': {}, '@timestamp': '2012-07-31T00:00:00+00:00'},
{'@message': 'toto', '@fields': {}, '@timestamp': '0000-01-01T00:00:00+00:00'},
]),
@@ -6,6 +6,7 @@ var vows = require('vows'),
http = require('http'),
dgram = require('dgram'),
os = require('os'),
+ zlib = require('zlib'),
monitor_file = require('../lib/lib/monitor_file');
function checkResult(line, target) {
@@ -291,6 +292,7 @@ vows.describe('Integration :').addBatch({
fs.appendFileSync('input5.txt', 'line3\n');
setTimeout(function() {
agent.close(function() {
+ statsd.close();
callback(undefined, received);
});
}, 200);
@@ -344,6 +346,7 @@ vows.describe('Integration :').addBatch({
fs.appendFileSync('input1.txt', 'line2\n');
setTimeout(function() {
agent.close(function() {
+ statsd.close();
callback(errors, received);
});
}, 200);
@@ -360,6 +363,65 @@ vows.describe('Integration :').addBatch({
}
},
}).addBatch({
+ 'filegelf': {
+ topic: function() {
+ monitor_file.setFileStatus({});
+ var callback = this.callback;
+ var received = [];
+ var gelf = dgram.createSocket('udp4');
+ gelf.on('message', function(d) {
+ zlib.inflate(d, function(err, data) {
+ assert.ifError(err);
+ data = JSON.parse(data);
+ received.push(data);
+ });
+ });
+ gelf.bind(17879);
+ createAgent([
+ 'input://file://input1.txt?type=toto',
+ 'input://file://input2.txt',
+ 'filter://regex://?regex=^\\[(.*)\\]&fields=timestamp&date_format=DD/MMMM/YYYY:HH:mm:ss ZZ',
+ 'output://gelf://localhost:17879'
+ ], function(agent) {
+ setTimeout(function() {
+ fs.appendFileSync('input1.txt', '[31/Jul/2012:18:02:28 +0200] line1\n');
+ setTimeout(function() {
+ fs.appendFileSync('input2.txt', '[31/Jul/2012:20:02:28 +0200] line2\n');
+ setTimeout(function() {
+ agent.close(function() {
+ gelf.close();
+ callback(undefined, received);
+ });
+ }, 200);
+ }, 200);
+ }, 200);
+ });
+ },
+
+ check: function(err, data) {
+ fs.unlinkSync('input1.txt');
+ assert.ifError(err);
+ assert.deepEqual(data.sort(), [
+ {
+ version: '1.0',
+ short_message: '[31/Jul/2012:18:02:28 +0200] line1',
+ timestamp: (new Date('2012-07-31T16:02:28+00:00')).getTime() / 1000,
+ host: os.hostname(),
+ facility: 'toto',
+ level: '6'
+ },
+ {
+ version: '1.0',
+ short_message: '[31/Jul/2012:20:02:28 +0200] line2',
+ timestamp: (new Date('2012-07-31T18:02:28+00:00')).getTime() / 1000,
+ host: os.hostname(),
+ facility: 'no_facility',
+ level: '6'
+ }
+ ].sort());
+ }
+ },
+}).addBatch({
'non_existent_module': check_error_init([
'input://non_existent_module://'
], 'Cannot find module'),

0 comments on commit de4cc1e

Please sign in to comment.