Navigation Menu

Skip to content

Commit

Permalink
Apply timeout for internal connections via EngineNode
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 22, 2015
1 parent f11725c commit bfb7578
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 15 deletions.
7 changes: 5 additions & 2 deletions lib/droonga/cluster.rb
Expand Up @@ -59,6 +59,7 @@ def default_state
def initialize(loop, params)
@loop = loop

@params = params
@catalog = params[:catalog]
@state = nil

Expand Down Expand Up @@ -185,9 +186,11 @@ def all_node_names
def create_engine_nodes
all_node_names.collect do |name|
node_state = @state[name] || {}
EngineNode.new(name,
EngineNode.new(@loop,
name,
node_state,
@loop)
:auto_close_timeout =>
@params[:internal_connection_lifetime])
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/droonga/engine.rb
Expand Up @@ -45,7 +45,9 @@ def initialize(loop, name, internal_name, options={})
:internal_connection_lifetime =>
options[:internal_connection_lifetime])
@cluster = Cluster.new(loop,
:catalog => @catalog)
:catalog => @catalog,
:internal_connection_lifetime =>
options[:internal_connection_lifetime])

@dispatcher = create_dispatcher
end
Expand Down
65 changes: 53 additions & 12 deletions lib/droonga/engine_node.rb
Expand Up @@ -14,23 +14,27 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "time"
require "coolio"

require "droonga/loggable"
require "droonga/forward_buffer"
require "droonga/fluent_message_sender"
require "droonga/node_name"
require "droonga/node_role"

module Droonga
class EngineNode
include Loggable

DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS = 60

attr_reader :name

def initialize(name, state, loop)
def initialize(loop, name, state, options={})
@loop = loop
@name = name
logger.trace("initialize: start")

@state = state
logger.trace("initialize: start")

@buffer = ForwardBuffer.new(name)
boundary_timestamp = accept_messages_newer_than_timestamp
Expand All @@ -39,12 +43,13 @@ def initialize(name, state, loop)
output(message, destination)
end

parsed_name = parse_node_name(@name)
@sender = FluentMessageSender.new(loop,
parsed_name[:host],
parsed_name[:port],
:buffering => true)
@sender.start
@node_name = NodeName.parse(@name)

@sender = nil
@auto_close_timer = nil
@auto_close_timeout = options[:auto_close_timeout] ||
DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS

logger.trace("initialize: done")
end

Expand All @@ -56,7 +61,7 @@ def start

def shutdown
logger.trace("shutdown: start")
@sender.shutdown
@sender.shutdown if @sender
logger.trace("shutdown: done")
end

Expand Down Expand Up @@ -129,7 +134,7 @@ def to_json

def resume
logger.trace("resume: start")
@sender.resume
sender.resume
unless @buffer.empty?
if really_writable?
logger.info("Target becomes writable. Start to forwarding.")
Expand Down Expand Up @@ -232,10 +237,46 @@ def output(message, destination)
output_tag = "#{parsed_receiver[:tag]}.message"
log_info = "<#{receiver}>:<#{output_tag}>"
logger.trace("forward: start: #{log_info}")
@sender.send(output_tag, message)
sender.send(output_tag, message)
set_auto_close_timer
logger.trace("forward: end")
end

def sender
@sender ||= create_sender
end

def create_sender
sender = FluentMessageSender.new(@loop,
@node_name.host,
@node_name.port,
:buffering => true)
sender.start
sender
end

def set_auto_close_timer
previous_timer = @auto_close_timer
previous_timer.detach if previous_timer

timer = Coolio::TimerWatcher.new(@auto_close_timeout)
on_timeout = lambda do
timer.detach
@auto_close_timer = nil
sender = @sender
if sender
logger.info("sender for #{name} is automatically closed by timeout.")
sender.shutdown
@sender = nil
end
end
timer.on_timer do
on_timeout.call
end
@loop.attach(timer)
@auto_close_timer = timer
end

def log_tag
"[#{Process.ppid}] engine-node: #{@name}"
end
Expand Down

0 comments on commit bfb7578

Please sign in to comment.