Navigation Menu

Skip to content

Commit

Permalink
Isolate Droonga::DataAbsorber from external commands
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent c63a1f0 commit 71eb380
Showing 1 changed file with 52 additions and 103 deletions.
155 changes: 52 additions & 103 deletions lib/droonga/data_absorber.rb
Expand Up @@ -13,8 +13,6 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "open3"

require "droonga/loggable"
require "droonga/client"
require "droonga/catalog_generator"
Expand Down Expand Up @@ -43,18 +41,14 @@ def absorb(params)

attr_reader :params
attr_reader :dataset, :port, :tag, :messages_per_second
attr_reader :source_host, :destination_host, :receiver_host, :receiver_port
attr_reader :source_host, :destination_host
attr_reader :error_message

def initialize(params)
@params = params

@messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND

@drndump = @params[:drndump] || "drndump"
# We should use droonga-send instead of droonga-request,
# because droonga-request is too slow.
@client = @params[:client] || "droonga-send"

@dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
@port = @params[:port] || CatalogGenerator::DEFAULT_PORT
@tag = @params[:tag] || CatalogGenerator::DEFAULT_TAG
Expand All @@ -65,58 +59,75 @@ def initialize(params)

@receiver_port = @params[:receiver_port]

@destination_client_options = @params[:client_options] || {}

@error_message = nil

#XXX We must count the total number of source records before absorbing,
# because commands run in parallel while a "dump" is in progress can time out.
@required_time_in_seconds = calculate_required_time_in_seconds
@total_n_source_records = count_total_n_source_records
end

MESSAGES_PER_SECOND_MATCHER = /(\d+(\.\d+)?) messages\/second/

def absorb
drndump_command_line = [@drndump] + drndump_options
client_command_line = [@client] + client_options(@client)

start_time_in_seconds = Time.new.to_i
env = {}
Open3.pipeline_r([env, *drndump_command_line],
[env, *client_command_line]) do |last_stdout, thread|
last_stdout.each do |output|
if block_given?
messages_per_second = nil
if output =~ MESSAGES_PER_SECOND_MATCHER
messages_per_second = $1.to_f
# Asks the destination node to absorb data from the source node and
# processes every message delivered to the subscription.
#
# A "system.absorb-data" message built from @source_host, @port, @tag,
# @dataset and @messages_per_second is sent through destination_client.
# Each received message is handled as follows:
#
# * Droonga::Client::Error: the client is closed and the error text is
#   stored in @error_message.
# * "system.absorb-data.result" / "system.absorb-data.error" with a status
#   code other than 200: the client is closed and "name: message" from the
#   body is stored in @error_message.
# * "system.absorb-data.progress": the reported count is recorded and
#   progress information is yielded to the given block, if any.
# * "system.absorb-data.start" / "system.absorb-data.end": counted, and the
#   client is closed once no started absorber remains.
#
# @yield [Hash] progress information with the keys :n_processed_messages,
#   :percentage and :message.
def run
  n_absorbers = 0

  absorb_message = {
    "type" => "system.absorb-data",
    "body" => {
      "host" => @source_host,
      "port" => @port,
      "tag" => @tag,
      "dataset" => @dataset,
      "messagesPerSecond" => @messages_per_second,
    },
  }
  destination_client.subscribe(absorb_message) do |message|
    case message
    when Droonga::Client::Error
      destination_client.close
      @error_message = message.to_s
    else
      case message["type"]
      when "system.absorb-data.result", "system.absorb-data.error"
        if message["statusCode"] != 200
          destination_client.close
          error = message["body"]
          @error_message = "#{error['name']}: #{error['message']}"
        end
      when "system.absorb-data.progress"
        # The instance variable keeps this spelling because
        # progress_percentage and progress_message read it under this name.
        @n_prosessed_messages = message["body"]["count"]
        if block_given?
          yield(:n_processed_messages => @n_prosessed_messages,
                :percentage => progress_percentage,
                :message => progress_message)
        end
      when "system.absorb-data.start"
        n_absorbers += 1
      when "system.absorb-data.end"
        n_absorbers -= 1
        destination_client.close if n_absorbers <= 0
      end
    end
  end
end

# Tells whether a remaining-time estimate can be given.
#
# @return [Boolean] true only when @required_time_in_seconds is not the
#   TIME_UNKNOWN marker and is greater than zero.
def can_report_remaining_time?
  required_time = @required_time_in_seconds
  return false if required_time == Droonga::DataAbsorber::TIME_UNKNOWN

  required_time > 0
end

ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60

