Navigation Menu

Skip to content

Commit

Permalink
Add remote command to export last processed message timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 23, 2015
1 parent 05be936 commit 6d4053f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
2 changes: 2 additions & 0 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -82,6 +82,8 @@ def detect_command_class_from_custom_event(event_name)
case event_name
when "change_role"
Serf::RemoteCommand::ChangeRole
when "export_last_processed_message_timestamp"
Serf::RemoteCommand::ExportLastProcessedMessageTimestamp
when "accept_messages_newer_than"
Serf::RemoteCommand::AcceptMessagesNewerThan
when "join"
Expand Down
32 changes: 24 additions & 8 deletions lib/droonga/engine.rb
Expand Up @@ -27,6 +27,7 @@
require "droonga/dispatcher"
require "droonga/serf"
require "droonga/serf/tag"
require "droonga/file_observer"

module Droonga
class Engine
Expand Down Expand Up @@ -54,6 +55,8 @@ def initialize(loop, name, internal_name, options={})
@cluster.on_change = lambda do
@dispatcher.refresh_node_reference
end

@export_last_processed_message_timestamp_observer = run_export_last_processed_message_timestamp_observer
end

def start
Expand All @@ -69,6 +72,7 @@ def start
@state.start
@cluster.start
@dispatcher.start
@export_last_processed_message_timestamp_observer.start
logger.trace("start: done")
end

Expand All @@ -79,7 +83,7 @@ def stop_gracefully
logger.trace("stop_gracefully/on_finish: start")
@dispatcher.stop_gracefully do
@state.shutdown
save_last_processed_message_timestamp
@export_last_processed_message_timestamp_observer.shutdown
yield
end
logger.trace("stop_gracefully/on_finish: done")
Expand All @@ -100,7 +104,7 @@ def stop_immediately
@dispatcher.stop_immediately
@cluster.shutdown
@state.shutdown
save_last_processed_message_timestamp
@export_last_processed_message_timestamp_observer.shutdown
logger.trace("stop_immediately: done")
end

Expand Down Expand Up @@ -137,17 +141,29 @@ def create_dispatcher

MICRO_SECONDS_DECIMAL_PLACE = 6

def save_last_processed_message_timestamp
logger.trace("save_last_processed_message_timestamp: start")
def export_last_processed_message_timestamp
logger.trace("export_last_processed_message_timestamp: start")
if @last_processed_message_timestamp
timestamp = @last_processed_message_timestamp
timestamp = timestamp.utc.iso8601(MICRO_SECONDS_DECIMAL_PLACE)
serf = Serf.new(@name)
serf.last_processed_message_timestamp = timestamp
logger.info("saved last processed message timestamp",
:timestamp => timestamp)
old_timestamp = serf.last_processed_message_timestamp
if timestamp > old_timestamp
serf.last_processed_message_timestamp = timestamp
logger.info("exported last processed message timestamp",
:timestamp => timestamp)
end
end
logger.trace("export_last_processed_message_timestamp: done")
end

def run_export_last_processed_message_timestamp_observer
observer = FileObserver.new(@loop, Path.export_last_processed_message_timestamp)
observer.on_change = lambda do
export_last_processed_message_timestamp
end
logger.trace("save_last_processed_message_timestamp: done")
observer.start
observer
end

def log_tag
Expand Down
4 changes: 4 additions & 0 deletions lib/droonga/path.rb
Expand Up @@ -61,6 +61,10 @@ def catalog
Pathname.new(base_file_name).expand_path(base)
end

def export_last_processed_message_timestamp
base + "export-last-processed-message-timestamp.txt"
end

def accidental_buffer
state + "buffer" + "accidental"
end
Expand Down
6 changes: 6 additions & 0 deletions lib/droonga/serf/remote_command.rb
Expand Up @@ -121,6 +121,12 @@ def process
end
end

class ExportLastProcessedMessageTimestamp < Base
def process
FileUtils.touch(Path.export_last_processed_message_timestamp.to_s)
end
end

class AcceptMessagesNewerThan < Base
def process
log("old timestamp: #{@serf.accept_messages_newer_than_timestamp}")
Expand Down

0 comments on commit 6d4053f

Please sign in to comment.