Navigation Menu

Skip to content

Commit

Permalink
Apply schema in catalog in slice instead of dispatcher
Browse files Browse the repository at this point in the history
Because schema isn't needed to spread as message. It should be applied
on each node.
  • Loading branch information
kou committed Apr 11, 2014
1 parent e2256dc commit d3e30f5
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 319 deletions.
138 changes: 1 addition & 137 deletions lib/droonga/catalog/schema.rb
Expand Up @@ -26,12 +26,6 @@ def initialize(data)
def weight
@data["weight"]
end

def flags
flags = []
flags << "WITH_WEIGHT" if weight
flags
end
end

class ColumnIndexOptions
Expand All @@ -54,14 +48,6 @@ def position
def sources
@data["sources"]
end

def flags
flags = []
flags << "WITH_SECTION" if section
flags << "WITH_WEIGHT" if weight
flags << "WITH_POSITION" if position
flags
end
end

class Column
Expand Down Expand Up @@ -95,23 +81,6 @@ def type_symbol
end
end

def type_flag
case type
when "Scalar"
"COLUMN_SCALAR"
when "Vector"
"COLUMN_VECTOR"
when "Index"
"COLUMN_INDEX"
else
# TODO raise appropriate error
end
end

def flags
[type_flag] + vector_options.flags + index_options.flags
end

def value_type
@data["valueType"]
end
Expand All @@ -124,21 +93,6 @@ def value_type_groonga
end
end

def to_column_create_body
body = {
"name" => name,
"table" => table,
"flags" => flags.join("|"),
"type" => value_type_groonga
}
sources = index_options.sources
if sources
body["source"] = sources.join(",")
end

body
end

private
def vector_options_data
@data["vectorOptions"] || {}
Expand Down Expand Up @@ -195,7 +149,7 @@ def key_type_groonga
when "Float", "Time", "ShortText", "TokyoGeoPoint", "WGS84GeoPoint"
key_type
else
# TODO raise appropriate error
key_type
end
end

Expand All @@ -207,76 +161,12 @@ def normalizer
@data["normalizer"]
end

def type_flag
case type
when "Array"
"TABLE_NO_KEY"
when "Hash"
"TABLE_HASH_KEY"
when "PatriciaTrie"
"TABLE_PAT_KEY"
when "DoubleArrayTrie"
"TABLE_DAT_KEY"
else
# TODO raise appropriate error
end
end

def flags
[type_flag]
end

def to_table_create_body
body = {
"name" => name,
"key_type" => key_type_groonga,
"flags" => flags.join("|")
}

if tokenizer
body["default_tokenizer"] = tokenizer
end

if normalizer
body["normalizer"] = normalizer
end

body
end

private
def columns_data
@data["columns"] || []
end
end

class ColumnCreateSorter
include TSort

def initialize(tables)
@tables = tables
end

def all_columns
@tables.values.collect {|table| table.columns.values}.flatten
end

def tsort_each_node(&block)
all_columns.each(&block)
end

def tsort_each_child(column, &block)
dependent_column_names = column.index_options.sources || []
dependent_column_names -= ["_key"] # _key always exists after the table created
reference_table = @tables[column.value_type_groonga]
# TODO when _key specified, check to ensure reference_table is not Array
dependent_columns = dependent_column_names.collect do |column_name|
reference_table.columns[column_name]
end
dependent_columns.each(&block)
end
end

attr_reader :tables
def initialize(dataset_name, data)
@dataset_name = dataset_name
Expand All @@ -287,32 +177,6 @@ def initialize(dataset_name, data)
end
end

def to_messages
messages = []

tables.each do |name, table|
messages << {
"type" => "table_create",
"dataset" => @dataset_name,
"body" => table.to_table_create_body
}
end

sorter = ColumnCreateSorter.new(tables)
columns = sorter.tsort
# TODO handle TSort::Cyclic

columns.each do |column|
messages << {
"type" => "column_create",
"dataset" => @dataset_name,
"body" => column.to_column_create_body
}
end

messages
end

