Navigation Menu

Skip to content

Commit

Permalink
Refresh new service's internal senders before finishing of graceful r…
Browse files Browse the repository at this point in the history
…estart.

While graceful restart, connection to "(myself):(10031 or something)/(tag)" are established by the old (going to be dying) service.
However, after the new service becomes ready, the graceful stop of the old service can be blocked by living connections from the new service itself.
So, before starting graceful stop, we must close all self-reference connections from the new.
  • Loading branch information
piroor committed Apr 22, 2015
1 parent 464f55f commit d71071e
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 5 deletions.
5 changes: 5 additions & 0 deletions lib/droonga/command/droonga_engine.rb
Expand Up @@ -528,6 +528,7 @@ def restart_graceful
@service_runner.on_ready = lambda do
logger.info("restart_graceful: new service runner is ready")
@service_runner.on_failure = nil
@service_runner.refresh_self_reference
old_service_runner.stop_gracefully
@restarting = false
logger.trace("restart_graceful: done")
Expand Down Expand Up @@ -720,6 +721,10 @@ def success?
@success
end

def refresh_self_reference
@supervisor.refresh_self_reference
end

private
def create_process_supervisor(input, output)
supervisor = ProcessSupervisor.new(@raw_loop, input, output)
Expand Down
12 changes: 8 additions & 4 deletions lib/droonga/command/droonga_engine_service.rb
Expand Up @@ -171,6 +171,9 @@ def run_worker_process_agent
@worker_process_agent.on_stop_immediately = lambda do
stop_immediately
end
@worker_process_agent.on_refresh_self_reference = lambda do
@engine.refresh_self_reference
end
@worker_process_agent.start
end

Expand Down Expand Up @@ -221,10 +224,11 @@ def stop_gracefully
@stopping = true
@receiver.stop_gracefully
#XXX To disconnect all clients to myself (old service),
# we must refresh all connections via EngineNode.
@engine.cluster.refresh_connection_for(@engine_name)
#XXX However, internal connections via Forwarder can be
# still there. Then we have to wait for their timeout.
# we must refresh the connection via EngineNode
# and Forwarder.
# However, connections from workers can be still
# there. Then we have to wait for their timeout.
@engine.refresh_self_reference
@receiver.ensure_no_client do
logger.trace("stop_gracefully: ready to stop service")
@engine.stop_gracefully do
Expand Down
5 changes: 5 additions & 0 deletions lib/droonga/engine.rb
Expand Up @@ -100,6 +100,11 @@ def stop_immediately
logger.trace("stop_immediately: done")
end

def refresh_self_reference
@cluster.refresh_connection_for(@name)
@state.forwarder.refresh_connection_for(@name)
end

def process(message)
@last_processed_message_timestamp = message["date"]
@dispatcher.process_message(message)
Expand Down
8 changes: 8 additions & 0 deletions lib/droonga/forwarder.rb
Expand Up @@ -66,6 +66,14 @@ def forward(message, destination)
logger.trace("forward: done")
end

def refresh_connection_for(name)
sender = @senders[name]
if sender
sender.shutdown
@senders.delete(name)
end
end

private
def output(receiver, message, command, arguments, options={})
logger.trace("output: start")
Expand Down
3 changes: 2 additions & 1 deletion lib/droonga/process_control_protocol.rb
@@ -1,4 +1,4 @@
# Copyright (C) 2014 Droonga Project
# Copyright (C) 2014-2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -18,6 +18,7 @@ module ProcessControlProtocol
module Messages
STOP_GRACEFUL = "stop-graceful\n"
STOP_IMMEDIATELY = "stop-immediately\n"
REFRESH_SELF_REFERENCE = "refresh-self-reference\n"

READY = "ready\n"
FINISH = "finish\n"
Expand Down
6 changes: 6 additions & 0 deletions lib/droonga/process_supervisor.rb
Expand Up @@ -64,6 +64,12 @@ def stop_immediately
logger.trace("stop_immediately: done")
end

def refresh_self_reference
logger.trace("refresh_self_reference: start")
@output.write(Messages::REFRESH_SELF_REFERENCE)
logger.trace("refresh_self_reference: done")
end

private
def create_input(raw_input)
input = Coolio::IO.new(raw_input)
Expand Down
10 changes: 10 additions & 0 deletions lib/droonga/worker_process_agent.rb
Expand Up @@ -73,6 +73,10 @@ def on_stop_immediately=(callback)
@on_stop_immediately = callback
end

def on_refresh_self_reference=(callback)
@on_refresh_self_reference = callback
end

private
def create_input(raw_input)
@input = Coolio::IO.new(raw_input)
Expand All @@ -85,6 +89,8 @@ def create_input(raw_input)
on_stop_gracefully
when Messages::STOP_IMMEDIATELY
on_stop_immediately
when Messages::REFRESH_SELF_REFERENCE
on_refresh_self_reference
end
end
end
Expand Down Expand Up @@ -123,6 +129,10 @@ def on_stop_immediately
@on_stop_immediately.call if @on_stop_immediately
end

def on_refresh_self_reference
@on_refresh_self_reference.call if @on_refresh_self_reference
end

def log_tag
"worker_process_agent"
end
Expand Down

0 comments on commit d71071e

Please sign in to comment.