Navigation Menu

Skip to content

Commit

Permalink
Pass catalog to engine explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
darashi committed Feb 13, 2014
1 parent e83c11e commit c7a71d3
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
14 changes: 6 additions & 8 deletions lib/droonga/dispatcher.rb
Expand Up @@ -18,13 +18,11 @@

require "droonga/adapter_runner"
require "droonga/planner"
require "droonga/catalog"
require "droonga/collector"
require "droonga/farm"
require "droonga/session"
require "droonga/replier"
require "droonga/message_processing_error"
require "droonga/catalog_observer"
require "droonga/distributor"

module Droonga
Expand All @@ -50,16 +48,16 @@ def initialize(command, dataset)
end
end

def initialize(options)
def initialize(catalog, options)
@catalog = catalog
@options = options
@name = @options[:name]
@loop = EventLoop.new
@catalog_observer = CatalogObserver.new(@loop)
@sessions = {}
@current_id = 0
@local = Regexp.new("^#{@name}")
@adapter_runners = create_adapter_runners
@farm = Farm.new(name, @loop, :dispatcher => self)
@farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
@forwarder = Forwarder.new(@loop)
@replier = Replier.new(@forwarder)
# TODO: make customizable
Expand Down Expand Up @@ -172,7 +170,7 @@ def dispatch_steps(steps)
steps.each do |step|
dataset = step["dataset"]
if dataset
routes = Droonga.catalog.get_routes(dataset, step)
routes = @catalog.get_routes(dataset, step)
step["routes"] = routes
else
step["routes"] ||= [id]
Expand Down Expand Up @@ -240,14 +238,14 @@ def assert_valid_message(message)
raise MissingDatasetParameter.new
end
dataset = message["dataset"]
unless Droonga.catalog.have_dataset?(dataset)
unless @catalog.have_dataset?(dataset)
raise UnknownDataset.new(dataset)
end
end

def create_adapter_runners
runners = {}
Droonga.catalog.datasets.each do |name, configuration|
@catalog.datasets.each do |name, configuration|
runners[name] = AdapterRunner.new(self, configuration["plugins"] || [])
end
runners
Expand Down
5 changes: 3 additions & 2 deletions lib/droonga/engine.rb
Expand Up @@ -20,9 +20,10 @@

module Droonga
class Engine
def initialize(options={})
def initialize(catalog, options={})
@catalog = catalog
@options = options
@dispatcher = Dispatcher.new(@options)
@dispatcher = Dispatcher.new(@catalog, @options)
end

def start
Expand Down
5 changes: 3 additions & 2 deletions lib/droonga/farm.rb
Expand Up @@ -19,12 +19,13 @@

module Droonga
class Farm
def initialize(name, loop, options={})
def initialize(name, catalog, loop, options={})
@name = name
@catalog = catalog
@loop = loop
@options = options
@partitions = {}
partitions = Droonga.catalog.get_partitions(name)
partitions = @catalog.get_partitions(name)
partitions.each do |partition_name, partition_options|
partition = Droonga::Partition.new(@loop,
@options.merge(partition_options))
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/out_droonga.rb
Expand Up @@ -17,6 +17,7 @@

require "droonga/engine"
require "droonga/plugin_loader"
require "droonga/catalog_loader"

module Fluent
class DroongaOutput < Output
Expand All @@ -27,7 +28,7 @@ class DroongaOutput < Output
def start
super
Droonga::PluginLoader.load_all
@engine = Droonga::Engine.new(:name => @name)
@engine = Droonga::Engine.new(catalog, :name => @name)
@engine.start
end

Expand All @@ -44,6 +45,11 @@ def emit(tag, es, chain)
end

private
def catalog
catalog_loader = Droonga::CatalogLoader.new("catalog.json")
catalog_loader.load
end

def process_event(tag, record)
$log.trace("out_droonga: tag: <#{tag}>")
@engine.process(parse_record(tag, record))
Expand Down

0 comments on commit c7a71d3

Please sign in to comment.