Skip to content

Commit

Permalink
added experimental Status class and in_status
Browse files Browse the repository at this point in the history
  • Loading branch information
frsyuki committed Mar 3, 2012
1 parent dcb9cc5 commit bb205dc
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/fluent/load.rb
Expand Up @@ -12,6 +12,7 @@
require 'fluent/env'
require 'fluent/version'
require 'fluent/log'
require 'fluent/status'
require 'fluent/config'
require 'fluent/engine'
require 'fluent/mixin'
Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/output.rb
Expand Up @@ -168,6 +168,7 @@ def initialize
@error_history = []
@error_history.extend(MonitorMixin)
@secondary_limit = 8
@emit_count = 0
end

config_param :buffer_type, :string, :default => 'memory'
Expand Down Expand Up @@ -210,6 +211,9 @@ def configure(conf)

@secondary.secondary_init(self)
end

Status.register(self, "queue_size") { @buffer.queue_size }
Status.register(self, "emit_count") { @emit_count }
end

def start
Expand All @@ -226,6 +230,7 @@ def shutdown
end

def emit(tag, es, chain, key="")
@emit_count += 1
data = format_stream(tag, es)
if @buffer.emit(key, data, chain)
submit_flush
Expand Down
81 changes: 81 additions & 0 deletions lib/fluent/plugin/in_status.rb
@@ -0,0 +1,81 @@

#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent


class StatusInput < Input
Plugin.register_input('status', self)

def initialize
super
end

config_param :emit_interval, :time, :default => 60
config_param :tag, :string

class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, repeat, &callback)
@callback = callback
super(interval, repeat)
end

def on_timer
@callback.call
rescue
# TODO log?
$log.error $!.to_s
$log.error_backtrace
end
end

def configure(conf)
super
end

def start
@loop = Coolio::Loop.new
@timer = TimerWatcher.new(@emit_interval, true, &method(:on_timer))
@loop.attach(@timer)
@thread = Thread.new(&method(:run))
end

def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join
end

def run
@loop.run
rescue
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end

def on_timer
now = Engine.now
Status.each {|record|
Engine.emit(@tag, now, record)
}
end
end


end

49 changes: 49 additions & 0 deletions lib/fluent/status.rb
@@ -0,0 +1,49 @@
#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent



class StatusClass
def initialize
@entries = {}
end

def register(instance, name, &block)
(@entries[instance.object_id] ||= {})[name] = block
nil
end

def each(&block)
@entries.each {|obj_id,hash|
record = {}
hash.each_pair {|name,block|
record[name] = block.call
}
block.call(record)
}
end
end

# Don't use this class from plugins.
# The interface may be changed
Status = StatusClass.new


end

0 comments on commit bb205dc

Please sign in to comment.