Skip to content

Commit

Permalink
Merge pull request #1064 from fluent/migrate-plugin-v14-copy-rr
Browse files Browse the repository at this point in the history
Migrate out_copy and out_roundrobin to v0.14 API
  • Loading branch information
tagomoris committed Jun 29, 2016
2 parents 3d285b8 + 346dc73 commit cdf3a23
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 416 deletions.
39 changes: 39 additions & 0 deletions example/copy_roundrobin.conf
@@ -0,0 +1,39 @@
<source>
@type dummy
@label @test
tag test.copy
auto_increment_key id
</source>

<source>
@type dummy
@label @test
tag test.rr
auto_increment_key id
</source>

<label @test>
<match test.copy>
@type copy
<store>
@type stdout
output_type json
</store>
<store>
@type stdout
output_type ltsv
</store>
</match>

<match test.rr>
@type roundrobin
<store>
@type stdout
output_type json
</store>
<store>
@type stdout
output_type ltsv
</store>
</match>
</label>
59 changes: 8 additions & 51 deletions lib/fluent/plugin/out_copy.rb
Expand Up @@ -14,72 +14,29 @@
# limitations under the License.
#

require 'fluent/output'
require 'fluent/plugin/multi_output'
require 'fluent/config/error'
require 'fluent/event'

module Fluent
module Fluent::Plugin
class CopyOutput < MultiOutput
Plugin.register_output('copy', self)
Fluent::Plugin.register_output('copy', self)

desc 'If true, pass different record to each `store` plugin.'
config_param :deep_copy, :bool, default: false

def initialize
super
@outputs = []
end

attr_reader :outputs

def configure(conf)
super
conf.elements.select {|e|
e.name == 'store'
}.each {|e|
type = e['@type']
unless type
raise ConfigError, "Missing 'type' parameter on <store> directive"
end
log.debug "adding store type=#{type.dump}"

output = Plugin.new_output(type)
output.router = router
output.configure(e)
@outputs << output
}
end

def start
super

@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
def process(tag, es)
unless es.repeatable?
m = MultiEventStream.new
m = Fluent::MultiEventStream.new
es.each {|time,record|
m.add(time, record)
}
es = m
end
if @deep_copy
chain = CopyOutputChain.new(@outputs, tag, es, chain)
else
chain = OutputChain.new(@outputs, tag, es, chain)

outputs.each do |output|
output.emit_events(tag, @deep_copy ? es.dup : es)
end
chain.next
end
end
end
53 changes: 13 additions & 40 deletions lib/fluent/plugin/out_roundrobin.rb
Expand Up @@ -14,68 +14,41 @@
# limitations under the License.
#

require 'fluent/output'
require 'fluent/plugin/multi_output'
require 'fluent/config/error'

module Fluent
module Fluent::Plugin
class RoundRobinOutput < MultiOutput
Plugin.register_output('roundrobin', self)
Fluent::Plugin.register_output('roundrobin', self)

config_section :store do
config_param :weight, :integer, default: 1
end

def initialize
super

@outputs = []
@weights = []
end

attr_reader :outputs, :weights
attr_accessor :rand_seed
attr_reader :weights

def configure(conf)
super

conf.elements.select {|e|
e.name == 'store'
}.each {|e|
type = e['@type']
unless type
raise ConfigError, "Missing 'type' parameter on <store> directive"
end

weight = e['weight']
weight = weight ? weight.to_i : 1
log.debug "adding store type=#{type.dump}, weight=#{weight}"

output = Plugin.new_output(type)
output.router = router
output.configure(e)
@outputs << output
@weights << weight
}
@stores.each do |store|
@weights << store.weight
end
@rr = -1 # starts from @output[0]
@rand_seed = Random.new.seed
end

def start
super

rebuild_weight_array

@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
next_output.emit(tag, es, chain)
def process(tag, es)
next_output.emit_events(tag, es)
end

private
Expand Down
52 changes: 52 additions & 0 deletions lib/fluent/test/driver/multi_output.rb
@@ -0,0 +1,52 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/test/driver/base'
require 'fluent/test/driver/event_feeder'

require 'fluent/plugin/multi_output'

module Fluent
module Test
module Driver
class MultiOutput < Base
include EventFeeder

def initialize(klass, opts: {}, &block)
super
raise ArgumentError, "plugin is not an instance of Fluent::Plugin::MultiOutput" unless @instance.is_a? Fluent::Plugin::MultiOutput
@flush_buffer_at_cleanup = nil
end

def run(flush: true, **kwargs, &block)
@flush_buffer_at_cleanup = flush
super(**kwargs, &block)
end

def run_actual(**kwargs, &block)
super(**kwargs, &block)
if @flush_buffer_at_cleanup
@instance.outputs.each{|o| o.force_flush }
end
end

def flush
@instance.outputs.each{|o| o.force_flush }
end
end
end
end
end

0 comments on commit cdf3a23

Please sign in to comment.