Navigation Menu

Skip to content

Commit

Permalink
Count total number of actually absorbed objects correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent aa7d648 commit 91c675b
Showing 1 changed file with 78 additions and 11 deletions.
89 changes: 78 additions & 11 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -13,12 +13,13 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "fiber"

require "droonga/plugin"
require "droonga/plugin/async_command"
require "droonga/catalog/dataset"
require "droonga/serf"
require "droonga/node_name"
require "droonga/database_scanner"

require "drndump/dump_client"

Expand All @@ -37,23 +38,26 @@ def initialize
end

class DataAbsorber < AsyncCommand::AsyncHandler
include DatabaseScanner

class EmptyResponse < StandardError
end

class EmptyBody < StandardError
end

def initialize(context, loop, messenger, request)
@context = context
super(loop, messenger, request)
end

def start
logger.trace("start: start")
on_start

count_total_n_objects do |n_objects|
@initial_n_objects = n_objects
do_absorb
end

logger.trace("start: done")
end

def do_absorb
logger.trace("do_absorb: start")
@dumper_error_message = nil

@dumper = Drndump::DumpClient.new(dumper_params)
Expand Down Expand Up @@ -108,7 +112,7 @@ def start
end

on_finish if @dumper_error_message
logger.trace("start: done")
logger.trace("do_absorb: done")
end

private
Expand All @@ -125,7 +129,70 @@ def error_message
end

def ensure_completely_restored(&block)
yield
runner = Fiber.new do
completely_restored = false
n_expected_objects = @dumper.n_forecasted_messages
while not completely_restored
count_total_n_objects do |count|
n_restored_objects = count - @initial_n_objects
logger.trace("ensure_completely_restored: check",
:current => n_restored_objects,
:forecasted => n_expected_objects)
completely_restored ||= n_restored_objects == n_expected_objects
end
Fiber.yield
end
count_client.close
yield
end

timer = Coolio::TimerWatcher.new(3, true)
timer.on_timer do
if runner.alive?
begin
runner.resume
rescue
timer.detach
# logger.trace("start: watcher detached on unexpected exception",
# :watcher => timer)
logger.exception(error_message, $!)
error(error_name, error_message)
end
else
timer.detach
# logger.trace("start: watcher detached on unexpected exception",
# :watcher => timer)
end
end
@loop.attach(timer)
end

def count_total_n_objects(&block)
count_message = {
"type" => "system.object-count",
"dataset" => current_dataset,
"body" => {
"output" => ["total"],
},
}
count_client.request(count_message) do |response|
yield(response["body"]["total"])
end
end

def count_client
@count_client ||= Droonga::Client.new(count_client_options)
end

def count_client_options
{
:host => myself.host,
:port => myself.port,
:tag => myself.tag,
:protocol => :droonga,
:backend => :coolio,
:loop => @loop,
}
end

def on_finish
Expand Down Expand Up @@ -234,7 +301,7 @@ def handle(message)

private
def start(request)
absorber = DataAbsorber.new(@context, loop, messenger, request)
absorber = DataAbsorber.new(loop, messenger, request)
absorber.start
end
end
Expand Down

0 comments on commit 91c675b

Please sign in to comment.