Skip to content

Commit

Permalink
Don't run RPC in event handler.
Browse files Browse the repository at this point in the history
RPC in event handler breaks output of the event handler itself.
  • Loading branch information
piroor committed Jul 18, 2014
1 parent 2af94eb commit 9315b6c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
37 changes: 30 additions & 7 deletions bin/droonga-engine-join
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,37 @@ end
options.joining_node = "#{options.joining_host}:#{options.port}/#{options.tag}"
options.source_node = "#{options.replica_source_host}:#{options.port}/#{options.tag}"

def run_remote_command(target, command, options)
result = Droonga::Serf.send_query(target, command, options)
puts result[:result]
puts result[:error] unless result[:error].empty?
end


puts "Preparing to fetch cluster information..."
publish_port = 10032 + rand(10000)
run_remote_command(options.source_node, "publish_catalog",
"node" => options.source_node,
"port" => publish_port)
sleep(3) # wait until the HTTP server becomes ready

puts "Joining new replica to the cluster..."
result = Droonga::Serf.send_query(options.joining_node, "join",
"node" => options.joining_node,
"type" => "replica",
"source" => options.source_node,
"copy" => options.copy)
puts result[:result]
puts result[:error] unless result[:error].empty?
run_remote_command(options.joining_node, "join",
"node" => options.joining_node,
"type" => "replica",
"source" => options.source_node,
"fetch_port" => publish_port,
"copy" => options.copy)

puts "Update existing hosts in the cluster..."
run_remote_command(options.source_node, "add_replicas",
"dataset" => dataset_name,
"hosts" => [host])

run_remote_command(options.source_node, "unpublish_catalog",
"node" => options.source_node,
"port" => publish_port)

puts "Done."

exit(true)
31 changes: 3 additions & 28 deletions lib/droonga/command/serf_event_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def join_as_replica

source_host = source_node.split(":").first

catalog = fetch_catalog(source_node)
fetch_port = @payload["fetch_port"]
catalog = fetch_catalog(source_node, fetch_port)
generator = create_current_catalog_generator(catalog)
dataset = generator.dataset_for_host(source_host) ||
generator.dataset_for_host(host)
Expand Down Expand Up @@ -160,24 +161,10 @@ def join_as_replica
modifier.datasets[dataset_name].replicas.hosts += other_hosts
modifier.datasets[dataset_name].replicas.hosts.uniq!
end
sleep(1) # wait for restart

puts "joining to the cluster: update others"

source_node = "#{source_host}:#{port}/#{tag}"
run_remote_command(source_node, "add_replicas",
"dataset" => dataset_name,
"hosts" => [host])
end

def fetch_catalog(source_node)
def fetch_catalog(source_node, port)
source_host = source_node.split(":").first
port = 10032 + rand(10000)

run_remote_command(source_node, "publish_catalog",
"node" => source_node,
"port" => port)
sleep(3) # wait until the HTTP server becomes ready

url = "http://#{source_host}:#{port}"
connection = Faraday.new(url) do |builder|
Expand All @@ -187,10 +174,6 @@ def fetch_catalog(source_node)
response = connection.get("/catalog.json")
catalog = response.body

run_remote_command(source_node, "unpublish_catalog",
"node" => source_node,
"port" => port)

JSON.parse(catalog)
end

Expand Down Expand Up @@ -336,14 +319,6 @@ def save_status(key, value)
status[key] = value
SafeFileWriter.write(Serf.status_file, JSON.pretty_generate(status))
end

def run_remote_command(node, command, options={})
puts "remote command: #{command} on #{node}"
result = Serf.send_query(node, command, options)
puts result[:output]
puts result[:error] unless result[:error].empty?
result
end
end
end
end

0 comments on commit 9315b6c

Please sign in to comment.