Skip to content

Commit

Permalink
Refresh connections to other nodes from workers, when cluster state i…
Browse files Browse the repository at this point in the history
…s changed
  • Loading branch information
piroor committed Apr 22, 2015
1 parent e02ba1d commit 317cb6c
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 0 deletions.
7 changes: 7 additions & 0 deletions lib/droonga/command/droonga_engine_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ def stop_immediately
@loop.stop
end

def refresh_node_reference
@forwarder.refresh_all_connections
end

def start_forwarder
@forwarder = Forwarder.new(@loop,
:auto_close_timeout =>
Expand Down Expand Up @@ -223,6 +227,9 @@ def start_worker_process_agent
@worker_process_agent.on_stop_immediately = lambda do
stop_immediately
end
@worker_process_agent.on_refresh_node_reference = lambda do
refresh_node_reference
end
@worker_process_agent.start
@worker_process_agent.ready
end
Expand Down
4 changes: 4 additions & 0 deletions lib/droonga/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ def stop_immediately
logger.trace("stop_immediately: done")
end

def refresh_node_reference
@farm.refresh_node_reference
end

def process_message(message)
logger.trace("process_message: start", :message => message)
@message = message
Expand Down
3 changes: 3 additions & 0 deletions lib/droonga/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def initialize(loop, name, internal_name, options={})
options[:internal_connection_lifetime])

@dispatcher = create_dispatcher
@cluster.on_change = lambda do
@dispatcher.refresh_node_reference
end
end

def start
Expand Down
6 changes: 6 additions & 0 deletions lib/droonga/farm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ def stop_immediately
end
end

def refresh_node_reference
@slices.each_value do |slice|
slice.refresh_node_reference
end
end

def process(slice_name, message)
unless @slices.key?(slice_name)
raise NoSlice.new(slice_name, :message => message, :slices => @slices.keys)
Expand Down
12 changes: 12 additions & 0 deletions lib/droonga/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ def shutdown
@senders.each_value do |sender|
sender.shutdown
end
@senders = {}
@auto_close_timers.each_value do |timer|
timer.detach
end
@auto_close_timers = {}
logger.trace("shutdown: done")
end

Expand All @@ -72,6 +74,16 @@ def refresh_connection_for(name)
sender.shutdown
@senders.delete(name)
end
timer = @auto_close_timers[name]
if timer
timer.detach
@auto_close_timers.delete(name)
end
end

def refresh_all_connections
shutdown
start
end

private
Expand Down
1 change: 1 addition & 0 deletions lib/droonga/process_control_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Messages
STOP_GRACEFUL = "stop-graceful\n"
STOP_IMMEDIATELY = "stop-immediately\n"
REFRESH_SELF_REFERENCE = "refresh-self-reference\n"
REFRESH_NODE_REFERENCE = "refresh-node-reference\n"

READY = "ready\n"
FINISH = "finish\n"
Expand Down
6 changes: 6 additions & 0 deletions lib/droonga/process_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def refresh_self_reference
logger.trace("refresh_self_reference: done")
end

def refresh_node_reference
logger.trace("refresh_node_reference: start")
@output.write(Messages::REFRESH_NODE_REFERENCE)
logger.trace("refresh_node_reference: done")
end

private
def create_input(raw_input)
input = Coolio::IO.new(raw_input)
Expand Down
6 changes: 6 additions & 0 deletions lib/droonga/slice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ def stop_immediately
logger.trace("stop_immediately: done")
end

def refresh_node_reference
logger.trace("refresh_node_reference: start")
@supervisor.refresh_node_reference if @supervisor
logger.trace("refresh_node_reference: done")
end

def process(message)
logger.trace("process: start")
@processor.process(message)
Expand Down
10 changes: 10 additions & 0 deletions lib/droonga/worker_process_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def on_refresh_self_reference=(callback)
@on_refresh_self_reference = callback
end

def on_refresh_node_reference=(callback)
@on_refresh_node_reference = callback
end

private
def create_input(raw_input)
@input = Coolio::IO.new(raw_input)
Expand All @@ -91,6 +95,8 @@ def create_input(raw_input)
on_stop_immediately
when Messages::REFRESH_SELF_REFERENCE
on_refresh_self_reference
when Messages::REFRESH_NODE_REFERENCE
on_refresh_node_reference
end
end
end
Expand Down Expand Up @@ -133,6 +139,10 @@ def on_refresh_self_reference
@on_refresh_self_reference.call if @on_refresh_self_reference
end

def on_refresh_node_reference
@on_refresh_node_reference.call if @on_refresh_node_reference
end

def log_tag
"worker_process_agent"
end
Expand Down

0 comments on commit 317cb6c

Please sign in to comment.