Navigation Menu

Skip to content

Commit

Permalink
Use DumpClient's features to report progress
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 17, 2015
1 parent 3a9a366 commit 44d9c3c
Showing 1 changed file with 10 additions and 124 deletions.
134 changes: 10 additions & 124 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -48,37 +48,30 @@ def start

@dumper_error_message = nil

dumper = Drndump::DumpClient.new(dumper_params)
dumper.on_finish = lambda do
@dumper = Drndump::DumpClient.new(dumper_params)
@dumper.on_finish = lambda do
on_finish
logger.trace("start: finish")
end
dumper.on_progress = lambda do |message|
@dumper.on_progress = lambda do |message|
logger.trace("dump progress",
:message => message)
end
dumper.on_error = lambda do |error|
@dumper.on_error = lambda do |error|
logger.error("unexpected error while dump",
:error => error)
end

@previous_report_time = Time.now

begin
@n_processed_messages = 0
@total_n_source_records = nil
get_total_n_source_records do |count|
@total_n_source_records = count
logger.info("#{count} records to be absorbed")
end
logger.info("starting to absorb the source dataset")
@dumper_error_message = dumper.run(dump_options) do |message|
@dumper_error_message = @dumper.run(dump_options) do |message|
begin
message["dataset"] = current_dataset
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
@n_processed_messages += 1
now = Time.now
elapsed_seconds = (now - @previous_report_time).to_i
if elapsed_seconds >= progress_interval_seconds
Expand Down Expand Up @@ -116,7 +109,6 @@ def on_finish
if @dumper_error_message
error(error_name, @dumper_error_message)
else
@total_n_source_records = @n_processed_messages
report_progress
end
rescue Exception => exception
Expand Down Expand Up @@ -147,40 +139,12 @@ def dump_options
end

def report_progress
if @n_processed_messages.nil? or @total_n_source_records.nil?
return
end
message = "#{@dumper.progress_percentage}% done " +
"(maybe #{@dumper.formatted_remaining_time} remaining)"
forward("#{prefix}.progress",
"nProcessedMessages" => @n_processed_messages,
"percentage" => progress_percentage,
"message" => progress_message)
end

def progress_percentage
if @total_n_source_records.nil? or @total_n_source_records.zero?
return 0
end
progress = @n_processed_messages.to_f / @total_n_source_records
[(progress * 100).to_i, 100].min
end

ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60

def progress_message
n_remaining_records = [@total_n_source_records - @n_processed_messages, 0].max

remaining_seconds = n_remaining_records / messages_per_second
remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
remaining_time = sprintf("%02i:%02i:%02i",
remaining_hours,
remaining_minutes,
remaining_seconds)

"#{progress_percentage}% done (maybe #{remaining_time} remaining)"
"nProcessedMessages" => @dumper.n_received_messages,
"percentage" => @dumper.progress_percentage,
"message" => message)
end

def myself
Expand Down Expand Up @@ -230,84 +194,6 @@ def messages_per_second
DEFAULT_MESSAGES_PER_SECOND
end

def source_client_options
{
:host => source_host,
:port => source_port,
:tag => source_tag,
:dataset => source_dataset,

:protocol => :droonga,

:receiver_host => myself.host,
:receiver_port => 0,

:backend => :coolio,
:loop => @loop,
}
end

def create_source_client
Droonga::Client.new(source_client_options)
end

def get_source_tables(&block)
client = create_source_client
client.request("dataset" => source_dataset,
"type" => "table_list") do |response|
client.close
unless response
raise EmptyResponse.new("table_list returns nil response")
end
unless response["body"]
raise EmptyBody.new("table_list returns nil result")
end

message_body = response["body"]
body = message_body[1]
tables = body[1..-1]
table_names = tables.collect do |table|
table[1]
end
yield(table_names)
end
end

def get_total_n_source_records(&block)
get_source_tables do |source_tables|
queries = {}
source_tables.each do |table|
queries["n_records_of_#{table}"] = {
"source" => table,
"output" => {
"elements" => ["count"],
},
}
end
client = create_source_client
client.request("dataset" => source_dataset,
"type" => "search",
"body" => {
"timeout" => 10,
"queries" => queries,
}) do |response|
client.close
unless response
raise EmptyResponse.new("search returns nil response")
end
unless response["body"]
raise EmptyBody.new("search returns nil result")
end

n_records = 0
response["body"].each do |query_name, result|
n_records += result["count"]
end
yield(n_records)
end
end
end

def log_tag
"[#{Process.ppid}] data-absorber"
end
Expand Down

0 comments on commit 44d9c3c

Please sign in to comment.