Navigation Menu

Skip to content

Commit

Permalink
Set effective timestamp for buffered messages via Serf's tag
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 7, 2015
1 parent 929cc0d commit fdf9a6a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
5 changes: 2 additions & 3 deletions bin/droonga-engine-absorb-data
Expand Up @@ -202,10 +202,9 @@ class AbsorbDataCommand
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(destination_node, "set_metadata",
response = run_remote_command(destination_node, "accept_messages_newer_than",
"node" => destination_node,
"key" => "effective_message_timestamp",
"value" => timestamp)
"timestamp" => timestamp)
end
end

Expand Down
5 changes: 2 additions & 3 deletions bin/droonga-engine-join
Expand Up @@ -253,10 +253,9 @@ class JoinCommand
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(joining_node, "set_metadata",
response = run_remote_command(joining_node, "accept_messages_newer_than",
"node" => joining_node,
"key" => "effective_message_timestamp",
"value" => timestamp)
"timestamp" => timestamp)
end
end

Expand Down
8 changes: 8 additions & 0 deletions lib/droonga/command/remote.rb
Expand Up @@ -124,6 +124,14 @@ def process
end
end

class AcceptMessagesNewerThan < Base
def process
log("old timestamp: #{@serf.accept_messages_newer_than_timestamp}")
@serf.accept_messages_newer_than(@params["timestamp"])
log("new timestamp: #{@serf.accept_messages_newer_than_timestamp}")
end
end

class ReportMetadata < Base
def process
metadata = NodeMetadata.new
Expand Down
2 changes: 2 additions & 0 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -82,6 +82,8 @@ def detect_command_class_from_custom_event(event_name)
case event_name
when "change_role"
Remote::ChangeRole
when "accept_messages_newer_than"
Remote::AcceptMessagesNewerThan
when "report_metadata"
Remote::ReportMetadata
when "set_metadata"
Expand Down
12 changes: 12 additions & 0 deletions lib/droonga/serf.rb
Expand Up @@ -177,6 +177,18 @@ def role=(new_role)
# after that you must run update_cluster_state to update the cluster information cache
end

def accept_messages_newer_than_timestamp
@node_metadata.reload
@node_metadata.get(:accept_messages_newer_than)
end

def accept_messages_newer_than(timestamp)
@node_metadata.reload
@node_metadata.set(:accept_messages_newer_than, timestamp.to_s)
set_tag("accept-messages-newer-than", timestamp.to_s)
# after that you must run update_cluster_state to update the cluster information cache
end

def cluster_id
loader = CatalogLoader.new(Path.catalog.to_s)
catalog = loader.load
Expand Down

0 comments on commit fdf9a6a

Please sign in to comment.