diff --git a/README.md b/README.md index 64a787f..53cd049 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,24 @@ Then, output bocomes as belows (indented). You can see the `message` field is jo Remove tag suffix for output message +* remove_tag_slice *min..max* + + Remove tag parts by slice function. FYI: This option behaves like `tag.split('.').slice(min..max)`. + + For example, + + remove_tag_slice 0..-2 + + changes an input tag `foo.bar.host1` to `foo.bar`. + +* aggregate + + Aggregation unit. One of `all`, `in_tag`, `out_tag` can be specified. Default is `in_tag`. + + * `all` counts summation for all input messages and emits one message in each interval. + * `in_tag` counts summation for each input tag separately. + * `out_tag` counts summation for each tag *modified* by `add_tag_prefix`, `remove_tag_prefix`, or `remove_tag_slice`. + - delimiter Output matched messages after `join`ed with the specified delimiter. diff --git a/lib/fluent/plugin/out_grepcounter.rb b/lib/fluent/plugin/out_grepcounter.rb index 5c0d3e6..60cfd02 100644 --- a/lib/fluent/plugin/out_grepcounter.rb +++ b/lib/fluent/plugin/out_grepcounter.rb @@ -32,6 +32,7 @@ def initialize config_param :remove_tag_prefix, :string, :default => nil config_param :add_tag_suffix, :string, :default => nil config_param :remove_tag_suffix, :string, :default => nil + config_param :remove_tag_slice, :string, :default => nil config_param :output_with_joined_delimiter, :string, :default => nil # obsolete config_param :delimiter, :string, :default => nil config_param :aggregate, :string, :default => 'tag' @@ -94,7 +95,7 @@ def configure(conf) end end - if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? + if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? and @remove_tag_slice.nil? 
@add_tag_prefix = 'count' # not ConfigError to support lower version compatibility end @tag_proc = tag_proc @@ -102,11 +103,14 @@ def configure(conf) case @aggregate when 'all' raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil? - when 'tag' - # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil? + when 'tag' # obsolete + @aggregate = 'in_tag' + when 'in_tag' + when 'out_tag' else - raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all" + raise Fluent::ConfigError, "grepcounter: aggregate allows all/in_tag/out_tag" end + @aggregate_proc = aggregate_proc(@tag_proc) if @store_file f = Pathname.new(@store_file) @@ -156,12 +160,14 @@ def emit(tag, es, chain) count += 1 end end + + aggregate_key = @aggregate_proc.call(tag) # thread safe merge - @counts[tag] ||= 0 - @matches[tag] ||= [] + @counts[aggregate_key] ||= 0 + @matches[aggregate_key] ||= [] @mutex.synchronize do - @counts[tag] += count - @matches[tag] += matches + @counts[aggregate_key] += count + @matches[aggregate_key] += matches end chain.next @@ -192,22 +198,29 @@ def flush_emit(step) time = Fluent::Engine.now flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {} - if @aggregate == 'all' - count = 0; matches = [] - flushed_counts.keys.each do |tag| - count += flushed_counts[tag] - matches += flushed_matches[tag] - end + case @aggregate + when 'all' + count = flushed_counts[:all] + matches = flushed_matches[:all] output = generate_output(count, matches) Fluent::Engine.emit(@tag, time, output) if output - else + when 'out_tag' + flushed_counts.keys.each do |out_tag| + count = flushed_counts[out_tag] + matches = flushed_matches[out_tag] + output = generate_output(count, matches) + if output + Fluent::Engine.emit(out_tag, time, output) + end + end + else # in_tag flushed_counts.keys.each do |tag| count = flushed_counts[tag] matches = flushed_matches[tag] output = 
generate_output(count, matches, tag) if output - emit_tag = @tag_proc.call(tag) - Fluent::Engine.emit(emit_tag, time, output) + out_tag = @tag_proc.call(tag) + Fluent::Engine.emit(out_tag, time, output) end end end @@ -234,7 +247,30 @@ def generate_output(count, matches, tag = nil) output end + def aggregate_proc(tag_proc) + case @aggregate + when 'all' + Proc.new {|tag| :all } + when 'in_tag' + Proc.new {|tag| tag } + when 'out_tag' + Proc.new {|tag| tag_proc.call(tag) } + end + end + def tag_proc + tag_slice_proc = + if @remove_tag_slice + lindex, rindex = @remove_tag_slice.split('..', 2) + if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ + raise Fluent::ConfigError, "out_grepcounter: remove_tag_slice must be formatted like [num]..[num]" + end + l, r = lindex.to_i, rindex.to_i + Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') } + else + Proc.new {|tag| tag } + end + rstrip = Proc.new {|str, substr| str.chomp(substr) } lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str } tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix @@ -242,16 +278,16 @@ def tag_proc tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." 
if @remove_tag_prefix tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix tag_fixed = @tag if @tag - if tag_fixed - Proc.new {|tag| tag_fixed } - elsif tag_prefix_match and tag_suffix_match - Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } + if tag_prefix_match and tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } elsif tag_prefix_match - Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" } + Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" } elsif tag_suffix_match - Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" } + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" } + elsif tag_prefix || @remove_tag_slice || tag_suffix + Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" } else - Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" } + Proc.new {|tag| tag_fixed } end end diff --git a/spec/out_grepcounter_spec.rb b/spec/out_grepcounter_spec.rb index 8626041..3095c98 100644 --- a/spec/out_grepcounter_spec.rb +++ b/spec/out_grepcounter_spec.rb @@ -340,6 +340,16 @@ def delete!(key) it { emit } end + context 'remove_tag_slice' do + let(:config) { CONFIG + %[remove_tag_slice 0..-2] } + let(:tag) { 'syslog.host1' } + before do + allow(Fluent::Engine).to receive(:now).and_return(time) + expect(Fluent::Engine).to receive(:emit).with("syslog", time, expected) + end + it { emit } + end + context 'all tag options' do let(:config) { CONFIG + %[ add_tag_prefix foo @@ -439,6 +449,45 @@ def delete!(key) it { emit } end + context 'aggregate in_tag' do + let(:messages) { ['foobar', 'foobar'] } + let(:emit) do + driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, 
time, 'foo.bar') } } + driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } } + driver.instance.flush_emit(0) + end + + let(:config) { CONFIG + %[aggregate tag \n remove_tag_slice 0..-2] } + before do + allow(Fluent::Engine).to receive(:now).and_return(time) + expect(Fluent::Engine).to receive(:emit).with("foo", time, { + "count"=>2, "message"=>["foobar", "foobar"], "input_tag"=>"foo.bar", "input_tag_last"=>"bar" + }) + expect(Fluent::Engine).to receive(:emit).with("foo", time, { + "count"=>2, "message"=>["foobar", "foobar"], "input_tag"=>"foo.bar2", "input_tag_last"=>"bar2" + }) + end + it { emit } + end + + context 'aggregate out_tag' do + let(:messages) { ['foobar', 'foobar'] } + let(:emit) do + driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } } + driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } } + driver.instance.flush_emit(0) + end + + let(:config) { CONFIG + %[aggregate out_tag \n remove_tag_slice 0..-2] } + before do + allow(Fluent::Engine).to receive(:now).and_return(time) + expect(Fluent::Engine).to receive(:emit).with("foo", time, { + "count"=>4, "message"=>["foobar", "foobar", "foobar", "foobar"] + }) + end + it { emit } + end + context 'replace_invalid_sequence' do let(:config) { CONFIG + %[regexp WARN \n replace_invalid_sequence true] } let(:messages) { [ "\xff".force_encoding('UTF-8') ] }