Navigation Menu

Skip to content

Commit

Permalink
Generate messages for progress by the handler of system.absorb-data
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent 83da224 commit 4347bb0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 101 deletions.
95 changes: 5 additions & 90 deletions lib/droonga/data_absorber.rb
Expand Up @@ -22,12 +22,6 @@ module Droonga
class DataAbsorber
include Loggable

# Raised when a request sent through source_client returns no response.
# The message carries the type of the request that failed.
class EmptyResponse < StandardError; end

# Raised when a response obtained through source_client has no "body".
# The message carries the type of the request that failed.
class EmptyBody < StandardError; end

DEFAULT_MESSAGES_PER_SECOND = 100

TIME_UNKNOWN = -1
Expand Down Expand Up @@ -59,13 +53,9 @@ def initialize(params)

@receiver_port = @params[:receiver_port]

@destination_client_options = @params[:client_options] || {}
@client_options = @params[:client_options] || {}

@error_message = nil

#XXX We must count the total number of source records before absorbing,
# because commands issued in parallel with a running "dump" can time out.
@total_n_source_records = count_total_n_source_records
end

def run
Expand Down Expand Up @@ -96,10 +86,9 @@ def run
end
when "system.absorb-data.progress"
body = message["body"]
@n_prosessed_messages = body["nProcessedMessages"]
yield(:n_processed_messages => @n_processed_messages,
:percentage => progress_percentage,
:message => progress_message)
yield(:n_processed_messages => body["nProcessedMessages"],
:percentage => body["percentage"],
:message => body["message"])
when "system.absorb-data.start"
n_absorbers += 1
when "system.absorb-data.end"
Expand All @@ -110,39 +99,6 @@ def run
end
end

# Unit sizes, in seconds, used to split a duration into HH:MM:SS parts.
ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS   = 60 * ONE_MINUTE_IN_SECONDS

# Returns the absorption progress as a percentage capped at 100.
#
# The count is multiplied by 100 *before* dividing. With Integer operands,
# dividing first truncates every partial ratio to 0, so the percentage
# would stay at 0 until all records had been processed.
#
# NOTE: the instance variable name (including its spelling) is the one
# this file's progress handling assigns; keep both in sync when renaming.
#
# @return [Integer] percentage, never greater than 100; 100 when the
#   source holds no records at all.
def progress_percentage
  # Nothing to absorb means nothing is left; this also avoids a
  # ZeroDivisionError for an empty source.
  return 100 unless @total_n_source_records > 0

  percentage = @n_prosessed_messages * 100 / @total_n_source_records
  [percentage.to_i, 100].min
end

# Builds a human readable progress line such as
# "25% done (maybe 00:01:15 remaining)".
#
# The remaining time is estimated by dividing the number of records not
# processed yet by @messages_per_second, then split into hours, minutes
# and seconds.
#
# @return [String] the progress line.
def progress_message
  n_unprocessed = @total_n_source_records - @n_prosessed_messages
  # More processed messages than counted records is reported as "none left".
  n_unprocessed = 0 if n_unprocessed < 0

  seconds_left = n_unprocessed / @messages_per_second
  hours, seconds_left = seconds_left.divmod(ONE_HOUR_IN_SECONDS)
  minutes, seconds_left = seconds_left.divmod(ONE_MINUTE_IN_SECONDS)
  estimated_time = format("%02i:%02i:%02i", hours, minutes, seconds_left)

  "#{progress_percentage}% done (maybe #{estimated_time} remaining)"
end

# Returns the client used to send requests to the source host, creating it
# on first use and reusing the same instance afterwards.
#
# @return [Droonga::Client] the memoized client.
def source_client
  @source_client ||= Droonga::Client.new(
    :host          => @source_host,
    :port          => @port,
    :tag           => @tag,
    :protocol      => :droonga,
    :receiver_host => @receiver_host,
    :receiver_port => 0
  )
end

def destination_client
options = {
:host => @destination_host,
Expand All @@ -151,7 +107,7 @@ def destination_client
:protocol => :droonga,
:receiver_host => @receiver_host,
:receiver_port => 0,
}.merge(@destination_client_options)
}.merge(@client_options)
@destination_client ||= Droonga::Client.new(options)
end

Expand All @@ -160,47 +116,6 @@ def source_node_suspendable?
end

private
# Sends a "table_list" request for @dataset through source_client and
# extracts one value per listed table.
#
# The second element of the response body is treated as the table listing;
# its first row is skipped and the second column of every remaining row is
# returned (presumably a header row and the table name column -- confirm
# against the table_list response format).
#
# @return [Array] the extracted values, one per table row.
# @raise [EmptyResponse] when no response is returned.
# @raise [EmptyBody] when the response has no "body".
def source_tables
  reply = source_client.request("dataset" => @dataset,
                                "type" => "table_list")

  raise EmptyResponse.new("table_list") unless reply
  raise EmptyBody.new("table_list") unless reply["body"]

  listing = reply["body"][1]
  rows = listing[1..-1]
  rows.map { |row| row[1] }
