Permalink
Browse files

Make one server object able to serve multiple services

There previously was a 1-1 mapping between server and service objects.
In order to have better introspection to what services are being
exposed, we now keep track of services per server. This allows to set up
a server that automatically broadcasts all its services via a locator.
Also, both server and client objects are namespaced, and respond to/call
services in that specific namespace (instead of specifying a namespace
per-server or per-client).

Change-Id: Ib24e9c53a74c3eeb44b67f69b1f8067c5aa553d0
  • Loading branch information...
1 parent dcc2e5a commit eeb50680d9cb5206b1f222e5a0975cf7d322908a @pietern pietern committed Oct 25, 2011
@@ -19,18 +19,20 @@ def multiply(request)
require "nats/rpc/client"
EM.run do
- nats = NATS.connect
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
- client = NATS::RPC::Client.new(nats, MathService.new, :logger => logger)
+
+ nats = NATS.connect
+ client = NATS::RPC::Client.new(nats, :logger => logger)
+ math_client = client.service(MathService.new)
# Find out which peers are available
- client.mcall("ping") do |request, reply|
+ math_client.mcall("ping") do |request, reply|
# Don't wait around for future replies
request.stop!
# Call the peer that sent the PONG
- client.call(reply.peer_id, "multiply", 10) do |request, reply|
+ math_client.call(reply.peer_id, "multiply", 10) do |request, reply|
puts "#{reply.peer_id} got #{reply.result} by randomly multiplying 10!"
end
end
@@ -41,10 +43,12 @@ def multiply(request)
require "nats/rpc/server"
EM.run do
- nats = NATS.connect
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
- server = NATS::RPC::Server.new(nats, MathService.new, :logger => logger)
+
+ nats = NATS.connect
+ server = NATS::RPC::Server.new(nats, :logger => logger)
+ server.start(MathService.new)
end
else
@@ -5,45 +5,61 @@ module NATS
module RPC
class Client < Peer
- def call(peer_id, method, payload = nil, options = {}, &blk)
- request = Call.new(self, method, payload)
- request.peer_id = peer_id
- request.timeout = options[:timeout] if options.has_key?(:timeout)
- request.shortcut!(&blk) if blk
- request
+ def generate_message_id
+ @message_id ||= 0
+ @message_id += 1
end
- def mcall(method, payload = nil, options = {}, &blk)
- request = Mcall.new(self, method, payload)
- request.timeout = options[:timeout] if options.has_key?(:timeout)
- request.shortcut!(&blk) if blk
- request
+ def service(service)
+ ServiceClient.new(self, service)
end
- def mcast(method, payload = nil, options = {})
- request = Mcast.new(self, method, payload)
- request
- end
+ class ServiceClient
- def generate_message_id
- @message_id ||= 0
- @message_id += 1
+ attr_reader :client
+ attr_reader :service
+
+ def initialize(client, service)
+ @client = client
+ @service = service
+ end
+
+ def call(peer_id, method, payload = nil, options = {}, &blk)
+ Call.new(@client, @service, method, payload).tap do |request|
+ request.peer_id = peer_id
+ request.timeout = options[:timeout] if options.has_key?(:timeout)
+ request.shortcut!(&blk) if blk
+ end
+ end
+
+ def mcall(method, payload = nil, options = {}, &blk)
+ Mcall.new(@client, @service, method, payload).tap do |request|
+ request.timeout = options[:timeout] if options.has_key?(:timeout)
+ request.shortcut!(&blk) if blk
+ end
+ end
+
+ def mcast(method, payload = nil, options = {})
+ Mcast.new(@client, @service, method, payload)
+ end
end
class Request
include Util::EventEmitter
attr_reader :client
+ attr_reader :service
attr_reader :message_id
attr_reader :method
attr_reader :payload
- def initialize(client, method, payload)
+ def initialize(client, service, method, payload)
@client = client
+ @service = service
@message_id = client.generate_message_id
- @method = client.service.class.methods[method.to_s]
+ @method = service.class.methods[method.to_s]
@payload = payload
# The service should export the specified method
@@ -102,7 +118,7 @@ def post_initialize
super
# Generate inbox that recipients of this request can reply to
- @inbox = [client.base_subject, "inbox", client.peer_id, message_id].join(".")
+ @inbox = [client.base_subject, service.name, "inbox", client.peer_id, message_id].join(".")
@subscription = nil
# Setup default timeout, user can override
@@ -204,21 +220,21 @@ def post_initialize
def execute!
start
- client.publish([client.base_subject, "call", peer_id].join("."), message)
+ client.publish([client.base_subject, service.name, "call", peer_id].join("."), message)
end
end
class Mcall < ExpectReplyRequest
def execute!
start
- client.publish([client.base_subject, "mcall"].join("."), message)
+ client.publish([client.base_subject, service.name, "mcall"].join("."), message)
end
end
class Mcast < Request
def execute!
start
- client.publish([client.base_subject, "mcast"].join("."), message)
+ client.publish([client.base_subject, service.name, "mcast"].join("."), message)
end
end
@@ -10,19 +10,12 @@ def self.generate_peer_id
end
attr_reader :nats
- attr_reader :service
attr_reader :options
attr_reader :logger
attr_reader :namespace
- def initialize(nats, service, options = {})
- service_base_class = NATS::RPC::Service
- unless service.kind_of?(service_base_class)
- raise ArgumentError.new("Expected subclass of " + service_base_class.name)
- end
-
+ def initialize(nats, options = {})
@nats = nats
- @service = service
@options = options
@logger = options[:logger]
@namespace = options[:namespace] || "default"
@@ -47,7 +40,7 @@ def peer_id
# Base subject for all calls.
def base_subject
- @base_subject ||= "rpc.#{namespace}.#{service.name}"
+ @base_subject ||= "rpc.#{namespace}"
end
# Proxy to NATS.
@@ -1,32 +1,74 @@
require "nats/rpc/peer"
+require "nats/rpc/util/event_emitter"
module NATS
module RPC
class Server < Peer
+ include Util::EventEmitter
+
def post_initialize
- subscribe(base_subject + ".call.#{peer_id}") do |message|
- handle(message)
- end
- subscribe(base_subject + ".mcall") do |message|
- handle(message)
- end
- subscribe(base_subject + ".mcast") do |message|
- handle(message)
+ @services = {}
+ end
+
+ def services
+ @services.dup
+ end
+
+ def start(service)
+ service_base_class = NATS::RPC::Service
+ unless service.kind_of?(service_base_class)
+ raise ArgumentError.new("Expected subclass of " + service_base_class.name)
end
+
+ subscribe_service(service)
+ emit("start", service)
+ @services[service.name] = service
+
+ nil
end
- def handle(message)
- request = Request.new(self, message)
- request.execute!
+ protected
+
+ def subscribe_service(service)
+ handler = lambda { |message| Request.execute!(self, service, message) }
+ subscribe(call_subject(service), &handler)
+ subscribe(mcall_subject(service), &handler)
+ subscribe(mcast_subject(service), &handler)
+ end
+
+ def unsubscribe_service(service)
+ unsubscribe(call_subject(service))
+ unsubscribe(mcall_subject(service))
+ unsubscribe(mcast_subject(service))
+ end
+
+ def call_subject(service)
+ [base_subject, service.name, "call", peer_id].join(".")
+ end
+
+ def mcall_subject(service)
+ [base_subject, service.name, "mcall"].join(".")
+ end
+
+ def mcast_subject(service)
+ [base_subject, service.name, "mcast"].join(".")
end
class Request
+ def self.execute!(server, service, message)
+ Request.new(server, service, message).tap do |request|
+ request.execute!
+ end
+ end
+
attr_reader :server
+ attr_reader :service
- def initialize(server, message)
+ def initialize(server, service, message)
@server = server
+ @service = service
@message = message
end
@@ -51,7 +93,7 @@ def payload
end
def execute!
- server.service.execute!(self)
+ service.execute!(self)
rescue Service::Error => error
reply_error(error)
end
@@ -41,11 +41,13 @@ def sink(request)
include_context :nats
def start_server
- NATS::RPC::Server.new(nats, ClientSpecService.new, :peer_name => "server")
+ NATS::RPC::Server.new(nats, :peer_name => "server").tap do |server|
+ server.start(ClientSpecService.new)
+ end
end
let(:client) do
- NATS::RPC::Client.new(nats, ClientSpecService.new, :peer_name => "client")
+ NATS::RPC::Client.new(nats, :peer_name => "client").service(ClientSpecService.new)
end
context "call" do
@@ -349,8 +351,8 @@ def start_server
# There is no reply. Wait for a bit and test that the request has been
# received by the remotes.
::EM.add_timer(0.1) do
- server1.service.sinked.should have(1).request
- server2.service.sinked.should have(1).request
+ server1.services["ClientSpecService"].sinked.should have(1).request
+ server2.services["ClientSpecService"].sinked.should have(1).request
done
end
end
@@ -36,11 +36,13 @@ def delayed_invalid_error(request)
include_context :nats
def start_server
- NATS::RPC::Server.new(nats, ServerSpecService.new, :peer_name => "server")
+ NATS::RPC::Server.new(nats, :peer_name => "server").tap do |server|
+ server.start(ServerSpecService.new)
+ end
end
let(:client) do
- NATS::RPC::Client.new(nats, ServerSpecService.new, :peer_name => "client")
+ NATS::RPC::Client.new(nats, :peer_name => "client").service(ServerSpecService.new)
end
context "replying with an error" do

0 comments on commit eeb5068

Please sign in to comment.