Navigation Menu

Skip to content

Commit

Permalink
Use Droonga::DataAbsorber directly
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent 71eb380 commit 4e7c622
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 67 deletions.
58 changes: 24 additions & 34 deletions bin/droonga-engine-absorb-data
Expand Up @@ -17,8 +17,8 @@

require "ostruct"
require "optparse"
require "open3"
require "socket"
require "coolio"

require "droonga/engine/version"
require "droonga/catalog_generator"
Expand All @@ -31,6 +31,8 @@ require "droonga/restarter"

class AbsorbDataCommand
def run
@loop = Coolio::Loop.default

parse_options
assert_valid_options
trap_signals
Expand All @@ -44,10 +46,10 @@ class AbsorbDataCommand
puts ""
puts "Absorbing..."

absorb
succeeded = absorb

puts "Done."
exit(true)
puts "Done." if succeeded
exit(succeeded)
end

private
Expand Down Expand Up @@ -136,13 +138,6 @@ class AbsorbDataCommand
serf.send_query(command, options)
end

def current_member_state(target)
serf = Droonga::Serf.new(target, :verbose => @options.verbose)
serf.current_members.find do |member|
member["name"] == target
end
end

def absorber
@absorber ||= prepare_absorber
end
Expand All @@ -156,37 +151,31 @@ class AbsorbDataCommand
:port => @options.port,
:tag => @options.tag,
:messages_per_second => @options.messages_per_second,
:client_options => {
:backend => :coolio,
:loop => @loop,
},
}
Droonga::DataAbsorber.new(absorber_options)
end

def absorb
start_time_in_seconds = Time.new.to_i
run_remote_command(destination_node, "absorb_data",
"node" => destination_node,
"source" => source_node,
"port" => @options.port,
"tag" => @options.tag,
"dataset" => @options.dataset,
"messages_per_second" => @options.messages_per_second)
last_progress = ""
while true
sleep(3)
state = current_member_state(destination_node)
if state.nil? or state["tags"]["absorbing"] != "true"
break
last_progress = nil
absorber.run do |progress|
if last_progress
printf("%s", "#{" " * last_progress[:message].size}\r")
end
printf("%s", "#{progress[:message]}\r")
last_progress = progress
end
@loop.run

progress = absorber.report_progress(start_time_in_seconds)
if progress
printf("%s", "#{" " * last_progress.size}\r")
printf("%s", "#{progress}\r")
last_progress = progress
end
if absorber.error_message
puts(absorber.error_message)
do_cancel
return false
end
printf("%s", "#{" " * last_progress.size}\r")
printf("%s", "#{progress}\r")
puts "100% done"

puts ""

response = run_remote_command(source_node, "report_metadata",
Expand All @@ -200,6 +189,7 @@ class AbsorbDataCommand
"node" => destination_node,
"timestamp" => timestamp)
end
true
end

def trap_signals
Expand Down
61 changes: 28 additions & 33 deletions bin/droonga-engine-join
Expand Up @@ -19,6 +19,7 @@ require "slop"
require "json"
require "pathname"
require "socket"
require "coolio"

require "droonga/engine/version"
require "droonga/path"
Expand All @@ -32,6 +33,8 @@ require "droonga/node_metadata"

class JoinCommand
def run
@loop = Coolio::Loop.default

parse_options
trap_signals

Expand All @@ -47,7 +50,13 @@ class JoinCommand
do_join
register_to_existing_nodes
set_source_node_role
copy_data unless @options["no-copy"]
unless @options["no-copy"]
successed = copy_data
unless successed
do_cancel
exit(false)
end
end
set_effective_message_timestamp
reset_source_node_role
reset_joining_node_role
Expand Down Expand Up @@ -141,13 +150,6 @@ class JoinCommand
serf.send_query(command, options)
end

def current_member_state(target)
serf = Droonga::Serf.new(target, :verbose => @options[:verbose])
serf.current_members.find do |member|
member["name"] == target
end
end

def absorber
@absorber ||= prepare_absorber
end
Expand All @@ -161,6 +163,10 @@ class JoinCommand
:port => @options[:port],
:tag => @options[:tag],
:messages_per_second => @options["records-per-second"],
:client_options => {
:backend => :coolio,
:loop => @loop,
},
}
Droonga::DataAbsorber.new(absorber_options)
end
Expand Down Expand Up @@ -218,33 +224,22 @@ class JoinCommand
def copy_data
puts("Copying data from the source node...")

@start_time_in_seconds = Time.new.to_i

run_remote_command(joining_node, "absorb_data",
"node" => joining_node,
"source" => source_node,
"port" => absorber.port,
"tag" => absorber.tag,
"dataset" => absorber.dataset,
"messages_per_second" => absorber.messages_per_second)

last_progress = ""
while true
sleep(3)
state = current_member_state(joining_node)
if state.nil? or state["tags"]["absorbing"] != "true"
break
end
progress = absorber.report_progress(@start_time_in_seconds)
if progress
printf("%s", "#{" " * last_progress.size}\r")
printf("%s", "#{progress}\r")
last_progress = progress
last_progress = nil
absorber.run do |progress|
if last_progress
printf("%s", "#{" " * last_progress[:message].size}\r")
end
printf("%s", "#{progress[:message]}\r")
last_progress = progress
end
printf("%s", "#{" " * last_progress.size}\r")
printf("%s", "#{progress}\r")
puts "100% done"
@loop.run

if absorber.error_message
puts(absorber.error_message)
do_cancel
return false
end

puts ""
end

Expand Down

0 comments on commit 4e7c622

Please sign in to comment.