end

# Counts the records of every table returned by source_tables.
#
# One "search" request is sent through source_client, carrying a count-only
# query per table; the "count" of every result in the response body is
# summed.
#
# @return [Integer] the sum of all counts (0 when the body is empty).
# @raise [EmptyResponse] when no response is returned.
# @raise [EmptyBody] when the response has no "body".
def count_total_n_source_records
  queries = source_tables.each_with_object({}) do |table, built|
    built["n_records_of_#{table}"] = {
      "source" => table,
      "output" => {
        "elements" => ["count"],
      },
    }
  end

  reply = source_client.request("dataset" => @dataset,
                                "type" => "search",
                                "body" => {
                                  "queries" => queries,
                                })

  raise EmptyResponse.new("search") unless reply
  raise EmptyBody.new("search") unless reply["body"]

  reply["body"].inject(0) do |total, (_query_name, result)|
    total + result["count"]
  end
end

# Returns the result of get_source_replica_hosts, computed on first use and
# cached in @source_replica_hosts afterwards.
def source_replica_hosts
  @source_replica_hosts = get_source_replica_hosts unless @source_replica_hosts
  @source_replica_hosts
end
Expand Down
133 changes: 122 additions & 11 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -36,6 +36,12 @@ def initialize
end

class DataAbsorber < AsyncCommand::AsyncHandler
# Raised when a request sent through source_client returns no response.
# The message carries the type of the request that failed.
class EmptyResponse < StandardError; end

# Raised when a response obtained through source_client has no "body".
# The message carries the type of the request that failed.
class EmptyBody < StandardError; end

private
def prefix
"system.absorb-data"
Expand All @@ -55,16 +61,21 @@ def handle
serf = Serf.new(my_node_name)
serf.set_tag("absorbing", true)

count = 0
begin
@total_n_source_records = count_total_n_source_records
@n_processed_messages = 0
dumper_error_message = dumper.run do |message|
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
count += 1
report_progress(count)
@n_processed_messages += 1
report_progress
end

forward("#{prefix}.progress", "count" => count)
report_progress
rescue Exception => exception
dumper_error_message = exception.to_s
end

serf.set_tag("absorbing", true)

Expand All @@ -76,10 +87,10 @@ def handle
def dumper_params
params = @request.request
{
:host => params["host"],
:port => params["port"] || NodeName::DEFAULT_PORT,
:tag => params["tag"] || NodeName::DEFAULT_TAG,
:dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME,
:host => source_host,
:port => source_port,
:tag => source_tag,
:dataset => source_dataset,

:receiver_host => myself.host,
:receiver_port => 0,
Expand All @@ -88,9 +99,33 @@ def dumper_params
}
end

def report_progress(count)
return unless (count % 100).zero?
forward("#{prefix}.progress", "count" => count)
# Forwards a "<prefix>.progress" message carrying the processed message
# count, the progress percentage and the progress text.
#
# Only every 100th processed message is reported; for any other count
# the method returns without forwarding anything.
def report_progress
  on_reporting_boundary = (@n_processed_messages % 100) == 0
  return unless on_reporting_boundary

  forward("#{prefix}.progress",
          "nProcessedMessages" => @n_processed_messages,
          "percentage" => progress_percentage,
          "message" => progress_message)
end

# Returns the absorption progress as a percentage capped at 100.
#
# Reads @n_processed_messages, the counter this handler increments while
# forwarding dumped messages, and @total_n_source_records.
#
# The count is multiplied by 100 *before* dividing. With Integer operands,
# dividing first truncates every partial ratio to 0, so the percentage
# would stay at 0 until all records had been processed.
#
# @return [Integer] percentage, never greater than 100; 100 when the
#   source holds no records at all.
def progress_percentage
  # Nothing to absorb means nothing is left; this also avoids a
  # ZeroDivisionError for an empty source.
  return 100 unless @total_n_source_records > 0

  percentage = @n_processed_messages * 100 / @total_n_source_records
  [percentage.to_i, 100].min
end

# Unit sizes, in seconds, used to split a duration into HH:MM:SS parts.
ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS   = 60 * ONE_MINUTE_IN_SECONDS