def report_progress(start_time_in_seconds)
return nil unless can_report_remaining_time?
# Returns how much of the source data has been processed so far, as an
# integer percentage capped at 100.
#
# Reads @n_prosessed_messages (the instance variable is spelled this way
# where the progress count is recorded) and @total_n_source_records.
#
# @return [Integer] the processed share of the source records.
def progress_percentage
  total = @total_n_source_records.to_f
  # With no source records there is nothing left to wait for; returning
  # early also avoids dividing by zero.
  return 100 unless total > 0

  # Divide as floats: Integer#/ truncates, which would report 0% until the
  # very last record has been processed.
  ratio = @n_prosessed_messages.to_f / total
  [(ratio * 100).to_i, 100].min
end

elapsed_time = Time.new.to_i - start_time_in_seconds
progress = elapsed_time.to_f / @required_time_in_seconds
progress = [(progress * 100).to_i, 100].min
# Builds a human readable progress line such as
# "25% done (maybe 01:02:30 remaining)".
#
# The remaining time is estimated from the number of records not processed
# yet (@total_n_source_records - @n_prosessed_messages, never negative)
# divided by @messages_per_second, and is formatted as HH:MM:SS.
#
# @return [String] the percentage from progress_percentage followed by the
#   estimated remaining time.
def progress_message
  n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max

  remaining_seconds = n_remaining_records / @messages_per_second
  # Split the estimate into hours, minutes and leftover seconds.
  remaining_hours, remaining_seconds = remaining_seconds.divmod(ONE_HOUR_IN_SECONDS)
  remaining_minutes, remaining_seconds = remaining_seconds.divmod(ONE_MINUTE_IN_SECONDS)
  remaining_time = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)

  "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
end

def source_client
Expand All @@ -139,7 +150,7 @@ def destination_client
:progocol => :droonga,
:receiver_host => @receiver_host,
:receiver_port => 0,
}
}.merge(@destination_client_options)
@destination_client ||= Droonga::Client.new(options)
end

Expand All @@ -148,68 +159,6 @@ def source_node_suspendable?
end

private
# Estimates how long the absorption will take.
#
# An estimate is only produced when the configured client command (@client)
# contains "droonga-send"; it is the total number of source records divided
# by @messages_per_second. For any other client TIME_UNKNOWN is returned.
def calculate_required_time_in_seconds
  return TIME_UNKNOWN unless @client.include?("droonga-send")

  total_n_source_records / @messages_per_second
end

# Builds the command line arguments appended to the drndump command.
#
# "--host" is only emitted when @source_host is set and "--receiver-port"
# only when @receiver_port is set; the other options are always emitted.
#
# @return [Array<String>] option names and values, all converted with to_s.
def drndump_options
  arguments = []
  arguments.push("--host", @source_host) if @source_host
  arguments.push("--port", @port,
                 "--tag", @tag,
                 "--dataset", @dataset,
                 "--receiver-host", @receiver_host)
  arguments.push("--receiver-port", @receiver_port) if @receiver_port
  arguments.map { |argument| argument.to_s }
end

# Builds the command line arguments used when the client command is
# droonga-request.
#
# "--receiver-port" is only emitted when @receiver_port is set; the other
# options are always emitted.
#
# @return [Array<String>] option names and values, all converted with to_s.
def droonga_request_options
  arguments = [
    "--host", @destination_host,
    "--port", @port,
    "--tag", @tag,
    "--receiver-host", @receiver_host,
  ]
  arguments.push("--receiver-port", @receiver_port) if @receiver_port
  arguments.map { |argument| argument.to_s }
end

# Builds the command line arguments used when the client command is
# droonga-send.
#
# @return [Array<String>] option names and values, all converted with to_s.
def droonga_send_options
  #XXX Only one endpoint is given, never several with round-robin, no
  #    matter how much data there is. Schema and indexes must go to a
  #    single endpoint to keep their order, and drndump currently has no
  #    way to extract only the schema and indexes.
  server = "droonga:#{params[:destination_host]}:#{params[:port]}/#{params[:tag]}"

  [
    "--server", server,
    #XXX The traffic should be restricted to avoid overflowing!
    "--messages-per-second", @messages_per_second,
    "--report-throughput",
  ].map { |argument| argument.to_s }
end

# Returns the command line arguments matching the given client command.
#
# @param client [String] the client command; it is matched by substring,
#   so a path containing the command name is accepted as well.
# @return [Array<String>] the result of droonga_request_options or
#   droonga_send_options.
# @raise [ArgumentError] when the command is neither droonga-request nor
#   droonga-send.
def client_options(client)
  return droonga_request_options if client.include?("droonga-request")
  return droonga_send_options if client.include?("droonga-send")

  raise ArgumentError, "Unknown type client: #{client}"
end

def source_tables
response = source_client.request("dataset" => @dataset,
"type" => "table_list")
Expand All @@ -225,7 +174,7 @@ def source_tables
end
end

def total_n_source_records
def count_total_n_source_records
queries = {}
source_tables.each do |table|
queries["n_records_of_#{table}"] = {
Expand Down

0 comments on commit 71eb380

Please sign in to comment.