Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Mapper#push nows also accepts :offline_failsafe option

Signed-off-by: ezmobius <ez@engineyard.com>
  • Loading branch information...
commit 5152fa97c9fe9044251e303277c0027e87b9dce7 1 parent 4dc21db
Ian Leitch authored ezmobius committed
View
2  TODO
@@ -17,7 +17,7 @@ Ian:
- Look into using EM deferables for actors dispatch.
- Integration specs that spawn a small cluster of nanites
- Rename Ping to Status
- - We can now add offline failsafe support to push now that timeout has gone
+ - request/push should take *args for payload?
Maybe:
- Make mapper queue durable and Results respect :persistent flag on the request
View
2  lib/nanite/agent.rb
@@ -128,7 +128,7 @@ def receive(packet)
when Advertise
log.debug("handling Advertise: #{packet}")
advertise_services
- when Request
+ when Request, Push
log.debug("handling Request: #{packet}")
dispatcher.dispatch(packet)
end
View
16 lib/nanite/dispatcher.rb
@@ -11,18 +11,18 @@ def initialize(amq, registry, serializer, identity, log, options)
@options = options
end
- def dispatch(request)
+ def dispatch(deliverable)
result = begin
- act_upon(request)
+ act_upon(deliverable)
rescue Exception => e
error = "#{e.class.name}: #{e.message}\n #{e.backtrace.join("\n ")}"
log.error(error)
error
end
- if request.reply_to
- result = Result.new(request.token, request.reply_to, result, identity)
- amq.queue(request.reply_to, :no_declare => options[:secure]).publish(serializer.dump(result))
+ if deliverable.kind_of?(Request)
+ result = Result.new(deliverable.token, deliverable.reply_to, result, identity)
+ amq.queue(deliverable.reply_to, :no_declare => options[:secure]).publish(serializer.dump(result))
end
result
@@ -30,10 +30,10 @@ def dispatch(request)
private
- def act_upon(request)
- prefix, meth = request.type.split('/')[1..-1]
+ def act_upon(deliverable)
+ prefix, meth = deliverable.type.split('/')[1..-1]
actor = registry.actor_for(prefix)
- actor.send(meth, request.payload)
+ actor.send(meth, deliverable.payload)
end
end
end
View
47 lib/nanite/mapper.rb
@@ -122,7 +122,7 @@ def initialize(options)
#
# @api :public:
def request(type, payload = '', opts = {}, &blk)
- request = build_request(type, payload, opts)
+ request = build_deliverable(Request, type, payload, opts)
request.reply_to = identity
targets = cluster.targets_for(request)
if !targets.empty?
@@ -132,6 +132,8 @@ def request(type, payload = '', opts = {}, &blk)
elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
cluster.publish(request, 'mapper-offline')
:offline
+ else
+ false
end
end
@@ -147,25 +149,36 @@ def request(type, payload = '', opts = {}, &blk)
# :all:: Send the request to all nanites which respond to the service.
# :random:: Randomly pick a nanite.
# :rr: Select a nanite according to round robin ordering.
+ # :offline_failsafe<Boolean>:: Store messages in an offline queue when all
+ # the nanites are offline. Messages will be redelivered when nanites come online.
+ # Default is false unless the mapper was started with the --offline-failsafe flag.
# :persistent<Boolean>:: Instructs the AMQP broker to save the message to persistent
# storage so that it isnt lost when the broker is restarted.
# Default is false unless the mapper was started with the --persistent flag.
#
# @api :public:
def push(type, payload = '', opts = {})
- request = build_request(type, payload, opts)
- cluster.route(request, cluster.targets_for(request))
- true
+ push = build_deliverable(Push, type, payload, opts)
+ targets = cluster.targets_for(push)
+ if !targets.empty?
+ cluster.route(push, targets)
+ true
+ elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
+ cluster.publish(push, 'mapper-offline')
+ :offline
+ else
+ false
+ end
end
private
- def build_request(type, payload, opts)
- request = Request.new(type, payload, opts)
- request.from = identity
- request.token = Identity.generate
- request.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
- request
+ def build_deliverable(deliverable_type, type, payload, opts)
+ deliverable = deliverable_type.new(type, payload, opts)
+ deliverable.from = identity
+ deliverable.token = Identity.generate
+ deliverable.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
+ deliverable
end
def setup_queues
@@ -175,14 +188,16 @@ def setup_queues
def setup_offline_queue
offline_queue = amq.queue('mapper-offline', :durable => true)
- offline_queue.subscribe(:ack => true) do |info, request|
- request = serializer.load(request)
- request.reply_to = identity
- targets = cluster.targets_for(request)
+ offline_queue.subscribe(:ack => true) do |info, deliverable|
+ deliverable = serializer.load(deliverable)
+ targets = cluster.targets_for(deliverable)
unless targets.empty?
info.ack
- job = job_warden.new_job(request, targets)
- cluster.route(request, job.targets)
+ if deliverable.kind_of?(Request)
+ deliverable.reply_to = identity
+ job_warden.new_job(deliverable, targets)
+ end
+ cluster.route(deliverable, targets)
end
end
View
32 lib/nanite/packets.rb
@@ -89,6 +89,38 @@ def self.json_create(o)
end
end
+ # packet that means a work push from mapper
+ # to actor node
+ #
+ # type is a service name
+ # payload is arbitrary data that is transferred from mapper to actor
+ #
+ # Options:
+ # from is sender identity
+ # token is a generated request id that mapper uses to identify replies
+ # selector is the selector used to route the request
+ # target is the target nanite for the request
+ # persistent signifies if this request should be saved to persistent storage by the AMQP broker
+ class Push < Packet
+ attr_accessor :from, :payload, :type, :token, :selector, :target, :persistent
+ DEFAULT_OPTIONS = {:selector => :least_loaded}
+ def initialize(type, payload, opts={})
+ opts = DEFAULT_OPTIONS.merge(opts)
+ @type = type
+ @payload = payload
+ @from = opts[:from]
+ @token = opts[:token]
+ @selector = opts[:selector]
+ @target = opts[:target]
+ @persistent = opts[:persistent]
+ end
+ def self.json_create(o)
+ i = o['data']
+ new(i['type'], i['payload'], {:from => i['from'], :token => i['token'], :selector => i['selector'],
+ :target => i['target'], :persistent => i['persistent']})
+ end
+ end
+
# packet that means a work result notification sent from actor to mapper
#
# from is sender identity
Please sign in to comment.
Something went wrong with that request. Please try again.