Navigation Menu

Skip to content

Commit

Permalink
Implement system.absorb-data without Fiber and timer.
Browse files Browse the repository at this point in the history
They are not required because Drndump::Dumper is already asynchronous.
  • Loading branch information
piroor committed Apr 14, 2015
1 parent 37be365 commit 12a6a15
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 59 deletions.
48 changes: 10 additions & 38 deletions lib/droonga/plugin/async_command.rb
Expand Up @@ -93,46 +93,24 @@ def initialize(loop, messenger, request)
end

def start
setup_forward_data

forward("#{prefix}.start")

runner = Fiber.new do
handle
end

timer = Coolio::TimerWatcher.new(0.1, true)
timer.on_timer do
if runner.alive?
begin
runner.resume
rescue
timer.detach
logger.trace("start: watcher detached on unexpected exception",
:watcher => timer)
logger.exception(error_message, $!)
error(error_name, error_message)
end
else
timer.detach
logger.trace("start: watcher detached on unexpected exception",
:watcher => timer)
end
end

@loop.attach(timer)
logger.trace("start: new watcher attached",
:watcher => timer)
#XXX override me!!
on_start
on_finish
end

private
def prefix
"" #XXX override me!!
end

def handle
def on_start
setup_forward_data
forward("#{prefix}.start")
end

def on_finish
#XXX override me!!
forward("#{prefix}.end") #XXX you must forward "end" message by self!
forward("#{prefix}.end")
end

def setup_forward_data
Expand All @@ -141,8 +119,6 @@ def setup_forward_data
"dataset" => @request.dataset,
}
@forward_to = @request.reply_to
@n_forwarded_messages = 0
@messages_per_100msec = @request.messages_per_seconds / 10
end

def error_name
Expand Down Expand Up @@ -175,10 +151,6 @@ def forward(type, body=nil)
@messenger.forward(forward_message,
"to" => @forward_to,
"type" => type)

@n_forwarded_messages += 1
@n_forwarded_messages %= @messages_per_100msec
Fiber.yield if @n_forwarded_messages.zero?
end

def log_tag
Expand Down
54 changes: 47 additions & 7 deletions lib/droonga/plugins/dump.rb
Expand Up @@ -50,6 +50,40 @@ def initialize(context, loop, messenger, request)
super(loop, messenger, request)
end

def start
on_start

runner = Fiber.new do
dump_schema
dump_records
dump_indexes
on_finish
end

timer = Coolio::TimerWatcher.new(0.1, true)
timer.on_timer do
if runner.alive?
begin
runner.resume
rescue
timer.detach
logger.trace("start: watcher detached on unexpected exception",
:watcher => timer)
logger.exception(error_message, $!)
error(error_name, error_message)
end
else
timer.detach
logger.trace("start: watcher detached on unexpected exception",
:watcher => timer)
end
end

@loop.attach(timer)
logger.trace("start: new watcher attached",
:watcher => timer)
end

private
def prefix
"dump"
Expand All @@ -63,13 +97,6 @@ def error_message
"failed to dump"
end

def handle
dump_schema
dump_records
dump_indexes
forward("#{prefix}.end")
end

def dump_schema
reference_tables = []
each_table do |table|
Expand Down Expand Up @@ -246,6 +273,19 @@ def each_index_columns
end
end

def setup_forward_data
super
@n_forwarded_messages = 0
@messages_per_100msec = @request.messages_per_seconds / 10
end

def forward(type, body=nil)
super
@n_forwarded_messages += 1
@n_forwarded_messages %= @messages_per_100msec
Fiber.yield if @n_forwarded_messages.zero?
end

def log_tag
"[#{Process.ppid}] dumper"
end
Expand Down
30 changes: 16 additions & 14 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -42,20 +42,9 @@ class EmptyResponse < StandardError
class EmptyBody < StandardError
end

private
def prefix
"system.absorb-data"
end
def start
on_start

def error_name
"AbsorbFailure"
end

def error_message
"failed to absorb data"
end

def handle
@dumper_error_message = nil

dumper = Drndump::Dumper.new(dumper_params)
Expand Down Expand Up @@ -96,6 +85,19 @@ def handle
on_finish if @dumper_error_message
end

private
def prefix
"system.absorb-data"
end

def error_name
"AbsorbFailure"
end

def error_message
"failed to absorb data"
end

def on_finish
begin
if @dumper_error_message
Expand All @@ -108,7 +110,7 @@ def on_finish
@dumper_error_message = exception.to_s
error(error_name, @dumper_error_message)
end
forward("#{prefix}.end")
super
end

def dumper_params
Expand Down

0 comments on commit 12a6a15

Please sign in to comment.