Navigation Menu

Skip to content

Commit

Permalink
Export gracefully stopping process's last processed timestamp to clus…
Browse files Browse the repository at this point in the history
…ter via file, not directly via serf
  • Loading branch information
piroor committed Apr 23, 2015
1 parent c530784 commit 43e011b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
53 changes: 42 additions & 11 deletions lib/droonga/engine.rb
Expand Up @@ -28,6 +28,7 @@
require "droonga/serf"
require "droonga/serf/tag"
require "droonga/file_observer"
require "droonga/safe_file_writer"

module Droonga
class Engine
Expand Down Expand Up @@ -70,7 +71,7 @@ def start
@state.start
@cluster.start
@dispatcher.start
@export_last_processed_message_timestamp_observer = run_export_last_processed_message_timestamp_observer
@last_processed_message_timestamp_observer = run_last_processed_message_timestamp_observer
logger.trace("start: done")
end

Expand All @@ -79,14 +80,14 @@ def stop_gracefully
@cluster.shutdown
on_finish = lambda do
logger.trace("stop_gracefully: middle")
@export_last_processed_message_timestamp_observer.stop
@last_processed_message_timestamp_observer.stop
@dispatcher.stop_gracefully do
@state.shutdown
yield
#XXX We must save last processed message timstamp
# based on forwarded/dispatched messages while
# "graceful stop" operations.
export_last_processed_message_timestamp
export_last_processed_message_timestamp_to_file
logger.trace("stop_gracefully: done")
end
end
Expand All @@ -102,11 +103,11 @@ def stop_gracefully
# It may be called after stop_gracefully.
def stop_immediately
logger.trace("stop_immediately: start")
@export_last_processed_message_timestamp_observer.stop
@last_processed_message_timestamp_observer.stop
@dispatcher.stop_immediately
@cluster.shutdown
@state.shutdown
export_last_processed_message_timestamp
export_last_processed_message_timestamp_to_file
logger.trace("stop_immediately: done")
end

Expand Down Expand Up @@ -143,8 +144,8 @@ def create_dispatcher

MICRO_SECONDS_DECIMAL_PLACE = 6

def export_last_processed_message_timestamp
logger.trace("export_last_processed_message_timestamp: start")
def export_last_processed_message_timestamp_to_cluster
logger.trace("export_last_processed_message_timestamp_to_cluster: start")
if @last_processed_message_timestamp
timestamp = @last_processed_message_timestamp
serf = Serf.new(@name)
Expand All @@ -157,18 +158,48 @@ def export_last_processed_message_timestamp
:timestamp => timestamp)
end
end
logger.trace("export_last_processed_message_timestamp: done")
logger.trace("export_last_processed_message_timestamp_to_cluster: done")
end

def run_export_last_processed_message_timestamp_observer
observer = FileObserver.new(@loop, Path.export_last_processed_message_timestamp)
def export_last_processed_message_timestamp_to_file
old_timestamp = read_last_processed_message_timestamp
if old_timestamp and
old_timestamp > @last_processed_message_timestamp
return
end
path = Path.last_processed_message_timestamp
SafeFileWriter.write(path) do |output, file|
timestamp = @last_processed_message_timestamp
timestamp = timestamp.utc.iso8601(MICRO_SECONDS_DECIMAL_PLACE)
output.puts(timestamp)
end
end

def run_last_processed_message_timestamp_observer
path = Path.last_processed_message_timestamp
observer = FileObserver.new(@loop, path)
observer.on_change = lambda do
export_last_processed_message_timestamp
timestamp = read_last_processed_message_timestamp
if timestamp
if @last_processed_message_timestamp.nil? or
timestamp > @last_processed_message_timestamp
@last_processed_message_timestamp = timestamp
end
end
export_last_processed_message_timestamp_to_cluster
end
observer.start
observer
end

def read_last_processed_message_timestamp
file = Path.last_processed_message_timestamp
return nil unless file.exist?
timestamp = file.read
return nil if timestamp.nil? or timestamp.empty?
Time.parse(timestamp)
end

def log_tag
"engine"
end
Expand Down
4 changes: 2 additions & 2 deletions lib/droonga/path.rb
Expand Up @@ -61,8 +61,8 @@ def catalog
Pathname.new(base_file_name).expand_path(base)
end

def export_last_processed_message_timestamp
base + "export-last-processed-message-timestamp.txt"
def last_processed_message_timestamp
base + "last-processed-message-timestamp.txt"
end

def accidental_buffer
Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/serf/remote_command.rb
Expand Up @@ -123,7 +123,7 @@ def process

class ExportLastProcessedMessageTimestamp < Base
def process
FileUtils.touch(Path.export_last_processed_message_timestamp.to_s)
FileUtils.touch(Path.last_processed_message_timestamp.to_s)
end
end

Expand Down

0 comments on commit 43e011b

Please sign in to comment.