Skip to content

Commit

Permalink
Build distributed command messages by the DistributedCommandPlanner
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 29, 2014
1 parent 7f3d12b commit 2b8b956
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 100 deletions.
100 changes: 100 additions & 0 deletions lib/droonga/distributed_command_planner.rb
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

module Droonga
class DistributedCommandPlanner
attr_accessor :key
attr_reader :outputs

def initialize(source_message)
@source_message = source_message

@key = nil
@outputs = []

@reducers = []
@gatherers = []
@processors = []

plan_errors_handling
end

def messages
@reducers + @gatherers + @processors
end

def reduce(name, reducer)
@reducers << {
"type" => "reduce",
"body" => {
name => {
"#{name}_reduced" => reducer,
},
},
"inputs" => [name],
"outputs" => ["#{name}_reduced"],
}

@gatherers << {
"type" => "gather",
"body" => {
"#{name}_reduced" => {
"output" => name,
},
},
"inputs" => ["#{name}_reduced"],
"post" => true,
}
end

def scatter_all
raise MessageProcessingError.new("missing key") unless @key
@processors << {
"command" => @source_message["type"],
"dataset" => @source_message["dataset"],
"body" => @source_message["body"],
"key" => @key,
"type" => "scatter",
"outputs" => @outputs + ["errors"],
"replica" => "all",
"post" => true
}
end

def broadcast_all
@processors << {
"command" => @source_message["type"],
"dataset" => @source_message["dataset"],
"body" => @source_message["body"],
"type" => "broadcast",
"outputs" => @outputs + ["errors"],
"replica" => "all",
"post" => true
}
end

private
#XXX Now, we include definitions to merge errors in the body.
# However, this makes the term "errors" reserved, so plugins
# cannot use their custom "errors" in the body.
# This must be rewritten.
def plan_errors_handling
@outputs << "errors"
reduce("errors", "type" => "sum", "limit" => -1)
end
end
end
70 changes: 1 addition & 69 deletions lib/droonga/distributor_plugin.rb
Expand Up @@ -16,6 +16,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "droonga/plugin"
require "droonga/distributed_command_planner"

module Droonga
class DistributorPlugin < Plugin
Expand All @@ -30,16 +31,6 @@ def distribute(messages)
@distributor.distribute(messages)
end

def scatter_all(message, key)
messages = [reducer(message), gatherer(message), scatterer(message, key)]
distribute(messages)
end

def broadcast_all(message)
messages = [reducer(message), gatherer(message), broadcaster(message)]
distribute(messages)
end

private
def process_error(command, error, arguments)
if error.is_a?(MessageProcessingError)
Expand All @@ -48,64 +39,5 @@ def process_error(command, error, arguments)
super
end
end

#XXX Now, default scatterer/broadcaster/reducer/gatherer includes
# definitions to merge errors in the body. However, this makes
# the term "errors" reserved, so plugins cannot use their custom
# "errors" in the body. This must be rewritten.

def scatterer(message, key)
{
"command" => message["type"],
"dataset" => message["dataset"],
"body" => message["body"],
"key" => key,
"type" => "scatter",
"outputs" => ["errors"],
"replica" => "all",
"post" => true
}
end

def broadcaster(message)
{
"command" => message["type"],
"dataset" => message["dataset"],
"body" => message["body"],
"type" => "broadcast",
"outputs" => ["errors"],
"replica" => "all",
"post" => true
}
end

def reducer(message)
{
"type" => "reduce",
"body" => {
"errors" => {
"errors_reduced" => {
"type" => "sum",
"limit" => -1,
},
},
},
"inputs" => ["errors"],
"outputs" => ["errors_reduced"],
}
end

def gatherer(message)
{
"type" => "gather",
"body" => {
"errors_reduced" => {
"output" => "errors",
},
},
"inputs" => ["errors_reduced"],
"post" => true,
}
end
end
end
41 changes: 10 additions & 31 deletions lib/droonga/plugin/distributor/crud.rb
Expand Up @@ -23,49 +23,28 @@ class CRUDDistributor < Droonga::DistributorPlugin

command :add
def add(message)
key = message["body"]["key"] || rand.to_s
scatter_all(message, key)
scatter_all(message)
end

command :update
def update(message)
key = message["body"]["key"] || rand.to_s
scatter_all(message, key)
scatter_all(message)
end

# TODO: What is this?
command :reset
def reset(message)
key = message["body"]["key"] || rand.to_s
scatter_all(message, key)
scatter_all(message)
end

private
def scatterer(message, key)
scatterer = super
scatterer["outputs"] << "success"
scatterer
end

def reducer(message)
reducer = super
reducer["body"]["success"] = {
"success_reduced" => {
"type" => "and",
},
}
reducer["inputs"] << "success"
reducer["outputs"] << "success_reduced"
reducer
end

def gatherer(message)
gatherer = super
gatherer["body"]["success_reduced"] = {
"output" => "success",
}
gatherer["inputs"] << "success_reduced"
gatherer
def scatter_all(message)
planner = DistributedCommandPlanner.new(message)
planner.key = message["body"]["key"] || rand.to_s
planner.outputs << "success"
planner.reduce("success", "type" => "and")
planner.scatter_all
distribute(planner.messages)
end
end
end

0 comments on commit 2b8b956

Please sign in to comment.