Navigation Menu

Skip to content

Commit

Permalink
collector: use new plugin style
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Feb 17, 2014
1 parent e1c979c commit ed93264
Show file tree
Hide file tree
Showing 18 changed files with 412 additions and 660 deletions.
20 changes: 10 additions & 10 deletions lib/droonga/collector.rb
Expand Up @@ -13,24 +13,24 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/legacy_pluggable"
require "droonga/collector_plugin"
require "droonga/pluggable"
require "droonga/plugin/metadata/collector_message"

module Droonga
class Collector
include LegacyPluggable
extend Pluggable

def initialize(plugins)
load_plugins(plugins)
class << self
def message
Plugin::Metadata::CollectorMessage.new(self)
end
end

private
def instantiate_plugin(name)
CollectorPlugin.repository.instantiate(name)
def initialize
end

def log_tag
"collector"
def collect(message)
raise NotImplemented, "#{self.class.name}\##{__method__} must implement."
end
end
end
@@ -1,4 +1,4 @@
# Copyright (C) 2013-2014 Droonga Project
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -14,41 +14,58 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

module Droonga
class LegacyPluginRepository
include Enumerable
class CollectorMessage
attr_reader :raw
def initialize(raw)
@raw = raw
end

def initialize
@plugins = {}
def valid?
task and step and values
end

def each(&block)
@plugins.each(&block)
def [](key)
@raw[key]
end

def register(name, klass)
@plugins[name] = klass
def task
@raw["task"]
end

def [](name)
@plugins[name]
def step
task["step"]
end

def clear
@plugins.clear
def type
step["type"]
end

def instantiate(name, *args, &block)
plugin_class = self[name]
if plugin_class.nil?
# TODO: use the original error
raise ArgumentError, "unknown plugin: <#{name}>"
end
begin
plugin_class.new(*args, &block)
rescue
p [plugin_class, plugin_class.method(:new), plugin_class.method(:new).arity, args.size]
raise
def values
task["values"]
end

def body
step["body"]
end

def input
if body
body[name]
else
nil
end
end

def outputs
step["outputs"]
end

def name
@raw["name"]
end

def value
@raw["value"]
end
end
end
50 changes: 0 additions & 50 deletions lib/droonga/collector_plugin.rb

This file was deleted.

61 changes: 61 additions & 0 deletions lib/droonga/collector_runner.rb
@@ -0,0 +1,61 @@
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/message_matcher"
require "droonga/collector"
require "droonga/collector_message"

module Droonga
class CollectorRunner
def initialize(plugins)
default_plugins = ["basic"]
plugins += (default_plugins - plugins)
@collector_classes = Collector.find_sub_classes(plugins)
end

def shutdown
end

def collect(message)
collector_message = CollectorMessage.new(message)
$log.trace("#{log_tag}: collect: start",
:type => collector_message.type)
collector_class = find_collector_class(message)
if collector_class.nil?
raise UnsupportedMessageError.new(:collector, message)
end
collector = collector_class.new
collector.collect(collector_message)
$log.trace("#{log_tag}: collector: done")
end

private
def find_collector_class(message)
@collector_classes.find do |collector_class|
pattern = collector_class.message.pattern
if pattern
matcher = MessageMatcher.new(pattern)
matcher.match?(message)
else
false
end
end
end

def log_tag
"collector-runner"
end
end
end
45 changes: 32 additions & 13 deletions lib/droonga/dispatcher.rb
Expand Up @@ -18,7 +18,7 @@

require "droonga/adapter_runner"
require "droonga/planner_runner"
require "droonga/collector"
require "droonga/collector_runner"
require "droonga/farm"
require "droonga/session"
require "droonga/replier"
Expand Down Expand Up @@ -56,13 +56,12 @@ def initialize(catalog, options)
@sessions = {}
@current_id = 0
@local = Regexp.new("^#{@name}")
@adapter_runners = create_runners(AdapterRunner)
@adapter_runners = create_adapter_runners
@farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
@forwarder = Forwarder.new(@loop)
@replier = Replier.new(@forwarder)
@planner_runners = create_runners(PlannerRunner)
# TODO: make customizable
@collector = Collector.new(["basic", "search"])
@planner_runners = create_planner_runners
@collector_runners = create_collector_runners
end

def start
Expand All @@ -78,7 +77,9 @@ def shutdown
@planner_runners.each_value do |planner_runner|
planner_runner.shutdown
end
@collector.shutdown
@collector_runners.each_value do |collector_runner|
collector_runner.shutdown
end
@adapter_runners.each_value do |adapter_runner|
adapter_runner.shutdown
end
Expand Down Expand Up @@ -145,7 +146,9 @@ def process_internal_message(message)
steps = message["steps"]
if steps
session_planner = SessionPlanner.new(self, steps)
session = session_planner.create_session(id, @collector)
dataset = message["dataset"] || @message["dataset"]
collector_runner = @collector_runners[dataset]
session = session_planner.create_session(id, collector_runner)
@sessions[id] = session
else
#todo: take cases receiving result before its query into account
Expand Down Expand Up @@ -235,8 +238,6 @@ def process_input_message(message)
target_message = error.message
raise UnknownCommand.new(target_message["type"],
target_message["dataset"])
rescue Droonga::LegacyPluggable::UnknownPlugin => error
raise UnknownCommand.new(error.command, message["dataset"])
end

def assert_valid_message(message)
Expand All @@ -249,14 +250,32 @@ def assert_valid_message(message)
end
end

def create_runners(runner_class)
def create_runners
runners = {}
@catalog.datasets.each do |name, configuration|
runners[name] = runner_class.new(self, configuration["plugins"] || [])
runners[name] = yield(configuration)
end
runners
end

def create_adapter_runners
create_runners do |configuration|
AdapterRunner.new(self, configuration["plugins"] || [])
end
end

def create_planner_runners
create_runners do |configuration|
PlannerRunner.new(self, configuration["plugins"] || [])
end
end

def create_collector_runners
create_runners do |configuration|
CollectorRunner.new(configuration["plugins"] || [])
end
end

def log_tag
"[#{Process.ppid}][#{Process.pid}] dispatcher"
end
Expand All @@ -269,7 +288,7 @@ def initialize(dispatcher, steps)
@steps = steps
end

def create_session(id, collector)
def create_session(id, collector_runner)
resolve_descendants
tasks = []
inputs = {}
Expand All @@ -289,7 +308,7 @@ def create_session(id, collector)
end
end
end
Session.new(id, @dispatcher, collector, tasks, inputs)
Session.new(id, @dispatcher, collector_runner, tasks, inputs)
end

def resolve_descendants
Expand Down

0 comments on commit ed93264

Please sign in to comment.