Navigation Menu

Skip to content

Commit

Permalink
Introduce "EngineNode" which is a proxy (or an agent) for a node
Browse files Browse the repository at this point in the history
Conflicts:
	lib/droonga/engine_state.rb
  • Loading branch information
piroor committed Jan 6, 2015
1 parent c8b2da8 commit 0b6443e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 78 deletions.
114 changes: 42 additions & 72 deletions lib/droonga/cluster_state.rb → lib/droonga/cluster.rb
Expand Up @@ -16,10 +16,11 @@
require "droonga/loggable"
require "droonga/path"
require "droonga/file_observer"
require "droonga/engine_node"
require "droonga/node_metadata"

module Droonga
class ClusterState
class Cluster
include Loggable

attr_accessor :catalog
Expand Down Expand Up @@ -49,17 +50,25 @@ def stop_observe
end

def reload
old_state = to_hash
if @state
old_state = @state.dup
else
old_state = nil
end
clear_cache
@state = load_state_file
if to_hash == old_state
if @state == old_state
logger.info("cluster state not changed")
else
logger.info("cluster state changed")
on_change
end
end

def engine_nodes
@engine_nodes ||= create_engine_nodes
end

def all_nodes
if @catalog
@catalog.all_nodes
Expand All @@ -69,64 +78,42 @@ def all_nodes
end

def dead_nodes
if @state
@dead_nodes ||= collect_dead_nodes
else
[]
end
engine_nodes.collect(&:dead?).collect(&:name)
end

def service_provider_nodes
if @state
@service_provider_nodes ||= collect_nodes_by_role(NodeMetadata::Role::SERVICE_PROVIDER)
else
all_nodes
end
engine_nodes.collect(&:service_provider?).collect(&:name)
end

def absorb_source_nodes
if @state
@absorb_source_nodes ||= collect_nodes_by_role(NodeMetadata::Role::ABSORB_SOURCE)
else
[]
end
engine_nodes.collect(&:absorb_source?).collect(&:name)
end

def absorb_destination_nodes
if @state
@absorb_destination_nodes ||= collect_nodes_by_role(NodeMetadata::Role::ABSORB_DESTINATION)
else
[]
end
engine_nodes.collect(&:absorb_destination?).collect(&:name)
end

def same_role_nodes
case node_metadata.role
when NodeMetadata::Role::SERVICE_PROVIDER
all_nodes & service_provider_nodes
when NodeMetadata::Role::ABSORB_SOURCE
all_nodes & absorb_source_nodes
when NodeMetadata::Role::ABSORB_DESTINATION
all_nodes & absorb_destination_nodes
else
[]
engine_nodes.select do |node|
node.role == node_metadata.role
end.collect do |node|
node.name
end
end

def forwardable_nodes
same_role_nodes - dead_nodes
engine_nodes.select do |node|
node.live? and node.role == node_metadata.role
end.collect do |node|
node.name
end
end

def writable_nodes
case node_metadata.role
when NodeMetadata::Role::SERVICE_PROVIDER
all_nodes
when NodeMetadata::Role::ABSORB_SOURCE
all_nodes & absorb_source_nodes
when NodeMetadata::Role::ABSORB_DESTINATION
all_nodes & absorb_destination_nodes
else
[]
engine_nodes.select do |node|
node.writable_by?(node_metadata.role)
end.collect do |node|
node.name
end
end

Expand All @@ -147,19 +134,8 @@ def on_change
end

private
def to_hash
return nil unless @state

{
:all => @state.keys,
:dead => dead_nodes,
:service_provider => service_provider_nodes,
:absorb_source => absorb_source_nodes,
:absorb_destination => absorb_destination_nodes,
}
end

def clear_cache
@engine_nodes = nil
@dead_nodes = nil
@service_provider_nodes = nil
@absorb_source_nodes = nil
Expand All @@ -181,29 +157,23 @@ def load_state_file
end
end

def default_state
{}
def all_node_names
if @catalog
@catalog.all_nodes
else
[]
end
end

def collect_dead_nodes
nodes = []
@state.each do |name, node_state|
unless node_state["live"]
nodes << name
end
def create_engine_nodes
all_node_names.collect do |name|
node_state = @state[name] || {}
EngineNode.new(name, node_state)
end
nodes.sort
end

def collect_nodes_by_role(role)
nodes = []
@state.each do |name, node_state|
if node_state["type"] == "engine" and
node_state["role"] == role
nodes << name
end
end
nodes.sort
def default_state
{}
end

def node_metadata
Expand Down
72 changes: 72 additions & 0 deletions lib/droonga/engine_node.rb
@@ -0,0 +1,72 @@
# Copyright (C) 2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/node_metadata"

module Droonga
class EngineNode
attr_reader :name

def initialize(name, state)
@name = name
@state = state
end

def live?
@state.nil? or @state["live"]
end

def dead?
not live?
end

def service_provider?
role == NodeMetadata::Role::SERVICE_PROVIDER
end

def absorb_source?
role == NodeMetadata::Role::ABSORB_SOURCE
end

def absorb_destination?
role == NodeMetadata::Role::ABSORB_DESTINATION
end

def role
if @state
@state["role"]
else
NodeMetadata::Role::SERVICE_PROVIDER
end
end

def forwardable?
not dead?
end

def writable_by?(sender_role)
case sender_role
when NodeMetadata::Role::SERVICE_PROVIDER
true
when NodeMetadata::Role::ABSORB_SOURCE
absorb_source?
when NodeMetadata::Role::ABSORB_DESTINATION
absorb_destination?
else
false
end
end
end
end
7 changes: 3 additions & 4 deletions lib/droonga/engine_state.rb
Expand Up @@ -21,7 +21,7 @@
require "droonga/event_loop"
require "droonga/buffered_forwarder"
require "droonga/replier"
require "droonga/cluster_state"
require "droonga/cluster"

module Droonga
class EngineState
Expand All @@ -43,9 +43,8 @@ def initialize(loop, name, internal_name)
@internal_name = internal_name
@sessions = {}
@current_id = 0
@cluster = ClusterState.new(@loop)
@forwarder = BufferedForwarder.new(@loop,
:cluster_state => @cluster)
@cluster = Cluster.new(@loop)
@forwarder = Forwarder.new(@loop, :buffering => true)
@cluster.on_change = lambda do
@forwarder.resume
end
Expand Down
4 changes: 2 additions & 2 deletions test/unit/plugins/system/test_status.rb
Expand Up @@ -47,11 +47,11 @@ def process(request)

class StubEngineState
def cluster
@cluster ||= StubClusterState.new
@cluster ||= StubCluster.new
end
end

class StubClusterState
class StubCluster
def all_nodes
[
"127.0.0.1:10031/droonga",
Expand Down

0 comments on commit 0b6443e

Please sign in to comment.