Navigation Menu

Skip to content

Commit

Permalink
droonga-engine: support restarting without downtime
Browse files Browse the repository at this point in the history
Note that droonga-engine itself can restart without downtime (it means
that socket is alive while restarting) but Serf has downtime. Serf
cluster will miss the droonga-engine node while restarting.

TODO:

  * Re-consider how to operate restarting droonga-engine. The current
    implementation is `touch $BASE_DIR/restart.txt`. Is it reasonable
    way?
  • Loading branch information
kou committed Jan 5, 2015
1 parent a7a96c4 commit e2bc408
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 11 deletions.
132 changes: 121 additions & 11 deletions lib/droonga/command/droonga_engine.rb
Expand Up @@ -101,6 +101,10 @@ def initialize
@daemon = nil
@pid_file_path = nil
@ready_notify_fd = nil

@listen_fd = nil
@heartbeat_fd = nil
@serf_agent_pid = nil
end

def engine_name
Expand Down Expand Up @@ -143,7 +147,28 @@ def daemon?
daemon
end

def to_command_line
def to_engine_command_line
command_line_options = [
"--host", host,
"--port", port.to_s,
"--tag", tag,
"--log-level", log_level,
]
if log_file_path
command_line_options.concat(["--log-file", log_file_path.to_s])
end
if pid_file_path
command_line_options.concat(["--pid-file", pid_file_path.to_s])
end
if daemon?
command_line_options << "--daemon"
else
command_line_options << "--no-daemon"
end
command_line_options
end

def to_service_command_line
command_line_options = [
"--engine-name", engine_name,
]
Expand All @@ -156,14 +181,19 @@ def add_command_line_options(parser)
add_process_options(parser)
add_path_options(parser)
add_notification_options(parser)
add_internal_options(parser)
end

def listen_socket
@listen_socket ||= TCPServer.new(host, port)
@listen_socket ||= create_listen_socket
end

def heartbeat_socket
@heartbeat_socket ||= bind_heartbeat_socket
@heartbeat_socket ||= create_heartbeat_socket
end

def serf_agent_pid
@serf_agent_pid
end

private
Expand Down Expand Up @@ -297,10 +327,41 @@ def add_notification_options(parser)
end
end

def bind_heartbeat_socket
socket = UDPSocket.new(address_family)
socket.bind(host, port)
socket
def add_internal_options(parser)
parser.separator("")
parser.separator("Internal:")
parser.on("--listen-fd=FD", Integer,
"FD of listen socket") do |fd|
@listen_fd = fd
end
parser.on("--heartbeat-fd=FD", Integer,
"FD of heartbeat socket") do |fd|
@heartbeat_fd = fd
end
parser.on("--serf-agent-pid=PID", Integer,
"PID of Serf agent") do |pid|
@serf_agent_pid = pid
end
end

def create_listen_socket
begin
TCPServer.new(host, port)
rescue Errno::EADDRINUSE
raise if @listen_fd.nil?
TCPServer.for_fd(@listen_fd)
end
end

def create_heartbeat_socket
begin
socket = UDPSocket.new(address_family)
socket.bind(host, port)
socket
rescue Errno::EADDRINUSE
raise if @heartbeat_fd.nil?
UDPSocket.for_fd(@heartbeat_fd)
end
end
end

Expand All @@ -311,6 +372,7 @@ def initialize(configuration)
@configuration = configuration
@loop = Coolio::Loop.default
@log_file = nil
@pid_file_path = nil
end

def run
Expand All @@ -329,14 +391,15 @@ def reopen_log_file
end

def write_pid_file
if @configuration.pid_file_path
@configuration.pid_file_path.open("w") do |file|
@pid_file_path = @configuration.pid_file_path
if @pid_file_path
@pid_file_path.open("w") do |file|
file.puts(Process.pid)
end
begin
yield
ensure
FileUtils.rm_f(@configuration.pid_file_path.to_s)
FileUtils.rm_f(@pid_file_path.to_s)
end
else
yield
Expand All @@ -347,6 +410,7 @@ def run_internal
start_serf
@service_runner = run_service
setup_initial_on_ready
@restart_observer = run_restart_observer
@catalog_observer = run_catalog_observer
@command_runner = run_command_runner

Expand Down Expand Up @@ -392,13 +456,15 @@ def trap_signals
def stop_gracefully
@command_runner.stop
@catalog_observer.stop
@restart_observer.stop
stop_serf
@service_runner.stop_gracefully
end

def stop_immediately
@command_runner.stop
@catalog_observer.stop
@restart_observer.stop
stop_serf
@service_runner.stop_immediately
end
Expand All @@ -424,6 +490,16 @@ def restart_immediately
old_service_runner.stop_immediately
end

def restart_self
old_pid_file_path = Pathname.new("#{@pid_file_path}.old")
FileUtils.mv(@pid_file_path.to_s, old_pid_file_path.to_s)
@pid_file_path = old_pid_file_path
stop_gracefully

engine_runner = EngineRunner.new(@configuration)
engine_runner.run
end

def run_service
service_runner = ServiceRunner.new(@loop, @configuration)
service_runner.run
Expand All @@ -444,6 +520,15 @@ def stop_serf
@serf_agent.stop
end

def run_restart_observer
restart_observer = FileObserver.new(@loop, Path.restart)
restart_observer.on_change = lambda do
restart_self
end
restart_observer.start
restart_observer
end

def run_catalog_observer
catalog_observer = FileObserver.new(@loop, Path.catalog)
catalog_observer.on_change = lambda do
Expand All @@ -468,6 +553,31 @@ def log_tag
end
end

class EngineRunner
def initialize(configuration)
@configuration = configuration
end

def run
listen_fd = @configuration.listen_socket.fileno
heartbeat_fd = @configuration.heartbeat_socket.fileno
env = {}
command_line = [
RbConfig.ruby,
"-S",
"droonga-engine",
"--listen-fd", listen_fd.to_s,
"--heartbeat-fd", heartbeat_fd.to_s,
*@configuration.to_engine_command_line,
]
options = {
listen_fd => listen_fd,
heartbeat_fd => heartbeat_fd,
}
spawn(env, *command_line, options)
end
end

class ServiceRunner
def initialize(raw_loop, configuration)
@raw_loop = raw_loop
Expand Down Expand Up @@ -499,7 +609,7 @@ def run
"--heartbeat-fd", heartbeat_fd.to_s,
"--control-read-fd", control_write_in.fileno.to_s,
"--control-write-fd", control_read_out.fileno.to_s,
*@configuration.to_command_line,
*@configuration.to_service_command_line,
]
options = {
listen_fd => listen_fd,
Expand Down
5 changes: 5 additions & 0 deletions lib/droonga/path.rb
Expand Up @@ -65,6 +65,11 @@ def catalog
Pathname.new(base_file_name).expand_path(base)
end

# TODO: Re-consider this approach
def restart
base + "restart.txt"
end

def buffer
state + "buffer"
end
Expand Down

0 comments on commit e2bc408

Please sign in to comment.