Navigation Menu

Skip to content

Commit

Permalink
planner: migrate to new plugin style
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Feb 17, 2014
1 parent 839d1dc commit cb63b0f
Show file tree
Hide file tree
Showing 23 changed files with 620 additions and 583 deletions.
22 changes: 14 additions & 8 deletions lib/droonga/dispatcher.rb
Expand Up @@ -17,7 +17,7 @@
require "tsort"

require "droonga/adapter_runner"
require "droonga/planner"
require "droonga/planner_runner"
require "droonga/collector"
require "droonga/farm"
require "droonga/session"
Expand Down Expand Up @@ -56,12 +56,11 @@ def initialize(catalog, options)
@sessions = {}
@current_id = 0
@local = Regexp.new("^#{@name}")
@adapter_runners = create_adapter_runners
@adapter_runners = create_runners(AdapterRunner)
@farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
@forwarder = Forwarder.new(@loop)
@replier = Replier.new(@forwarder)
# TODO: make customizable
@planner = Planner.new(self, ["search", "crud", "groonga", "watch"])
@planner_runners = create_runners(PlannerRunner)
# TODO: make customizable
@collector = Collector.new(["basic", "search"])
end
Expand All @@ -76,7 +75,9 @@ def start

def shutdown
@forwarder.shutdown
@planner.shutdown
@planner_runners.each_value do |planner_runner|
planner_runner.shutdown
end
@collector.shutdown
@adapter_runners.each_value do |adapter_runner|
adapter_runner.shutdown
Expand Down Expand Up @@ -226,9 +227,14 @@ def process_input_message(message)
dataset = message["dataset"]
adapter_runner = @adapter_runners[dataset]
adapted_message = adapter_runner.adapt_input(message)
plan = @planner.process(adapted_message["type"], adapted_message)
planner_runner = @planner_runners[dataset]
plan = planner_runner.plan(adapted_message)
distributor = Distributor.new(self)
distributor.distribute(plan)
rescue Droonga::UnsupportedMessageError => error
target_message = error.message
raise UnknownCommand.new(target_message["type"],
target_message["dataset"])
rescue Droonga::LegacyPluggable::UnknownPlugin => error
raise UnknownCommand.new(error.command, message["dataset"])
end
Expand All @@ -243,10 +249,10 @@ def assert_valid_message(message)
end
end

def create_adapter_runners
def create_runners(runner_class)
runners = {}
@catalog.datasets.each do |name, configuration|
runners[name] = AdapterRunner.new(self, configuration["plugins"] || [])
runners[name] = runner_class.new(self, configuration["plugins"] || [])
end
runners
end
Expand Down
10 changes: 10 additions & 0 deletions lib/droonga/error.rb
Expand Up @@ -31,4 +31,14 @@ def initialize(errors=[])
super(message)
end
end

# TODO: Move to common file for runners
class UnsupportedMessageError < Error
attr_reader :phase, :message
def initialize(phase, message)
@phase = phase
@message = message
super("[#{@phase}] Unsupported message: #{@message.inspect}")
end
end
end
35 changes: 26 additions & 9 deletions lib/droonga/planner.rb
Expand Up @@ -13,25 +13,42 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/legacy_pluggable"
require "droonga/planner_plugin"
require "droonga/pluggable"
require "droonga/plugin/metadata/planner_message"
require "droonga/distributed_command_planner"

module Droonga
class Planner
include LegacyPluggable
extend Pluggable

def initialize(dispatcher, plugins)
class << self
def message
Plugin::Metadata::PlannerMessage.new(self)
end
end

def initialize(dispatcher)
@dispatcher = dispatcher
load_plugins(plugins)
end

def plan(message)
raise NotImplemented, "#{self.class.name}\##{__method__} must implement."
end

private
def instantiate_plugin(name)
PlannerPlugin.repository.instantiate(name, self)
def scatter(message, options={})
planner = DistributedCommandPlanner.new(message)
planner.scatter
planner.key = options[:key]
planner.reduce(options[:reduce])
planner.plan
end

def log_tag
"[#{Process.pid}] planner"
def broadcast(message, options={})
planner = DistributedCommandPlanner.new(message)
planner.broadcast(:write => options[:write])
planner.reduce(options[:reduce])
planner.plan
end
end
end
61 changes: 61 additions & 0 deletions lib/droonga/planner_runner.rb
@@ -0,0 +1,61 @@
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/message_matcher"
require "droonga/planner"

module Droonga
class PlannerRunner
def initialize(dispatcher, plugins)
@dispatcher = dispatcher
@planner_classes = Planner.find_sub_classes(plugins)
end

def shutdown
end

def plan(message)
$log.trace("#{log_tag}: plan: start",
:dataset => message["dataset"],
:type => message["type"])
planner_class = find_planner_class(message)
if planner_class.nil?
raise UnsupportedMessageError.new(:planner, message)
end
planner = planner_class.new(@dispatcher)
plan = planner.plan(message)
$log.trace("#{log_tag}: plan: done",
:steps => plan.collect {|step| step["type"]})
plan
end

private
def find_planner_class(message)
@planner_classes.find do |planner_class|
pattern = planner_class.message.pattern
if pattern
matcher = MessageMatcher.new(pattern)
matcher.match?(message)
else
false
end
end
end

def log_tag
"adapter-runner"
end
end
end
1 change: 1 addition & 0 deletions lib/droonga/plugin.rb
Expand Up @@ -15,6 +15,7 @@

require "droonga/plugin_registry"
require "droonga/adapter"
require "droonga/planner"
require "droonga/handler"

module Droonga
Expand Down
52 changes: 52 additions & 0 deletions lib/droonga/plugin/metadata/planner_message.rb
@@ -0,0 +1,52 @@
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

module Droonga
module Plugin
module Metadata
class PlannerMessage
def initialize(plugin_class)
@plugin_class = plugin_class
end

def pattern
configuration[:pattern] || fallback_pattern
end

def pattern=(pattern)
configuration[:pattern] = pattern
end

def type
configuration[:type]
end

def type=(type)
configuration[:type] = type
end

private
def configuration
@plugin_class.options[:message] ||= {}
end

def fallback_pattern
return nil if type.nil?
["type", :equal, type]
end
end
end
end
end
49 changes: 0 additions & 49 deletions lib/droonga/plugin/planner/crud.rb

This file was deleted.

0 comments on commit cb63b0f

Please sign in to comment.