def ==(other)
self.class == other.class and
tables == other.tables
Expand Down
12 changes: 0 additions & 12 deletions lib/droonga/dispatcher.rb
Expand Up @@ -62,8 +62,6 @@ def initialize(engine_state, catalog, options)

def start
@farm.start

ensure_schema
end

def shutdown
Expand Down Expand Up @@ -268,16 +266,6 @@ def create_step_runners
end
end

def ensure_schema
@catalog.datasets.each do |name, dataset|
schema = dataset.schema
messages = schema.to_messages
messages.each do |message|
process_message(message)
end
end
end

def log_tag
"[#{Process.ppid}][#{Process.pid}] dispatcher"
end
Expand Down
4 changes: 3 additions & 1 deletion lib/droonga/farm.rb
Expand Up @@ -27,7 +27,9 @@ def initialize(name, catalog, loop, options={})
@slices = {}
slices = @catalog.slices(name)
slices.each do |slice_name, slice_options|
slice = Droonga::Slice.new(@loop,
dataset = @catalog.datasets[slice_options[:dataset]]
slice = Droonga::Slice.new(dataset,
@loop,
@options.merge(slice_options))
@slices[slice_name] = slice
end
Expand Down
168 changes: 168 additions & 0 deletions lib/droonga/schema_applier.rb
@@ -0,0 +1,168 @@
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/loggable"

module Droonga
class SchemaApplier
include Loggable

def initialize(context, schema)
@context = context
@schema = schema
end

def apply
# TODO: Support migration
Groonga::Schema.define(:context => @context) do |schema|
create_tables(schema)
create_reference_columns(schema)
create_index_columns(schema)
end
end

private
def each_table
reference_tables = []
@schema.tables.each_value do |table|
if reference_table?(table)
reference_tables << table
else
yield(table)
end
end
reference_tables.each do |table|
yield(table)
end
end

def reference_table?(table)
table.type != "Array" and !type_names.include?(table.key_type_groonga)
end

def each_column(table)
table.columns.each_value do |column|
yield(column)
end
end

def normal_column?(column)
column.type != "Index" and type_names.include?(column.value_type_groonga)
end

def reference_column?(column)
column.type != "Index" and !type_names.include?(column.value_type_groonga)
end

def index_column?(column)
column.type == "Index"
end

def types
@types ||= collect_available_types
end

def type_names
@type_names ||= types.collect(&:name)
end

def collect_available_types
each_options = {
:ignore_missing_object => true
}
@context.database.each(each_options).find_all do |object|
object.is_a?(Groonga::Type)
end
end

def create_tables(schema)
each_table do |table|
create_table(schema, table)
end
end

def create_table(schema, table)
options = {
:type => table.type_symbol,
:key_type => table.key_type_groonga,
:default_tokenizer => table.tokenizer,
:normalizer => table.normalizer,
}
p [table.name, options]
schema.create_table(table.name, options) do |table_definition|
each_column(table) do |column|
next unless normal_column?(column)
create_data_column(table_definition, column)
end
end
end

def create_data_column(table_definition, column)
options = {
:type => column.type_symbol,
}
if options[:type] == :vector
options[:with_weight] = true if column.vector_options.weight
end
table_definition.column(column.name, column.value_type_groonga, options)
end

def create_reference_columns(schema)
each_table do |table|
reference_columns = []
each_column(table) do |column|
reference_columns << column if reference_column?(column)
end
next if reference_columns.empty?

schema.change_table(table.name) do |table_definition|
reference_columns.each do |column|
create_data_column(table_definition, column)
end
end
end
end

def create_index_columns(schema)
each_table do |table|
index_columns = []
each_column(table) do |column|
index_columns << column if index_column?(column)
end
next if index_columns.empty?

schema.change_table(table.name) do |table_definition|
index_columns.each do |column|
create_index_column(table_definition, column)
end
end
end
end

def create_index_column(table_definition, column)
sources = column.index_options.sources || []
options = {
:with_section => column.index_options.section,
:with_weight => column.index_options.weight,
:with_position => column.index_options.position,
}
table_definition.index(column.value_type_groonga, *sources, options)
end

def log_tag
"schema_applier"
end
end
end

0 comments on commit d3e30f5

Please sign in to comment.