Navigation Menu

Skip to content

Commit

Permalink
Make EngineState independent from Cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent 2635bd7 commit 08f0dfd
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 33 deletions.
31 changes: 22 additions & 9 deletions lib/droonga/dispatcher.rb
Expand Up @@ -46,16 +46,21 @@ def initialize(type, dataset)
end
end

attr_reader :engine_state
attr_reader :engine_state, :cluster

def initialize(engine_state, catalog)
def initialize(engine_state, cluster, catalog)
@engine_state = engine_state
@cluster = cluster
@forwarder = @engine_state.forwarder
@cluster.on_change = lambda do
@forwarder.resume
end
@replier = @engine_state.replier
@catalog = catalog
@adapter_runners = create_adapter_runners
@farm = Farm.new(@engine_state.name, @catalog, @engine_state.loop,
:engine_state => @engine_state,
:cluster => @cluster,
:dispatcher => self,
:forwarder => @forwarder)
@collector_runners = create_collector_runners
Expand Down Expand Up @@ -116,7 +121,7 @@ def process_message(message)
def forward(message, destination)
logger.trace("forward start")
unless local?(destination)
return if @engine_state.cluster.forward(message, destination)
return if @cluster.forward(message, destination)
end
@forwarder.forward(message, destination)
logger.trace("forward done")
Expand Down Expand Up @@ -163,7 +168,7 @@ def process_internal_message(message)
else
steps = message["steps"]
if steps
session_planner = SessionPlanner.new(@engine_state, steps)
session_planner = SessionPlanner.new(@engine_state, @cluster, steps)
dataset = message["dataset"] || @message["dataset"]
collector_runner = @collector_runners[dataset]
session = session_planner.create_session(id, self, collector_runner)
Expand All @@ -187,7 +192,7 @@ def dispatch(message, destination)
"type" => "dispatcher",
"to" => destination,
}
@engine_state.cluster.forward(forward_message, forward_destination) ||
@cluster.forward(forward_message, forward_destination) ||
@forwarder.forward(forward_message, forward_destination)
end
end
Expand All @@ -200,9 +205,9 @@ def dispatch_steps(steps)
dataset = @catalog.dataset(step["dataset"])
if dataset
if write_step?(step)
target_nodes = @engine_state.cluster.writable_nodes
target_nodes = @cluster.writable_nodes
else
target_nodes = @engine_state.cluster.forwardable_nodes
target_nodes = @cluster.forwardable_nodes
end
routes = dataset.compute_routes(step, target_nodes)
step["routes"] = routes
Expand Down Expand Up @@ -314,8 +319,9 @@ def log_tag
class SessionPlanner
attr_reader :steps

def initialize(engine_state, steps)
def initialize(engine_state, cluster, steps)
@engine_state = engine_state
@cluster = cluster
@steps = steps
end

Expand Down Expand Up @@ -357,14 +363,21 @@ def resolve_descendants
(step["outputs"] || []).each do |output|
descendants[output] = []
@descendants[output].each do |index|
responsive_routes = @engine_state.select_responsive_routes(step["routes"])
responsive_routes = select_responsive_routes(step["routes"])
@steps[index]["n_of_expects"] += responsive_routes.size
descendants[output].concat(@steps[index]["routes"])
end
end
step["descendants"] = descendants
end
end

def select_responsive_routes(routes)
selected_nodes = @cluster.forwardable_nodes
routes.select do |route|
selected_nodes.include?(@engine_state.farm_path(route))
end
end
end
end
end
12 changes: 7 additions & 5 deletions lib/droonga/engine.rb
Expand Up @@ -20,6 +20,7 @@
require "droonga/engine/version"
require "droonga/loggable"
require "droonga/engine_state"
require "droonga/cluster"
require "droonga/catalog_loader"
require "droonga/dispatcher"
require "droonga/file_observer"
Expand All @@ -32,8 +33,9 @@ class Engine
attr_writer :on_ready
def initialize(loop, name, internal_name)
@state = EngineState.new(loop, name, internal_name)
@cluster = Cluster.new(loop)
@catalog = load_catalog
@state.catalog = @catalog
@state.catalog = @cluster.catalog = @catalog
@dispatcher = create_dispatcher
@node_metadata_observer = FileObserver.new(loop, Path.node_metadata)
@node_metadata_observer.on_change = lambda do
Expand All @@ -50,15 +52,15 @@ def start
@on_ready.call if @on_ready
end
@state.start
@state.cluster.start_observe
@cluster.start_observe
@node_metadata_observer.start
@dispatcher.start
logger.trace("start: done")
end

def stop_gracefully
logger.trace("stop_gracefully: start")
@state.cluster.stop_observe
@cluster.stop_observe
@node_metadata_observer.stop
on_finish = lambda do
logger.trace("stop_gracefully/on_finish: start")
Expand All @@ -83,7 +85,7 @@ def stop_gracefully
def stop_immediately
logger.trace("stop_immediately: start")
save_last_processed_message_timestamp
@state.cluster.stop_observe
@cluster.stop_observe
@node_metadata_observer.stop
@dispatcher.stop_immediately
@state.shutdown
Expand Down Expand Up @@ -112,7 +114,7 @@ def load_catalog
end

def create_dispatcher
Dispatcher.new(@state, @catalog)
Dispatcher.new(@state, @cluster, @catalog)
end

def save_last_processed_message_timestamp
Expand Down
19 changes: 1 addition & 18 deletions lib/droonga/engine_state.rb
Expand Up @@ -21,7 +21,6 @@
require "droonga/event_loop"
require "droonga/forwarder"
require "droonga/replier"
require "droonga/cluster"

module Droonga
class EngineState
Expand All @@ -33,8 +32,7 @@ class EngineState
attr_reader :forwarder
attr_reader :replier
attr_writer :on_ready
attr_reader :catalog
attr_reader :cluster
attr_accessor :catalog
attr_accessor :on_finish

def initialize(loop, name, internal_name)
Expand All @@ -43,21 +41,13 @@ def initialize(loop, name, internal_name)
@internal_name = internal_name
@sessions = {}
@current_id = 0
@cluster = Cluster.new(@loop)
@forwarder = Forwarder.new(@loop, :buffering => true)
@cluster.on_change = lambda do
@forwarder.resume
end
@replier = Replier.new(@forwarder)
@on_ready = nil
@on_finish = nil
@catalog = nil
end

def catalog=(catalog)
@catalog = @cluster.catalog = catalog
end

def start
logger.trace("start start")
@forwarder.start
Expand Down Expand Up @@ -118,13 +108,6 @@ def on_ready
@on_ready.call if @on_ready
end

def select_responsive_routes(routes)
selected_nodes = @cluster.forwardable_nodes
routes.select do |route|
selected_nodes.include?(farm_path(route))
end
end

private
def log_tag
"engine_state"
Expand Down
3 changes: 2 additions & 1 deletion lib/droonga/handler_messenger.rb
Expand Up @@ -18,7 +18,7 @@

module Droonga
class HandlerMessenger
attr_reader :database_name, :dispatcher, :engine_state
attr_reader :database_name, :dispatcher, :engine_state, :cluster

def initialize(forwarder, message, options={})
@forwarder = forwarder
Expand All @@ -27,6 +27,7 @@ def initialize(forwarder, message, options={})
@replier = Replier.new(@forwarder)
@dispatcher = options[:dispatcher]
@engine_state = options[:engine_state]
@cluster = options[:cluster]
@database_name = options[:database]
end

Expand Down

0 comments on commit 08f0dfd

Please sign in to comment.