Navigation Menu

Skip to content

Commit

Permalink
Define serf tags in a place
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 01c57e6 commit 52872d1
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 26 deletions.
47 changes: 21 additions & 26 deletions lib/droonga/serf.rb
Expand Up @@ -20,6 +20,7 @@
require "droonga/catalog/loader"
require "droonga/node_name"
require "droonga/node_role"
require "droonga/serf/tag"
require "droonga/serf/downloader"
require "droonga/serf/agent"
require "droonga/serf/command"
Expand Down Expand Up @@ -61,9 +62,9 @@ def run_agent(loop)
end

def initialize_tags
set_tag("type", "engine")
set_tag("cluster_id", cluster_id)
set_tag("role", role)
set_tag(Tag.node_type, "engine")
set_tag(Tag.node_role, role)
set_tag(Tag.cluster_id, cluster_id)
end

def leave
Expand Down Expand Up @@ -122,20 +123,20 @@ def current_cluster_state
nodes = {}
unprocessed_messages_existence = {}
current_members.each do |member|
foreign = member["tags"]["cluster_id"] != current_cluster_id
foreign = member["tags"][Tag.cluster_id] != current_cluster_id
next if foreign

member["tags"].each do |key, value|
next unless key.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX)
node_name = key.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "")
next unless Tag.have_unprocessed_messages_tag?(key)
node_name = Tag.extract_node_name_from_have_unprocessed_messages_tag(key)
next if unprocessed_messages_existence[node_name]
unprocessed_messages_existence[node_name] = value == "true"
end

nodes[member["name"]] = {
"type" => member["tags"]["type"],
"role" => member["tags"]["role"],
"accept_messages_newer_than" => member["tags"]["accept-newer-than"],
"type" => member["tags"][Tag.node_type],
"role" => member["tags"][Tag.node_role],
"accept_messages_newer_than" => member["tags"][Tag.accept_messages_newer_than],
"live" => member["status"] == "alive",
}
end
Expand Down Expand Up @@ -171,44 +172,44 @@ def delete_tag(name)
end

def update_cluster_id
set_tag("cluster_id", cluster_id)
set_tag(Tag.cluster_id, cluster_id)
end

def set_have_unprocessed_messages_for(node_name)
tag = have_unprocessed_messages_tag_for(node_name)
tag = Tag.have_unprocessed_messages_tag_for(node_name)
set_tag(tag, true) unless @tags_cache.key?(tag)
end

def reset_have_unprocessed_messages_for(node_name)
delete_tag(have_unprocessed_messages_tag_for(node_name))
delete_tag(Tag.have_unprocessed_messages_tag_for(node_name))
end

def role
NodeRole.normalize(get_tag("role"))
NodeRole.normalize(get_tag(Tag.node_role))
end

def role=(new_role)
role = NodeRole.normalize(new_role)
set_tag("role", role)
set_tag(Tag.node_role, role)
# after that you must run update_cluster_state to update the cluster information cache
role
end

def last_processed_message_timestamp
get_tag("last-timestamp")
get_tag(Tag.last_processed_message_timestamp)
end

def last_processed_message_timestamp=(timestamp)
set_tag("last-timestamp", timestamp.to_s)
set_tag(Tag.last_processed_message_timestamp, timestamp.to_s)
# after that you must run update_cluster_state to update the cluster information cache
end

def accept_messages_newer_than_timestamp
get_tag("accept-newer-than")
get_tag(Tag.accept_messages_newer_than)
end

def accept_messages_newer_than(timestamp)
set_tag("accept-newer-than", timestamp.to_s)
set_tag(Tag.accept_messages_newer_than, timestamp.to_s)
# after that you must run update_cluster_state to update the cluster information cache
end

Expand All @@ -223,13 +224,13 @@ def cluster_id

def ensure_restarted(&block)
start_time = Time.now
previous_internal_name = get_tag("internal-name")
previous_internal_name = get_tag(Tag.internal_node_name)
restarted = false

yield # the given operation must restart the service.

while Time.now - start_time < CHECK_RESTARTED_TIMEOUT
restarted = get_tag("internal-name") == previous_internal_name
restarted = get_tag(Tag.internal_node_name) == previous_internal_name
break if restarted
sleep(CHECK_RESTARTED_INTERVAL)
end
Expand Down Expand Up @@ -302,12 +303,6 @@ def detect_other_hosts
end
end

HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-"

def have_unprocessed_messages_tag_for(node_name)
"#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}"
end

def log_tag
"serf"
end
Expand Down
60 changes: 60 additions & 0 deletions lib/droonga/serf/tag.rb
@@ -0,0 +1,60 @@
# Copyright (C) 2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

module Droonga
class Serf
class Tag
class << self
def node_type
"type"
end

def node_role
"role"
end

def internal_node_name
"internal-name"
end

def cluster_id
"cluster_id"
end

def accept_messages_newer_than
"accept-newer-than"
end

def last_processed_message_timestamp
"last-timestamp"
end

HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-"

def have_unprocessed_messages_tag_for(node_name)
"#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}"
end

def have_unprocessed_messages_tag?(tag)
tag.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX)
end

def extract_node_name_from_have_unprocessed_messages_tag(tag)
tag.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "")
end
end
end
end
end

0 comments on commit 52872d1

Please sign in to comment.