# Builds a human readable progress line such as
# "25% done (maybe 00:01:15 remaining)".
#
# The remaining time is estimated by dividing the number of records not
# processed yet by @messages_per_second, then split into hours, minutes
# and seconds.
#
# Reads @n_processed_messages, the counter this handler increments while
# forwarding dumped messages.
#
# NOTE(review): @messages_per_second is not assigned anywhere in the
# visible part of this handler -- confirm it is initialized before the
# first progress report, otherwise the division below raises.
#
# @return [String] the progress line.
def progress_message
  n_remaining_records = [@total_n_source_records - @n_processed_messages, 0].max

  remaining_seconds = n_remaining_records / @messages_per_second
  remaining_hours, remaining_seconds = remaining_seconds.divmod(ONE_HOUR_IN_SECONDS)
  remaining_minutes, remaining_seconds = remaining_seconds.divmod(ONE_MINUTE_IN_SECONDS)
  remaining_time = format("%02i:%02i:%02i",
                          remaining_hours, remaining_minutes, remaining_seconds)

  "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
end

def myself
Expand All @@ -101,6 +136,82 @@ def my_node_name
@messenger.engine_state.name
end

# Returns the "host" parameter of the request, cached in @source_host once
# a truthy value has been read.
def source_host
  return @source_host if @source_host

  @source_host = @request.request["host"]
end

# Returns the "port" parameter of the request, falling back to
# NodeName::DEFAULT_PORT when the request does not specify one. The result
# is cached in @source_port.
def source_port
  return @source_port if @source_port

  requested_port = @request.request["port"]
  @source_port = requested_port || NodeName::DEFAULT_PORT
end

# Returns the "tag" parameter of the request, falling back to
# NodeName::DEFAULT_TAG when the request does not specify one. The result
# is cached in @source_tag.
def source_tag
  return @source_tag if @source_tag

  requested_tag = @request.request["tag"]
  @source_tag = requested_tag || NodeName::DEFAULT_TAG
end

# Returns the "dataset" parameter of the request, falling back to
# Catalog::Dataset::DEFAULT_NAME when the request does not specify one.
# The result is cached in @source_dataset.
def source_dataset
  return @source_dataset if @source_dataset

  requested_dataset = @request.request["dataset"]
  @source_dataset = requested_dataset || Catalog::Dataset::DEFAULT_NAME
end

# Sends a "table_list" request for the source dataset through
# source_client and extracts one value per listed table.
#
# The dataset comes from source_dataset (the request parameter with its
# default), not from @dataset: no assignment to @dataset is visible in
# this handler, so reading it would send a nil dataset.
#
# The second element of the response body is treated as the table listing;
# its first row is skipped and the second column of every remaining row is
# returned (presumably a header row and the table name column -- confirm
# against the table_list response format).
#
# @return [Array] the extracted values, one per table row.
# @raise [EmptyResponse] when no response is returned.
# @raise [EmptyBody] when the response has no "body".
def source_tables
  response = source_client.request("dataset" => source_dataset,
                                   "type" => "table_list")

  raise EmptyResponse, "table_list" unless response
  raise EmptyBody, "table_list" unless response["body"]

  listing = response["body"][1]
  rows = listing[1..-1]
  rows.map { |row| row[1] }
end

# Builds the option hash passed to Droonga::Client.new by source_client.
#
# The connection target (host, port, tag, dataset) comes from the source_*
# accessors; the receiver host is this node's own host (myself.host).
#
# @return [Hash] a freshly built option hash.
def source_client_options
  {
    :host    => source_host,
    :port    => source_port,
    :tag     => source_tag,
    :dataset => source_dataset,

    :protocol => :droonga,

    :receiver_host => myself.host,
    :receiver_port => 0,
  }
end

# Returns the client used to send requests to the source, creating it from
# source_client_options on first use and reusing it afterwards.
#
# @return [Droonga::Client] the memoized client.
def source_client
  return @source_client if @source_client

  @source_client = Droonga::Client.new(source_client_options)
end

# Counts the records of every table returned by source_tables.
#
# One "search" request is sent through source_client, carrying a count-only
# query per table; the "count" of every result in the response body is
# summed.
#
# The dataset comes from source_dataset (the request parameter with its
# default), not from @dataset: no assignment to @dataset is visible in
# this handler, so reading it would send a nil dataset.
#
# @return [Integer] the sum of all counts (0 when the body is empty).
# @raise [EmptyResponse] when no response is returned.
# @raise [EmptyBody] when the response has no "body".
def count_total_n_source_records
  queries = source_tables.each_with_object({}) do |table, built|
    built["n_records_of_#{table}"] = {
      "source" => table,
      "output" => {
        "elements" => ["count"],
      },
    }
  end

  response = source_client.request("dataset" => source_dataset,
                                   "type" => "search",
                                   "body" => {
                                     "queries" => queries,
                                   })

  raise EmptyResponse, "search" unless response
  raise EmptyBody, "search" unless response["body"]

  response["body"].inject(0) do |total, (_query_name, result)|
    total + result["count"]
  end
end

# Returns the tag used for this handler's log lines: the parent process ID
# in square brackets followed by "data-absorber".
def log_tag
  parent_pid = Process.ppid
  "[#{parent_pid}] data-absorber"
end
Expand Down

0 comments on commit 4347bb0

Please sign in to comment.