Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 4 commits
  • 14 files changed
  • 0 comments
  • 1 contributor
47 Readme.markdown
Source Rendered
@@ -36,7 +36,7 @@ The architecture is identical to logstash architecture. You have to instanciates
36 36
37 37 * [inputs plugin](https://github.com/bpaquet/node-logstash/tree/master/lib/inputs): where datas come into node-logstash. Examples: file, zeromq transport layer
38 38 * [filter plugin](https://github.com/bpaquet/node-logstash/tree/master/lib/filters): extract fields from logs, like timestamps. Example: regex plugin
39   -* [outputs plugins](https://github.com/bpaquet/node-logstash/tree/master/lib/outputs): where datas leave from node-logstash: Example: elastic search , zeromq transport layer.
  39 +* [outputs plugins](https://github.com/bpaquet/node-logstash/tree/master/lib/outputs): where datas leave from node-logstash: Examples: elastic search , zeromq transport layer.
40 40
41 41
42 42 A typical node-logstash deployement contains agents to crawl logs and a log server.
@@ -118,6 +118,8 @@ Outputs and filter, commons parameters
118 118 ===
119 119
120 120 * ``only_type``: execute the filter / output plugin only on lines with specified type. Example: ``only_type=nginx``
  121 +* ``only_field_exist_toto``: execute the filter / output plugin only on lines with a field ``toto``. You can specify it multiple times, all fields have to exist.
  122 +* ``only_field_equal_toto=aaa``: execute the filter / output plugin only on lines with a field ``toto``, with value ``aaa``. You can specify it multiple times, all fields have to exist and have the specified value.
121 123
122 124 Ouputs plugins
123 125 ===
@@ -132,10 +134,19 @@ Example: ``output://zeromq://tcp://192.168.1.1:5555``, to send logs to 192.168.1
132 134 Elastic search
133 135 ---
134 136
135   -This plugin is used on log server to send logs to elastic search.
  137 +This plugin is used on log server to send logs to elastic search, using HTTP REST interface.
136 138
137 139 Example: ``output://elasticsearch://localhost:9001`` to send to the HTTP interface of an elastic search server listening on port 9001.
138 140
  141 +Elastic search ZeroMQ
  142 +---
  143 +
  144 +This plugin is used on log server to send logs to elastic search, using ZeroMQ transport.
  145 +You can find the ZeroMQ transport here: https://github.com/bpaquet/transport-zeromq.
  146 +
  147 +Example: ``output://elasticsearch_zeromq://tcp://localhost:9700`` to send to the zeromq transport of an elastic search server listening on port 9700.
  148 +
  149 +
139 150 Statsd
140 151 ---
141 152
@@ -165,7 +176,8 @@ Regex
165 176
166 177 The regex filter is used to extract data from lines of logs. The lines of logs are not modified by this filter.
167 178
168   -Example: ``filter://regex://?regex=^(\S)+ &fields=toto``, to extract the first word of a line of logs, and place it into the ``toto`` field.
  179 +Example 1: ``filter://regex://?regex=^(\S)+ &fields=toto``, to extract the first word of a line of logs, and place it into the ``toto`` field.
  180 +
169 181 Example 2: ``filter://regex://http_combined?only_type=nginx``, to extract fields following configuration into the http_combined pattern. node-logstash is bundled with [some configurations](https://github.com/bpaquet/node-logstash/tree/master/lib/patterns). You can add your custom patterns directories, see options ``--patterns_directories``.
170 182
171 183 Params:
@@ -187,6 +199,35 @@ Params:
187 199 * from: the regex to find pattern which will be replaced. You have to escape special characters.
188 200 * to: the replacement string
189 201
  202 +Grep
  203 +---
  204 +
  205 +The grep filter can remove lines which match or do not match a given regex.
  206 +
  207 +Example 1: ``filter://grep://?regex=abc`` remove all lines which do not contain ``abc``. Equivalent to ``grep`
  208 +
  209 +Example 2: ``filter://grep://?regex=abc&invert=true`` remove all lines which contain ``abc``. Equivalent to ``grep -v``
  210 +
  211 +Example 3: ``filter://grep://?type=nginx&regex=abc`` remove all lines with type ``nginx`` which do not contain ``abc`` and
  212 +
  213 +Params:
  214 +
  215 +* regex: the regex to be matched
  216 +* invert: if ``true``, remove lines which match. Default value: false
  217 +
  218 +Compute field
  219 +---
  220 +
  221 +The compute field filter is used to add a new field to a line, with a fixed value, or with a value computed from other fields.
  222 +
  223 +Example 1: ``filter://compute_field?toto&value=abc`` add a field named ``toto`` with value ``abc``
  224 +
  225 +Example 2: ``filter://compute_field?toto&value=abc#{titi}`` add a field named ``toto`` with value ``abcef``, if line contain a field ``titi`` with value ``ef``
  226 +
  227 +Params:
  228 +
  229 +* value: the value to place in the given field
  230 +
190 231 License
191 232 ===
192 233
1  lib/filters/filter_add_source_host.js
@@ -21,6 +21,7 @@ FilterAddSourceHost.prototype.process = function(data) {
21 21 if (!data['@source_host']) {
22 22 data['@source_host'] = this.os;
23 23 }
  24 + return data;
24 25 }
25 26
26 27 exports.create = function() {
1  lib/filters/filter_add_timestamp.js
@@ -19,6 +19,7 @@ FilterAddTimestamp.prototype.process = function(data) {
19 19 if (!data['@timestamp']) {
20 20 data['@timestamp'] = (new Date()).toISOString();
21 21 }
  22 + return data;
22 23 }
23 24
24 25 exports.create = function() {
34 lib/filters/filter_compute_field.js
... ... @@ -0,0 +1,34 @@
  1 +var base_filter = require('../lib/base_filter'),
  2 + util = require('util'),
  3 + logger = require('log4node');
  4 +
  5 +function FilterComputeField() {
  6 + base_filter.BaseFilter.call(this);
  7 + this.config = {
  8 + name: 'ComputeField',
  9 + required_params: ['value'],
  10 + host_field: 'field_name',
  11 + }
  12 +}
  13 +
  14 +util.inherits(FilterComputeField, base_filter.BaseFilter);
  15 +
  16 +FilterComputeField.prototype.afterLoadConfig = function(callback) {
  17 + logger.info('Initialized compute field filter on field: ' + this.field_name + ', value: ' + this.value);
  18 + callback();
  19 +}
  20 +
  21 +FilterComputeField.prototype.process = function(data) {
  22 + var value = this.replaceByFields(data, this.value);
  23 + if (value) {
  24 + if (!data['@fields']) {
  25 + data['@fields'] = {};
  26 + }
  27 + data['@fields'][this.field_name] = value;
  28 + }
  29 + return data;
  30 +}
  31 +
  32 +exports.create = function() {
  33 + return new FilterComputeField();
  34 +}
37 lib/filters/filter_grep.js
... ... @@ -0,0 +1,37 @@
  1 +var base_filter = require('../lib/base_filter'),
  2 + util = require('util'),
  3 + logger = require('log4node');
  4 +
  5 +function FilterGrep() {
  6 + base_filter.BaseFilter.call(this);
  7 + this.config = {
  8 + name: 'Grep',
  9 + required_params: ['regex'],
  10 + optional_params: ['invert'],
  11 + default_values: {
  12 + 'invert': 'false',
  13 + }
  14 + }
  15 +}
  16 +
  17 +util.inherits(FilterGrep, base_filter.BaseFilter);
  18 +
  19 +FilterGrep.prototype.afterLoadConfig = function(callback) {
  20 + this.regex = new RegExp(this.regex);
  21 + this.invert = this.invert == 'true';
  22 + logger.info('Initialized grep filter on regex: ' + this.regex + ', invert: ' + this.invert);
  23 + callback();
  24 +}
  25 +
  26 +FilterGrep.prototype.process = function(data) {
  27 + var match = data['@message'].match(this.regex);
  28 + console.log(data['@message'], this.regex, match, this.invert);
  29 + if (this.invert) {
  30 + match = ! match;
  31 + }
  32 + return match ? data : undefined;
  33 +}
  34 +
  35 +exports.create = function() {
  36 + return new FilterGrep();
  37 +}
1  lib/filters/filter_mutate_replace.js
@@ -25,6 +25,7 @@ FilterMutateReplace.prototype.process = function(data) {
25 25 data['@fields'][this.field_name] = data['@fields'][this.field_name].replace(this.regex, this.to);
26 26 logger.debug('New value', data['@fields'][this.field_name]);
27 27 }
  28 + return data;
28 29 }
29 30
30 31 exports.create = function() {
1  lib/filters/filter_regex.js
@@ -72,6 +72,7 @@ FilterRegex.prototype.process = function(data) {
72 72 }
73 73 }
74 74 }
  75 + return data;
75 76 }
76 77
77 78 exports.create = function() {
48 lib/lib/base_component.js
@@ -14,6 +14,8 @@ BaseComponent.prototype.extendedLoadConfig = function(callback) {
14 14 }
15 15
16 16 BaseComponent.prototype.loadConfig = function(url, callback) {
  17 + this.message_filtering = {};
  18 +
17 19 if (this.config.host_field || this.config.port_field || this.config.required_params || this.config.optional_params) {
18 20 if (url.length == 0) {
19 21 return this.emit('init_error', 'No empty url for ' + this.config.name);
@@ -69,6 +71,27 @@ BaseComponent.prototype.loadConfig = function(url, callback) {
69 71 }
70 72 }
71 73
  74 + if (this.parsed_url.params.only_type) {
  75 + this.message_filtering.only_type = this.parsed_url.params.only_type;
  76 + }
  77 +
  78 + for(var i in this.parsed_url.params) {
  79 + var res = i.match(/^only_field_exist_(.+)$/);
  80 + if (res) {
  81 + if (!this.message_filtering.only_field_exist) {
  82 + this.message_filtering.only_field_exist = [];
  83 + }
  84 + this.message_filtering.only_field_exist.push(res[1]);
  85 + }
  86 + var res = i.match(/^only_field_equal_(.+)$/);
  87 + if (res) {
  88 + if (!this.message_filtering.only_field_equal) {
  89 + this.message_filtering.only_field_equal = {};
  90 + }
  91 + this.message_filtering.only_field_equal[res[1]] = this.parsed_url.params[i];
  92 + }
  93 + }
  94 +
72 95 this.extendedLoadConfig(function(err) {
73 96 if (err) {
74 97 return callback(err);
@@ -92,9 +115,32 @@ BaseComponent.prototype.loadConfig = function(url, callback) {
92 115 }
93 116
94 117 BaseComponent.prototype.processMessage = function(data) {
95   - if (this.parsed_url.params.only_type && this.parsed_url.params.only_type != data['@type']) {
  118 + if (this.message_filtering.only_type && this.message_filtering.only_type != data['@type']) {
96 119 return false;
97 120 }
  121 +
  122 + if (this.message_filtering.only_field_exist) {
  123 + if (! data['@fields']) {
  124 + return false;
  125 + }
  126 + for(var i = 0; i < this.message_filtering.only_field_exist.length; i ++) {
  127 + if (! data['@fields'][this.message_filtering.only_field_exist[i]]) {
  128 + return false;
  129 + }
  130 + }
  131 + }
  132 +
  133 + if (this.message_filtering.only_field_equal) {
  134 + if (! data['@fields']) {
  135 + return false;
  136 + }
  137 + for(var i in this.message_filtering.only_field_equal) {
  138 + if (! data['@fields'][i] || data['@fields'][i] != this.message_filtering.only_field_equal[i]) {
  139 + return false;
  140 + }
  141 + }
  142 + }
  143 +
98 144 return true;
99 145 }
100 146
14 lib/lib/base_filter.js
@@ -17,9 +17,19 @@ BaseFilter.prototype.init = function(url) {
17 17 }
18 18 this.on('input', function(data) {
19 19 if (this.processMessage(data)) {
20   - this.process(data);
  20 + var res = this.process(data);
  21 + if (res) {
  22 + if (res.length === undefined) {
  23 + res = [res];
  24 + }
  25 + for(var i = 0; i < res.length; i ++) {
  26 + this.emit('output', res[i]);
  27 + }
  28 + }
  29 + }
  30 + else {
  31 + this.emit('output', data);
21 32 }
22   - this.emit('output', data);
23 33 }.bind(this));
24 34
25 35 this.emit('init_ok');
0  test/test_23_error_buffer.js → test/test_12_error_buffer.js
File renamed without changes
17 test/test_22_filter_regex.js
@@ -137,7 +137,8 @@ vows.describe('Filter regex ').addBatch({
137 137 }
138 138 ]),
139 139 'http vhost combined with predefined type': filter_helper.create('regex', 'http_vhost_combined', [
140   - {'@message': 'ip-10-62-95-254.eu-west-1.compute.internal:80 88.178.233.127 - cdv [12/Oct/2012:14:23:28 +0000] "GET /public/utils/ejam.jar HTTP/1.1" 304 172 "-" "Mozilla/4.0 (Windows 7 6.1) Java/1.7.0_07"'}
  140 + {'@message': 'ip-10-62-95-254.eu-west-1.compute.internal:80 88.178.233.127 - cdv [12/Oct/2012:14:23:28 +0000] "GET /public/utils/ejam.jar HTTP/1.1" 304 172 "-" "Mozilla/4.0 (Windows 7 6.1) Java/1.7.0_07"'},
  141 + {'@message': 'www.skillstar.com:80 86.221.21.138 - - [13/Oct/2012:09:04:42 +0200] "GET /favicon.ico HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:15.0) Gecko/20100101 Firefox/15.0.1"'},
141 142 ],[
142 143 {
143 144 '@message': 'ip-10-62-95-254.eu-west-1.compute.internal:80 88.178.233.127 - cdv [12/Oct/2012:14:23:28 +0000] "GET /public/utils/ejam.jar HTTP/1.1" 304 172 "-" "Mozilla/4.0 (Windows 7 6.1) Java/1.7.0_07"',
@@ -152,6 +153,20 @@ vows.describe('Filter regex ').addBatch({
152 153 vhost: 'ip-10-62-95-254.eu-west-1.compute.internal:80',
153 154 },
154 155 '@timestamp': '2012-10-12T14:23:28+00:00'
  156 + },
  157 + {
  158 + '@message': 'www.skillstar.com:80 86.221.21.138 - - [13/Oct/2012:09:04:42 +0200] "GET /favicon.ico HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:15.0) Gecko/20100101 Firefox/15.0.1"',
  159 + '@fields': {
  160 + user: '-',
  161 + bytes_sent: 0,
  162 + ip: '86.221.21.138',
  163 + status: 304,
  164 + referer: '-',
  165 + vhost: 'www.skillstar.com:80',
  166 + user_agent: 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:15.0) Gecko/20100101 Firefox/15.0.1',
  167 + request: 'GET /favicon.ico HTTP/1.1'
  168 + },
  169 + '@timestamp': '2012-10-13T07:04:42+00:00'
155 170 }
156 171 ]),
157 172 }).export(module);
19 test/test_23_filter_grep.js
... ... @@ -0,0 +1,19 @@
  1 +var vows = require('vows'),
  2 + assert = require('assert'),
  3 + os = require('os'),
  4 + filter_helper = require('./filter_helper');
  5 +
  6 +vows.describe('Filter grep ').addBatch({
  7 + 'normal': filter_helper.create('grep', '?regex=abc', [
  8 + {'@message': 'abcd'},
  9 + {'@message': 'abd'},
  10 + ], [
  11 + {'@message': 'abcd'},
  12 + ]),
  13 + 'regex': filter_helper.create('grep', '?regex=\\d', [
  14 + {'@message': 'abcd'},
  15 + {'@message': 'abd5'},
  16 + ], [
  17 + {'@message': 'abd5'},
  18 + ]),
  19 +}).export(module);
25 test/test_25_filter_compute_field.js
... ... @@ -0,0 +1,25 @@
  1 +var vows = require('vows'),
  2 + assert = require('assert'),
  3 + os = require('os'),
  4 + filter_helper = require('./filter_helper');
  5 +
  6 +vows.describe('Filter compute field ').addBatch({
  7 + 'normal': filter_helper.create('compute_field', 'titi?value=ab', [
  8 + {'@message': 'toto'},
  9 + {'@message': 'toto', '@fields': {'bouh': 'tata'}},
  10 + ], [
  11 + {'@message': 'toto', '@fields': {'titi': 'ab'}},
  12 + {'@message': 'toto', '@fields': {'bouh': 'tata', 'titi': 'ab'}},
  13 + ]),
  14 + 'with value': filter_helper.create('compute_field', 'titi?value=ab#{bouh}', [
  15 + {'@message': 'toto'},
  16 + {'@message': 'toto', '@fields': {'bouh': 'tata'}},
  17 + {'@message': 'toto', '@fields': {'bouh': 42}},
  18 + {'@message': 'toto', '@fields': {'bouh': 42, 'titi': 'abcdef'}},
  19 + ], [
  20 + {'@message': 'toto'},
  21 + {'@message': 'toto', '@fields': {'bouh': 'tata', 'titi': 'abtata'}},
  22 + {'@message': 'toto', '@fields': {'bouh': 42, 'titi': 'ab42'}},
  23 + {'@message': 'toto', '@fields': {'bouh': 42, 'titi': 'ab42'}},
  24 + ]),
  25 +}).export(module);
61 test/test_26_message_filtering.js
... ... @@ -0,0 +1,61 @@
  1 +var vows = require('vows'),
  2 + assert = require('assert'),
  3 + os = require('os'),
  4 + filter_helper = require('./filter_helper');
  5 +
  6 +vows.describe('Message filtering ').addBatch({
  7 + 'nothing': filter_helper.create('compute_field', 'titi?value=a', [
  8 + {'@message': 'toto'},
  9 + ], [
  10 + {'@message': 'toto', '@fields': {'titi': 'a'}},
  11 + ]),
  12 + 'only type': filter_helper.create('compute_field', 'titi?value=a&only_type=z', [
  13 + {'@message': 'toto'},
  14 + {'@message': 'toto', '@type': 'tata'},
  15 + {'@message': 'toto', '@type': 'z'},
  16 + ], [
  17 + {'@message': 'toto'},
  18 + {'@message': 'toto', '@type': 'tata'},
  19 + {'@message': 'toto', '@type': 'z', '@fields': {'titi': 'a'}},
  20 + ]),
  21 + 'only field exist': filter_helper.create('compute_field', 'titi?value=a&only_field_exist_titi', [
  22 + {'@message': 'toto'},
  23 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  24 + {'@message': 'toto', '@fields': {'titi': 'b'}},
  25 + ], [
  26 + {'@message': 'toto'},
  27 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  28 + {'@message': 'toto', '@fields': {'titi': 'a'}},
  29 + ]),
  30 + 'multiple only field exist': filter_helper.create('compute_field', 'titi?value=a&only_field_exist_titi&only_field_exist_toto', [
  31 + {'@message': 'toto'},
  32 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  33 + {'@message': 'toto', '@fields': {'titi': 'b'}},
  34 + {'@message': 'toto', '@fields': {'titi': 'b', 'toto': 'b'}},
  35 + ], [
  36 + {'@message': 'toto'},
  37 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  38 + {'@message': 'toto', '@fields': {'titi': 'b'}},
  39 + {'@message': 'toto', '@fields': {'titi': 'a', 'toto': 'b'}},
  40 + ]),
  41 + 'only field equal': filter_helper.create('compute_field', 'titi?value=a&only_field_equal_titi=z', [
  42 + {'@message': 'toto'},
  43 + {'@message': 'toto', '@fields': {'titi': 'b'}},
  44 + {'@message': 'toto', '@fields': {'titi': 'z'}},
  45 + ], [
  46 + {'@message': 'toto'},
  47 + {'@message': 'toto', '@fields': {'titi': 'b'}},
  48 + {'@message': 'toto', '@fields': {'titi': 'a'}},
  49 + ]),
  50 + 'multiple only field equal': filter_helper.create('compute_field', 'titi?value=aa&only_field_equal_titi=a&only_field_equal_toto=b', [
  51 + {'@message': 'toto'},
  52 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  53 + {'@message': 'toto', '@fields': {'titi': 'a'}},
  54 + {'@message': 'toto', '@fields': {'titi': 'a', 'toto': 'b'}},
  55 + ], [
  56 + {'@message': 'toto'},
  57 + {'@message': 'toto', '@fields': {'toto': 'b'}},
  58 + {'@message': 'toto', '@fields': {'titi': 'a'}},
  59 + {'@message': 'toto', '@fields': {'titi': 'aa', 'toto': 'b'}},
  60 + ]),
  61 +}).export(module);

No commit comments for this range

Something went wrong with that request. Please try again.