Navigation Menu

Skip to content

Commit

Permalink
Shorten
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 23, 2015
1 parent 43e011b commit 830ab2d
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 43 deletions.
4 changes: 2 additions & 2 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -82,8 +82,8 @@ def detect_command_class_from_custom_event(event_name)
case event_name
when "change_role"
Serf::RemoteCommand::ChangeRole
when "export_last_processed_message_timestamp"
Serf::RemoteCommand::ExportLastProcessedMessageTimestamp
when "export_last_message_timestamp"
Serf::RemoteCommand::ExportLastMessageTimestamp
when "accept_messages_newer_than"
Serf::RemoteCommand::AcceptMessagesNewerThan
when "join"
Expand Down
58 changes: 29 additions & 29 deletions lib/droonga/engine.rb
Expand Up @@ -71,7 +71,7 @@ def start
@state.start
@cluster.start
@dispatcher.start
@last_processed_message_timestamp_observer = run_last_processed_message_timestamp_observer
@last_message_timestamp_observer = run_last_message_timestamp_observer
logger.trace("start: done")
end

Expand All @@ -80,14 +80,14 @@ def stop_gracefully
@cluster.shutdown
on_finish = lambda do
logger.trace("stop_gracefully: middle")
@last_processed_message_timestamp_observer.stop
@last_message_timestamp_observer.stop
@dispatcher.stop_gracefully do
@state.shutdown
yield
#XXX We must save last processed message timstamp
# based on forwarded/dispatched messages while
# "graceful stop" operations.
export_last_processed_message_timestamp_to_file
export_last_message_timestamp_to_file
logger.trace("stop_gracefully: done")
end
end
Expand All @@ -103,11 +103,11 @@ def stop_gracefully
# It may be called after stop_gracefully.
def stop_immediately
logger.trace("stop_immediately: start")
@last_processed_message_timestamp_observer.stop
@last_message_timestamp_observer.stop
@dispatcher.stop_immediately
@cluster.shutdown
@state.shutdown
export_last_processed_message_timestamp_to_file
export_last_message_timestamp_to_file
logger.trace("stop_immediately: done")
end

Expand All @@ -119,9 +119,9 @@ def refresh_self_reference
def process(message)
if message.include?("date")
date = Time.parse(message["date"])
if @last_processed_message_timestamp.nil? or
@last_processed_message_timestamp < date
@last_processed_message_timestamp = date
if @last_message_timestamp.nil? or
@last_message_timestamp < date
@last_message_timestamp = date
end
end
@dispatcher.process_message(message)
Expand All @@ -144,56 +144,56 @@ def create_dispatcher

MICRO_SECONDS_DECIMAL_PLACE = 6

def export_last_processed_message_timestamp_to_cluster
logger.trace("export_last_processed_message_timestamp_to_cluster: start")
if @last_processed_message_timestamp
timestamp = @last_processed_message_timestamp
def export_last_message_timestamp_to_cluster
logger.trace("export_last_message_timestamp_to_cluster: start")
if @last_message_timestamp
timestamp = @last_message_timestamp
serf = Serf.new(@name)
old_timestamp = serf.last_processed_message_timestamp
old_timestamp = serf.last_message_timestamp
old_timestamp = Time.parse(old_timestamp) if old_timestamp
if old_timestamp.nil? or timestamp > old_timestamp
timestamp = timestamp.utc.iso8601(MICRO_SECONDS_DECIMAL_PLACE)
serf.last_processed_message_timestamp = timestamp
serf.last_message_timestamp = timestamp
logger.info("exported last processed message timestamp",
:timestamp => timestamp)
end
end
logger.trace("export_last_processed_message_timestamp_to_cluster: done")
logger.trace("export_last_message_timestamp_to_cluster: done")
end

def export_last_processed_message_timestamp_to_file
old_timestamp = read_last_processed_message_timestamp
def export_last_message_timestamp_to_file
old_timestamp = read_last_message_timestamp
if old_timestamp and
old_timestamp > @last_processed_message_timestamp
old_timestamp > @last_message_timestamp
return
end
path = Path.last_processed_message_timestamp
path = Path.last_message_timestamp
SafeFileWriter.write(path) do |output, file|
timestamp = @last_processed_message_timestamp
timestamp = @last_message_timestamp
timestamp = timestamp.utc.iso8601(MICRO_SECONDS_DECIMAL_PLACE)
output.puts(timestamp)
end
end

def run_last_processed_message_timestamp_observer
path = Path.last_processed_message_timestamp
def run_last_message_timestamp_observer
path = Path.last_message_timestamp
observer = FileObserver.new(@loop, path)
observer.on_change = lambda do
timestamp = read_last_processed_message_timestamp
timestamp = read_last_message_timestamp
if timestamp
if @last_processed_message_timestamp.nil? or
timestamp > @last_processed_message_timestamp
@last_processed_message_timestamp = timestamp
if @last_message_timestamp.nil? or
timestamp > @last_message_timestamp
@last_message_timestamp = timestamp
end
end
export_last_processed_message_timestamp_to_cluster
export_last_message_timestamp_to_cluster
end
observer.start
observer
end

def read_last_processed_message_timestamp
file = Path.last_processed_message_timestamp
def read_last_message_timestamp
file = Path.last_message_timestamp
return nil unless file.exist?
timestamp = file.read
return nil if timestamp.nil? or timestamp.empty?
Expand Down
4 changes: 2 additions & 2 deletions lib/droonga/path.rb
Expand Up @@ -61,8 +61,8 @@ def catalog
Pathname.new(base_file_name).expand_path(base)
end

def last_processed_message_timestamp
base + "last-processed-message-timestamp.txt"
def last_message_timestamp
base + "last-message-timestamp.txt"
end

def accidental_buffer
Expand Down
14 changes: 7 additions & 7 deletions lib/droonga/serf.rb
Expand Up @@ -195,18 +195,18 @@ def role=(new_role)
role
end

def last_processed_message_timestamp
get_tag(Tag.last_processed_message_timestamp)
def last_message_timestamp
get_tag(Tag.last_message_timestamp)
end

def latest_last_processed_message_timestamp
send_query("export_last_processed_message_timestamp",
def latest_last_message_timestamp
send_query("export_last_message_timestamp",
"node" => @name.to_s)
last_processed_message_timestamp
last_message_timestamp
end

def last_processed_message_timestamp=(timestamp)
set_tag(Tag.last_processed_message_timestamp, timestamp)
def last_message_timestamp=(timestamp)
set_tag(Tag.last_message_timestamp, timestamp)
# after that you must run update_cluster_state to update the cluster information cache
end

Expand Down
4 changes: 2 additions & 2 deletions lib/droonga/serf/remote_command.rb
Expand Up @@ -121,9 +121,9 @@ def process
end
end

class ExportLastProcessedMessageTimestamp < Base
class ExportLastMessageTimestamp < Base
def process
FileUtils.touch(Path.last_processed_message_timestamp.to_s)
FileUtils.touch(Path.last_message_timestamp.to_s)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/serf/tag.rb
Expand Up @@ -37,7 +37,7 @@ def accept_messages_newer_than
"accept-newer-than"
end

def last_processed_message_timestamp
def last_message_timestamp
"last-timestamp"
end

Expand Down

0 comments on commit 830ab2d

Please sign in to comment.