Navigation Menu

Skip to content

Commit

Permalink
Extract common codes to implement asynchronous handler from the dump …
Browse files Browse the repository at this point in the history
…command
  • Loading branch information
piroor committed Apr 10, 2015
1 parent ac2aa42 commit e7fad61
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 118 deletions.
173 changes: 173 additions & 0 deletions lib/droonga/plugin/async_command.rb
@@ -0,0 +1,173 @@
# Copyright (C) 2014-2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "coolio"

require "droonga/handler"
require "droonga/error_messages"

module Droonga
module Plugins
module AsyncCommand
class Request
def initialize(message)
@message = message
end

def need_start?
reply_to
end

def id
@message["id"]
end

def dataset
@message.raw["dataset"]
end

def reply_to
(@message.raw["replyTo"] || {})["to"]
end

def messages_per_seconds
request = (@message.request || {})
minimum_messages_per_seconds = 10
[
minimum_messages_per_seconds,
(request["messagesPerSecond"] || 10000).to_i,
].max
end
end

class Handler < Droonga::Handler
def handle(message)
request = Request.new(message)
if request.need_start?
start(request)
true
else
false
end
end

def start(request)
#XXX override me!
# handler = MyAsyncHandler.new(loop, messenger, request)
# handler.start
end
end

class AsyncHandler
def initialize(loop, messenger, request)
@loop = loop
@messenger = messenger
@request = request
end

def start
setup_forward_data

forward("#{prefix}.start")

runner = Fiber.new do
handle
forward("#{prefix}.end")
end

timer = Coolio::TimerWatcher.new(0.1, true)
timer.on_timer do
begin
runner.resume
rescue FiberError
timer.detach
logger.trace("start: watcher detached on FiberError",
:watcher => timer)
rescue
timer.detach
logger.trace("start: watcher detached on unexpected exception",
:watcher => timer)
logger.exception(error_message, $!)
error(error_name, error_message)
end
end

@loop.attach(timer)
logger.trace("start: new watcher attached",
:watcher => timer)
end

private
def prefix
"" #XXX override me!!
end

def handle
#XXX override me!!
end

def setup_forward_data
@base_forward_message = {
"inReplyTo" => @request.id,
"dataset" => @request.dataset,
}
@forward_to = @request.reply_to
@n_forwarded_messages = 0
@messages_per_100msec = @request.messages_per_seconds / 10
end

def error_name
"Failure" #XXX override me!!
end

def error_message
"failed to do" #XXX override me!!
end

def error(name, message)
message = {
"statusCode" => ErrorMessages::InternalServerError::STATUS_CODE,
"body" => {
"name" => name,
"message" => message,
},
}
error_message = @base_forward_message.merge(message)
@messenger.forward(error_message,
"to" => @forward_to,
"type" => "#{prefix}.error")
end

def forward(type, body=nil)
forward_message = @base_forward_message
if body
forward_message = forward_message.merge("body" => body)
end
@messenger.forward(forward_message,
"to" => @forward_to,
"type" => type)

@n_forwarded_messages += 1
@n_forwarded_messages %= @messages_per_100msec
Fiber.yield if @n_forwarded_messages.zero?
end

def log_tag
"[#{Process.ppid}] async-handler"
end
end
end
end
end
139 changes: 21 additions & 118 deletions lib/droonga/plugins/dump.rb
Expand Up @@ -16,6 +16,7 @@
require "groonga"

require "droonga/plugin"
require "droonga/plugin/async_command"
require "droonga/error_messages"

module Droonga
Expand All @@ -35,136 +36,38 @@ module Dump
extend Plugin
register("dump")

class Handler < Droonga::Handler
def handle(message)
request = Request.new(message)
if request.need_dump?
dumper = Dumper.new(@context, loop, messenger, request)
dumper.start_dump
true
else
false
end
end
end

class Request
def initialize(message)
@message = message
end

def need_dump?
reply_to
end

def id
@message["id"]
end

def dataset
@message.raw["dataset"]
end

def reply_to
(@message.raw["replyTo"] || {})["to"]
end

def messages_per_seconds
request = (@message.request || {})
minimum_messages_per_seconds = 10
[
minimum_messages_per_seconds,
(request["messagesPerSecond"] || 10000).to_i,
].max
class Handler < AsyncCommand::Handler
def start(request)
dumper = Dumper.new(@context, loop, messenger, request)
dumper.start_dump
end
end

class Dumper
class Dumper < AsyncCommand::AsyncHandler
include Loggable

def initialize(context, loop, messenger, request)
@context = context
@loop = loop
@messenger = messenger
@request = request
end

def start_dump
setup_forward_data

forward("dump.start")

dumper = Fiber.new do
dump_schema
dump_records
dump_indexes
forward("dump.end")
end

on_error = lambda do |exception|
message = "failed to dump"
logger.exception(message, $!)
error("DumpFailure", message)
end

timer = Coolio::TimerWatcher.new(0.1, true)
timer.on_timer do
begin
dumper.resume
rescue FiberError
timer.detach
logger.trace("start_dump: watcher detached on FiberError",
:watcher => timer)
rescue
timer.detach
logger.trace("start_dump: watcher detached on unexpected exception",
:watcher => timer)
on_error.call($!)
end
end

@loop.attach(timer)
logger.trace("start_dump: new watcher attached",
:watcher => timer)
super(loop, messenger, request)
end

private
def setup_forward_data
@base_forward_message = {
"inReplyTo" => @request.id,
"dataset" => @request.dataset,
}
@forward_to = @request.reply_to
@n_forwarded_messages = 0
@messages_per_100msec = @request.messages_per_seconds / 10
def prefix
"dump"
end

def error(name, message)
message = {
"statusCode" => ErrorMessages::InternalServerError::STATUS_CODE,
"body" => {
"name" => name,
"message" => message,
},
}
error_message = @base_forward_message.merge(message)
@messenger.forward(error_message,
"to" => @forward_to,
"type" => "dump.error")
def error_name
"DumpFailure"
end

def forward(type, body=nil)
forward_message = @base_forward_message
if body
forward_message = forward_message.merge("body" => body)
end
@messenger.forward(forward_message,
"to" => @forward_to,
"type" => type)
def error_message
"failed to dump"
end

@n_forwarded_messages += 1
@n_forwarded_messages %= @messages_per_100msec
Fiber.yield if @n_forwarded_messages.zero?
def handle
dump_schema
dump_records
dump_indexes
end

def dump_schema
Expand All @@ -183,7 +86,7 @@ def dump_schema
end

def dump_table(table)
forward("dump.table", table_body(table))
forward("#{prefix}.table", table_body(table))

columns = table.columns.sort_by(&:name)
columns.each do |column|
Expand Down Expand Up @@ -216,7 +119,7 @@ def table_type(table)
end

def dump_column(column)
forward("dump.column", column_body(column))
forward("#{prefix}.column", column_body(column))
end

def column_body(column)
Expand Down Expand Up @@ -276,7 +179,7 @@ def dump_records
"key" => record.key,
"values" => values,
}
forward("dump.record", body)
forward("#{prefix}.record", body)
end
end
end
Expand Down

0 comments on commit e7fad61

Please sign in to comment.