Navigation Menu

Skip to content

Commit

Permalink
Get last processed message timestamp via serf tag
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 028129a commit df8c367
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
18 changes: 13 additions & 5 deletions bin/droonga-engine-absorb-data
Expand Up @@ -187,6 +187,13 @@ module Droonga
serf.send_query(command, options)
end

def current_member_state(target)
serf = Serf.new(target, :verbose => @options[:verbose])
serf.current_members.find do |member|
member["name"] == target
end
end

def absorber
@absorber ||= prepare_absorber
end
Expand Down Expand Up @@ -234,15 +241,16 @@ module Droonga

puts ""

response = run_remote_command(source_node.to_s, "report_metadata",
"node" => source_node.to_s,
"key" => "last_processed_message_timestamp")
unless response
timestamp = nil
source_state = current_member_state(source_node.to_s)
if source_state
timestamp = source_state["tags"]["last-processed-message-timestamp"]
end
unless timestamp
$stderr.puts("Couldn't get the time stamp of " +
"the last processed message from the source node.")
return false
end
timestamp = response["value"]
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
Expand Down
18 changes: 13 additions & 5 deletions bin/droonga-engine-join
Expand Up @@ -184,6 +184,13 @@ module Droonga
serf.send_query(command, options)
end

def current_member_state(target)
serf = Serf.new(target, :verbose => @options[:verbose])
serf.current_members.find do |member|
member["name"] == target
end
end

def absorber
@absorber ||= prepare_absorber
end
Expand Down Expand Up @@ -286,15 +293,16 @@ module Droonga
end

def set_effective_message_timestamp
response = run_remote_command(source_node.to_s, "report_metadata",
"node" => source_node.to_s,
"key" => "last_processed_message_timestamp")
unless response
timestamp = nil
source_state = current_member_state(source_node.to_s)
if source_state
timestamp = source_state["tags"]["last-processed-message-timestamp"]
end
unless timestamp
$stderr.puts("Couldn't get the time stamp of " +
"the last processed message from the source node.")
return false
end
timestamp = response["value"]
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
Expand Down

0 comments on commit df8c367

Please sign in to comment.