Skip to content

Commit

Permalink
Use pipeline_r to execute external commands with pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jul 23, 2014
1 parent 4385140 commit 79df420
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 19 deletions.
4 changes: 3 additions & 1 deletion bin/droonga-engine-absorb-data
Expand Up @@ -101,7 +101,9 @@ else
:source_host => options.source_host,
:destination_host => options.destination_host,
:port => options.port,
:tag => options.tag)
:tag => options.tag) do |output|
puts output
end
end
puts "Done."

Expand Down
12 changes: 2 additions & 10 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -158,15 +158,11 @@ def join_as_replica
end
sleep(1) # wait for restart

count = 0
DataAbsorber.absorb(:dataset => dataset_name,
:source_host => source_host,
:destination_host => host,
:port => port,
:tag => tag) do |dump|
count += 1
end
log("#{count} dump messages are successfully processed")
:tag => tag)
sleep(1)
end

Expand Down Expand Up @@ -311,15 +307,11 @@ def absorb_data
log("port = #{port}")
log("tag = #{tag}")

count = 0
DataAbsorber.absorb(:dataset => dataset_name,
:source_host => source,
:destination_host => host,
:port => port,
:tag => tag) do |dump|
count += 1
end
log("#{count} dump messages are successfully processed")
:tag => tag)
end

def live_nodes
Expand Down
13 changes: 5 additions & 8 deletions lib/droonga/data_absorber.rb
Expand Up @@ -39,14 +39,11 @@ def absorb(params)
drndump_command_line = [drndump] + drndump_options
client_command_line = [client] + client_options

Open3.popen3(*drndump_command_line, :pgroup => true) do |dump_in, dump_out, dump_error, dump_thread|
dump_in.close
Open3.popen3(*client_command_line, :pgroup => true) do |client_in, client_out, client_error, client_thread|
client_out.close
dump_out.each do |part|
yield part if block_given?
client_in.puts(part)
end
env = {}
Open3.pipeline_r([env, *drndump_command_line],
[env, *client_command_line]) do |last_stdout, thread|
last_stdout.each do |output|
yield output if block_given?
end
end
end
Expand Down

0 comments on commit 79df420

Please sign in to comment.