Navigation Menu

Skip to content

Commit

Permalink
Report actual progress
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 91c675b commit 8e0a473
Showing 1 changed file with 109 additions and 36 deletions.
145 changes: 109 additions & 36 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -49,7 +49,7 @@ def start
on_start

count_total_n_objects do |n_objects|
@initial_n_objects = n_objects
@n_initial_objects = n_objects
do_absorb
end

Expand All @@ -60,29 +60,13 @@ def do_absorb
logger.trace("do_absorb: start")
@dumper_error_message = nil

@dumper = Drndump::DumpClient.new(dumper_params)
@dumper.on_finish = lambda do
ensure_completely_restored do
on_finish
logger.trace("start: finish")
end
end
@dumper.on_progress = lambda do |message|
logger.trace("dump progress",
:message => message)
end
@dumper.on_error = lambda do |error|
if error.is_a?(Exception)
logger.exception("unexpected exception while dump",
error)
else
logger.error("unexpected error while dump",
:error => error)
end
end

@n_restored_objects = 0
@measure_start_time = Time.now
@previous_measure_time = @measure_start_time
@previous_n_restored_objects = 0.0
@previous_report_time = Time.now

@dumper = create_dumper
begin
logger.info("starting to absorb the source dataset")
@dumper_error_message = @dumper.run(dump_options) do |message|
Expand All @@ -92,12 +76,7 @@ def do_absorb
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
now = Time.now
elapsed_seconds = (now - @previous_report_time).to_i
if elapsed_seconds >= progress_interval_seconds
@previous_report_time = now
report_progress
end
try_report_progress(:count_restored_objects => true)
rescue Exception => exception
@dumper_error_message = exception.to_s
logger.exception("failed to process progress",
Expand Down Expand Up @@ -134,11 +113,9 @@ def ensure_completely_restored(&block)
n_expected_objects = @dumper.n_forecasted_messages
while not completely_restored
count_total_n_objects do |count|
n_restored_objects = count - @initial_n_objects
logger.trace("ensure_completely_restored: check",
:current => n_restored_objects,
:forecasted => n_expected_objects)
completely_restored ||= n_restored_objects == n_expected_objects
@n_restored_objects = count - @n_initial_objects
completely_restored ||= @n_restored_objects == n_expected_objects
try_report_progress
end
Fiber.yield
end
Expand Down Expand Up @@ -211,6 +188,30 @@ def on_finish
super
end

def create_dumper
dumper = Drndump::DumpClient.new(dumper_params)
dumper.on_finish = lambda do
ensure_completely_restored do
on_finish
logger.trace("start: finish")
end
end
dumper.on_progress = lambda do |message|
logger.trace("dump progress",
:message => message)
end
dumper.on_error = lambda do |error|
if error.is_a?(Exception)
logger.exception("unexpected exception while dump",
error)
else
logger.error("unexpected error while dump",
:error => error)
end
end
dumper
end

def dumper_params
{
:host => source_host,
Expand All @@ -231,15 +232,87 @@ def dump_options
}
end

def try_report_progress(options={})
now = Time.now
elapsed_seconds = (now - @previous_report_time).to_i
if elapsed_seconds >= progress_interval_seconds
if options[:count_restored_objects]
count_total_n_objects do |count|
@previous_report_time = Time.now
@n_restored_objects = count - @n_initial_objects
try_report_progress
end
else
@previous_report_time = now
report_progress
end
end
end

def report_progress
message = "#{@dumper.progress_percentage}% done " +
"(maybe #{@dumper.formatted_remaining_time} remaining)"
message = "#{progress_percentage}% done " +
"(maybe #{formatted_remaining_time} remaining)"
forward("#{prefix}.progress",
"nProcessedMessages" => @dumper.n_received_messages,
"percentage" => @dumper.progress_percentage,
"nRestoredObjects" => @n_restored_objects,
"percentage" => progress_percentage,
"message" => message)
end

MIN_REPORTED_THROUGHPUT = 0.01

def recent_throughput
now = Time.now
n_objects = @n_restored_objects - @previous_n_restored_objects

if now - @previous_measure_time < 1
now = @previous_measure_time
n_objects = @previous_n_restored_objects
else
@previous_measure_time = now
@previous_n_restored_objects = n_objects.to_f
end

if now == @measure_start_time
actual_throughput = 0
else
elapsed_seconds = now - @measure_start_time
actual_throughput = n_objects / elapsed_seconds
end

[actual_throughput, MIN_REPORTED_THROUGHPUT].max
end

def n_remaining_objects
[@dumper.n_forecasted_messages - @n_restored_objects, 0].max
end

def remaining_seconds
throughput = [recent_throughput, messages_per_second].min
remaining_seconds = n_remaining_objects.to_f / throughput
@dumper.remaining_seconds + remaining_seconds
end

ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60

def formatted_remaining_time
seconds = remaining_seconds
hours = (seconds / ONE_HOUR_IN_SECONDS).floor
seconds -= hours * ONE_HOUR_IN_SECONDS
minutes = (seconds / ONE_MINUTE_IN_SECONDS).floor
seconds -= minutes * ONE_MINUTE_IN_SECONDS
sprintf("%02i:%02i:%02i", hours, minutes, seconds)
end

def progress_percentage
return 0 if @dumper.n_forecasted_messages.zero?
processed = @dumper.n_received_messages + @n_restored_objects
expected = @dumper.n_forecasted_messages * 2
progress = processed / expected
[(progress * 100).to_i, 100].min
end

def myself
@myself ||= NodeName.parse(my_node_name)
end
Expand Down

0 comments on commit 8e0a473

Please sign in to comment.