Skip to content

Commit

Permalink
Extract cluster-specific features of EngineState to ClusterState
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Dec 25, 2014
1 parent 5dbed49 commit 2125ecd
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 89 deletions.
4 changes: 2 additions & 2 deletions lib/droonga/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ def dispatch_steps(steps)
dataset = @catalog.dataset(step["dataset"])
if dataset
if write_step?(step)
target_nodes = @engine_state.writable_nodes
target_nodes = @engine_state.cluster.writable_nodes
else
target_nodes = @engine_state.forwardable_nodes
target_nodes = @engine_state.cluster.forwardable_nodes
end
routes = dataset.compute_routes(step, target_nodes)
step["routes"] = routes
Expand Down
95 changes: 12 additions & 83 deletions lib/droonga/engine_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
require "droonga/event_loop"
require "droonga/forwarder"
require "droonga/replier"
require "droonga/node_metadata"
require "droonga/cluster_state"

module Droonga
class EngineState
Expand All @@ -33,20 +33,26 @@ class EngineState
attr_reader :forwarder
attr_reader :replier
attr_writer :on_ready
attr_reader :catalog
attr_reader :cluster
attr_accessor :on_finish
attr_accessor :catalog

def initialize(loop, name, internal_name)
@loop = loop
@name = name
@internal_name = internal_name
@sessions = {}
@current_id = 0
@cluster = ClusterState.new
@forwarder = Forwarder.new(@loop, :buffering => true)
@replier = Replier.new(@forwarder)
@on_ready = nil
@on_finish = nil
@catalog = nil
@live_nodes_list = nil
end

def catalog=(catalog)
@catalog = @cluster.catalog = catalog
end

def start
Expand Down Expand Up @@ -105,95 +111,18 @@ def have_session?
not @sessions.empty?
end

def all_nodes
@catalog.all_nodes
end

def dead_nodes
if @live_nodes_list
@live_nodes_list.dead_nodes
else
[]
end
end

def service_provider_nodes
if @live_nodes_list
@live_nodes_list.service_provider_nodes
else
all_nodes
end
end

def absorb_source_nodes
if @live_nodes_list
@live_nodes_list.absorb_source_nodes
else
all_nodes
end
end

def absorb_destination_nodes
if @live_nodes_list
@live_nodes_list.absorb_destination_nodes
else
all_nodes
end
end

def forwardable_nodes
same_role_nodes = nil
case node_metadata.role
when NodeMetadata::Role::SERVICE_PROVIDER
same_role_nodes = all_nodes & service_provider_nodes
when NodeMetadata::Role::ABSORB_SOURCE
same_role_nodes = all_nodes & absorb_source_nodes
when NodeMetadata::Role::ABSORB_DESTINATION
same_role_nodes = all_nodes & absorb_destination_nodes
else
same_role_nodes = []
end
same_role_nodes - dead_nodes
end

def writable_nodes
case node_metadata.role
when NodeMetadata::Role::SERVICE_PROVIDER
all_nodes
when NodeMetadata::Role::ABSORB_SOURCE
all_nodes & absorb_source_nodes
when NodeMetadata::Role::ABSORB_DESTINATION
all_nodes & absorb_destination_nodes
else
[]
end
end

def live_nodes_list=(new_nodes_list)
old_live_nodes_list = @live_nodes_list
@live_nodes_list = new_nodes_list
unless old_live_nodes_list == new_nodes_list
@forwarder.resume
end
@live_nodes_list
def on_ready
@on_ready.call if @on_ready
end

def select_responsive_routes(routes)
selected_nodes = forwardable_nodes
selected_nodes = @cluster.forwardable_nodes
routes.select do |route|
selected_nodes.include?(farm_path(route))
end
end

def on_ready
@on_ready.call if @on_ready
end

private
def node_metadata
@node_metadata ||= NodeMetadata.new
end

def log_tag
"engine_state"
end
Expand Down
8 changes: 4 additions & 4 deletions lib/droonga/plugins/system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ class StatusHandler < Droonga::Handler
action.synchronous = true

def handle(message)
engine_state = @messenger.engine_state
active_nodes = engine_state.forwardable_nodes
dead_nodes = engine_state.dead_nodes
cluster = @messenger.engine_state.cluster
active_nodes = cluster.forwardable_nodes
dead_nodes = cluster.dead_nodes
nodes = {}
engine_state.all_nodes.collect do |identifier|
cluster.all_nodes.collect do |identifier|
if active_nodes.include?(identifier)
status = "active"
elsif dead_nodes.include?(identifier)
Expand Down
5 changes: 5 additions & 0 deletions test/unit/plugins/system/test_status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def process(request)
end

class StubEngineState
def cluster
@cluster ||= StubClusterState.new
end

class StubClusterState
def all_nodes
[
"127.0.0.1:10031/droonga",
Expand Down

0 comments on commit 2125ecd

Please sign in to comment.