Permalink
Browse files

Allow more than 1 client or server instance per service

This change adds a per-process unique integer to the subject of
client inboxes, and to the call subject for servers.

Change-Id: Ib0654d326f628fce485d70bb1d10d66d41b70187
  • Loading branch information...
1 parent 05defc9 commit 8ded15a1e563d1e08322527525720d810445995d @pietern pietern committed Oct 21, 2011
View
10 nats-rpc/README.md
@@ -97,14 +97,14 @@ it merely calls the implemented methods when it receives requests to do so.
```ruby
EM.run do
nats = NATS.connect
- server = NATS::RPC::Server.new(nats, MyService.new, :peername => "my_unique_id")
+ server = NATS::RPC::Server.new(nats, MyService.new, :peer_name => "my_unique_name")
end
```
The peer name that is used by default is equal to the combination of the
hostname of the server and the PID of the Ruby process. When multiple services
are exported by creating multiple `NATS::RPC::Server` instances, it is
-generally a good idea to have them use the *same peer name. This allows a
+generally a good idea to have them use the same peer name. This allows a
caller of *ServiceA* to use that peer name to call a remote method on
*ServiceB*.
@@ -116,7 +116,7 @@ instance of the service that it intends to call.
```ruby
EM.run do
nats = NATS.connect
- client = NATS::RPC::Client.new(nats, MyService.new)
+ client = NATS::RPC::Client.new(nats, MyService.new, :peer_name => "my_unique_name")
end
```
@@ -134,7 +134,7 @@ received, or the request times out. Typical usage looks like the following
snippet:
```ruby
-request = client.call("remote-peer", "my_method", "hello!", :timeout => 1)
+request = client.call("remote_peer_id", "my_method", "hello!", :timeout => 1)
request.execute!
request.on("reply") do |reply|
@@ -184,7 +184,7 @@ back to the caller. It is raised when the `#result` method on the reply object
is called. This can be avoided using a standard `begin`/`rescue` block.
```ruby
-request = client.call("remote-peer", "my_error_method")
+request = client.call("remote_peer_id", "my_error_method")
request.execute!
request.on("reply") do |reply|
View
4 nats-rpc/examples/call.rb
@@ -30,8 +30,8 @@ def multiply(request)
request.unregister
# Call the peer that sent the PONG
- client.call(reply.peername, "multiply", 10) do |request, reply|
- puts "#{reply.peername} got #{reply.result} by randomly multiplying 10!"
+ client.call(reply.peer_id, "multiply", 10) do |request, reply|
+ puts "#{reply.peer_id} got #{reply.result} by randomly multiplying 10!"
end
end
end
View
18 nats-rpc/lib/nats/rpc/client.rb
@@ -6,7 +6,7 @@ module RPC
class Client < Peer
def post_initialize
- subscribe("rpc.inbox.#{peername}") do |message|
+ subscribe(base_subject + ".inbox.#{peer_id}") do |message|
request = @registry[message["message_id"]]
if request
@@ -33,9 +33,9 @@ def registered?(request)
@registry.has_key?(request.message_id)
end
- def call(peer, method, payload = nil, options = {}, &blk)
+ def call(peer_id, method, payload = nil, options = {}, &blk)
request = Call.new(self, method, payload, options)
- request.peer = peer
+ request.peer_id = peer_id
request.shortcut!(&blk) if blk
request
end
@@ -94,7 +94,7 @@ def message_id
# Construct minimal message for this request.
def message
{ "message_id" => message_id,
- "peername" => client.peername,
+ "peer_id" => client.peer_id,
"method" => @method.name,
"payload" => @payload }
end
@@ -201,11 +201,11 @@ def stop_timer
class Call < ExpectReplyRequest
- attr_accessor :peer
+ attr_accessor :peer_id
def execute!
prepare_execute
- client.publish(generate_subject("call", peer), message)
+ client.publish(generate_subject("call", peer_id), message)
# Unregister after receiving the first reply.
on("reply") {
@@ -235,7 +235,7 @@ class Reply
attr_reader :message_id
# Meta
- attr_reader :peername
+ attr_reader :peer_id
def initialize(request, message)
@request = request
@@ -246,8 +246,8 @@ def message_id
@message["message_id"]
end
- def peername
- @message["peername"]
+ def peer_id
+ @message["peer_id"]
end
def result
View
22 nats-rpc/lib/nats/rpc/peer.rb
@@ -4,6 +4,11 @@ module NATS
module RPC
class Peer
+ def self.generate_peer_id
+ @peer_id ||= 0
+ @peer_id += 1
+ end
+
attr_reader :nats
attr_reader :service
attr_reader :options
@@ -26,13 +31,16 @@ def initialize(nats, service, options = {})
# Placeholder
def post_initialize; end
- # TODO: use something that scales better...
- #
- # This approach has problems when the process forks after setting up RPC
- # code. However, since forking in the reactor loop is a bad practice,
- # this shouldn't be a problem.
- def peername
- options[:peername] ||= "%s-%d" % [`hostname`.chomp, $?.pid]
+ # Return peer identification. This can either be user-provided by means
+ # of the options hash, or otherwise defaults to a combination of the
+ # hostname and the PID.
+ def peer_name
+ options[:peer_name] ||= "%s-%d" % [`hostname`.chomp, $?.pid]
+ end
+
+ # Return ID specific to this peer and particular object instance.
+ def peer_id
+ @peer_id ||= [peer_name, self.class.generate_peer_id].join(".")
end
# Base subject for all calls.
View
10 nats-rpc/lib/nats/rpc/server.rb
@@ -5,7 +5,7 @@ module RPC
class Server < Peer
def post_initialize
- subscribe(base_subject + ".call.#{peername}") do |message|
+ subscribe(base_subject + ".call.#{peer_id}") do |message|
handle(message)
end
subscribe(base_subject + ".mcall") do |message|
@@ -34,8 +34,8 @@ def message_id
@message["message_id"]
end
- def peername
- @message["peername"]
+ def peer_id
+ @message["peer_id"]
end
def method
@@ -68,9 +68,9 @@ def reply_error(error)
protected
def _reply(message)
- server.publish("rpc.inbox.#{peername}", message.merge({
+ server.publish(server.base_subject + ".inbox.#{peer_id}", message.merge({
"message_id" => message_id,
- "peername" => server.peername
+ "peer_id" => server.peer_id
}))
end
end
View
68 nats-rpc/spec/client_spec.rb
@@ -11,6 +11,12 @@ def echo(request)
request.reply(request.payload)
end
+ export :echo_twice
+ def echo_twice(request)
+ request.reply(request.payload)
+ request.reply(request.payload)
+ end
+
export :error
def error(request)
raise ClientSpecError.new
@@ -34,25 +40,24 @@ def sink(request)
describe NATS::RPC::Client do
include_context :nats
- def start_server(name = "server")
- NATS::RPC::Server.new(nats, ClientSpecService.new, :peername => name)
+ def start_server
+ NATS::RPC::Server.new(nats, ClientSpecService.new, :peer_name => "server")
end
- def client(name = "client")
- @clients ||= {}
- @clients[name] = NATS::RPC::Client.new(nats, ClientSpecService.new, :peername => name)
+ let(:client) do
+ NATS::RPC::Client.new(nats, ClientSpecService.new, :peer_name => "client")
end
context "call" do
it "should be received and processed by a single remote" do
em do
- server1 = start_server("server1")
- server2 = start_server("server2")
+ server1 = start_server
+ server2 = start_server
- request = client.call("server1", "echo", "Hi there!")
+ request = client.call(server1.peer_id, "echo", "Hi there!")
request.on("reply") do |reply|
reply.result.should eq("Hi there!")
- reply.peername.should eq("server1")
+ reply.peer_id.should eq(server1.peer_id)
done
end
@@ -62,9 +67,9 @@ def client(name = "client")
it "should raise errors triggered on the remote" do
em do
- start_server
+ server = start_server
- request = client.call("server", "error")
+ request = client.call(server.peer_id, "error")
request.execute!
request.on("reply") do |reply|
@@ -78,11 +83,10 @@ def client(name = "client")
it "should only emit a reply once" do
em do
- # These two servers will both reply to the regular call
- server1 = start_server("server")
- server2 = start_server("server")
+ server = start_server
- request = client.call("server", "echo", "Hi there!")
+ # This call emits two replies
+ request = client.call(server.peer_id, "echo_twice", "Hi there!")
request.execute!
request.should be_registered
@@ -101,9 +105,9 @@ def client(name = "client")
it "should take a block for setting up a shortcut" do
em do
- start_server
+ server = start_server
- client.call("server", "echo", "Hi there!") do |request, reply|
+ client.call(server.peer_id, "echo", "Hi there!") do |request, reply|
reply.should_not == nil
reply.result.should == "Hi there!"
done
@@ -113,9 +117,9 @@ def client(name = "client")
it "should use default timeout when available" do
em do
- start_server
+ server = start_server
- request = client.call("server", "timeout", nil)
+ request = client.call(server.peer_id, "timeout", nil)
request.execute!
start = Time.now
@@ -134,9 +138,9 @@ def client(name = "client")
it "should pass a nil reply to the shortcut block on a timeout" do
em do
- start_server
+ server = start_server
- client.call("server", "timeout", nil) do |request, reply|
+ client.call(server.peer_id, "timeout", nil) do |request, reply|
reply.should be_nil
done
end
@@ -145,9 +149,9 @@ def client(name = "client")
it "should allow caller to override default timeout" do
em do
- start_server
+ server = start_server
- request = client.call("server", "timeout", nil, :timeout => 0.01)
+ request = client.call(server.peer_id, "timeout", nil, :timeout => 0.01)
request.execute!
start = Time.now
@@ -166,9 +170,9 @@ def client(name = "client")
it "should not fire timeout when a reply is received" do
em do
- start_server
+ server = start_server
- request = client.call("server", "timeout", nil, :timeout => 0.2)
+ request = client.call(server.peer_id, "timeout", nil, :timeout => 0.2)
request.execute!
replies = []
@@ -193,8 +197,8 @@ def client(name = "client")
context "mcall" do
it "should be received and processed by all remotes" do
em do
- server1 = start_server("server1")
- server2 = start_server("server2")
+ server1 = start_server
+ server2 = start_server
request = client.mcall("echo", "Hi there!")
request.execute!
@@ -207,16 +211,16 @@ def client(name = "client")
::EM.add_timer(0.05) do
replies.should have(2).replies
replies.map(&:result).should == (["Hi there!"] * 2)
- replies.map(&:peername).sort.should == ["server1", "server2"]
+ replies.map(&:peer_id).sort.should == [server1.peer_id, server2.peer_id].sort
done
end
end
end
it "should allow the user to prevent future replies from arriving" do
em do
- server1 = start_server("server1")
- server2 = start_server("server2")
+ server1 = start_server
+ server2 = start_server
request = client.mcall("echo", "Hi there!")
request.execute!
@@ -338,8 +342,8 @@ def client(name = "client")
context "mcast" do
it "should be received and processed by all remotes" do
em do
- server1 = start_server("server1")
- server2 = start_server("server2")
+ server1 = start_server
+ server2 = start_server
request = client.mcast("sink")
request.execute!
View
29 nats-rpc/spec/server_spec.rb
@@ -35,21 +35,21 @@ def delayed_invalid_error(request)
describe NATS::RPC::Server do
include_context :nats
- def start_server(name = "server")
- NATS::RPC::Server.new(nats, ServerSpecService.new, :peername => name)
+ def start_server
+ NATS::RPC::Server.new(nats, ServerSpecService.new, :peer_name => "server")
end
- def client(name = "client")
- @clients ||= {}
- @clients[name] = NATS::RPC::Client.new(nats, ServerSpecService.new, :peername => name)
+ let(:client) do
+ NATS::RPC::Client.new(nats, ServerSpecService.new, :peer_name => "client")
end
context "replying with an error" do
context "that is raised" do
it "should work when derived from Service::Error" do
em do
- start_server
- client.call("server", "error") do |request, reply|
+ server = start_server
+
+ client.call(server.peer_id, "error") do |request, reply|
lambda {
reply.result
}.should raise_error(ServerSpecError)
@@ -63,8 +63,9 @@ def client(name = "client")
begin
em do
- start_server
- request = client.call("server", "invalid_error")
+ server = start_server
+
+ request = client.call(server.peer_id, "invalid_error")
request.execute!
end
rescue => aux
@@ -80,8 +81,9 @@ def client(name = "client")
context "that is explicitly passed to the request object" do
it "should work when derived from Service::Error" do
em do
- start_server
- client.call("server", "delayed_error") do |request, reply|
+ server = start_server
+
+ client.call(server.peer_id, "delayed_error") do |request, reply|
lambda {
reply.result
}.should raise_error(ServerSpecError)
@@ -95,8 +97,9 @@ def client(name = "client")
begin
em do
- start_server
- request = client.call("server", "delayed_invalid_error")
+ server = start_server
+
+ request = client.call(server.peer_id, "delayed_invalid_error")
request.execute!
end
rescue => aux

0 comments on commit 8ded15a

Please sign in to comment.