Skip to content
This repository
Browse code

Mapper#push nows also accepts :offline_failsafe option

Signed-off-by: ezmobius <ez@engineyard.com>
  • Loading branch information...
commit 18a5213003f51c745dfe9d2cb8390472d60d1d68 1 parent ac05a0a
Ian Leitch authored ezmobius committed
2  TODO
@@ -17,7 +17,7 @@ Ian:
17 17 - Look into using EM deferables for actors dispatch.
18 18 - Integration specs that spawn a small cluster of nanites
19 19 - Rename Ping to Status
20   - - We can now add offline failsafe support to push now that timeout has gone
  20 + - request/push should take *args for payload?
21 21
22 22 Maybe:
23 23 - Make mapper queue durable and Results respect :persistent flag on the request
2  lib/nanite/agent.rb
@@ -128,7 +128,7 @@ def receive(packet)
128 128 when Advertise
129 129 log.debug("handling Advertise: #{packet}")
130 130 advertise_services
131   - when Request
  131 + when Request, Push
132 132 log.debug("handling Request: #{packet}")
133 133 dispatcher.dispatch(packet)
134 134 end
16 lib/nanite/dispatcher.rb
@@ -11,18 +11,18 @@ def initialize(amq, registry, serializer, identity, log, options)
11 11 @options = options
12 12 end
13 13
14   - def dispatch(request)
  14 + def dispatch(deliverable)
15 15 result = begin
16   - act_upon(request)
  16 + act_upon(deliverable)
17 17 rescue Exception => e
18 18 error = "#{e.class.name}: #{e.message}\n #{e.backtrace.join("\n ")}"
19 19 log.error(error)
20 20 error
21 21 end
22 22
23   - if request.reply_to
24   - result = Result.new(request.token, request.reply_to, result, identity)
25   - amq.queue(request.reply_to, :no_declare => options[:secure]).publish(serializer.dump(result))
  23 + if deliverable.kind_of?(Request)
  24 + result = Result.new(deliverable.token, deliverable.reply_to, result, identity)
  25 + amq.queue(deliverable.reply_to, :no_declare => options[:secure]).publish(serializer.dump(result))
26 26 end
27 27
28 28 result
@@ -30,10 +30,10 @@ def dispatch(request)
30 30
31 31 private
32 32
33   - def act_upon(request)
34   - prefix, meth = request.type.split('/')[1..-1]
  33 + def act_upon(deliverable)
  34 + prefix, meth = deliverable.type.split('/')[1..-1]
35 35 actor = registry.actor_for(prefix)
36   - actor.send(meth, request.payload)
  36 + actor.send(meth, deliverable.payload)
37 37 end
38 38 end
39 39 end
47 lib/nanite/mapper.rb
@@ -122,7 +122,7 @@ def initialize(options)
122 122 #
123 123 # @api :public:
124 124 def request(type, payload = '', opts = {}, &blk)
125   - request = build_request(type, payload, opts)
  125 + request = build_deliverable(Request, type, payload, opts)
126 126 request.reply_to = identity
127 127 targets = cluster.targets_for(request)
128 128 if !targets.empty?
@@ -132,6 +132,8 @@ def request(type, payload = '', opts = {}, &blk)
132 132 elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
133 133 cluster.publish(request, 'mapper-offline')
134 134 :offline
  135 + else
  136 + false
135 137 end
136 138 end
137 139
@@ -147,25 +149,36 @@ def request(type, payload = '', opts = {}, &blk)
147 149 # :all:: Send the request to all nanites which respond to the service.
148 150 # :random:: Randomly pick a nanite.
149 151 # :rr: Select a nanite according to round robin ordering.
  152 + # :offline_failsafe<Boolean>:: Store messages in an offline queue when all
  153 + # the nanites are offline. Messages will be redelivered when nanites come online.
  154 + # Default is false unless the mapper was started with the --offline-failsafe flag.
150 155 # :persistent<Boolean>:: Instructs the AMQP broker to save the message to persistent
151 156 # storage so that it isnt lost when the broker is restarted.
152 157 # Default is false unless the mapper was started with the --persistent flag.
153 158 #
154 159 # @api :public:
155 160 def push(type, payload = '', opts = {})
156   - request = build_request(type, payload, opts)
157   - cluster.route(request, cluster.targets_for(request))
158   - true
  161 + push = build_deliverable(Push, type, payload, opts)
  162 + targets = cluster.targets_for(push)
  163 + if !targets.empty?
  164 + cluster.route(push, targets)
  165 + true
  166 + elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
  167 + cluster.publish(push, 'mapper-offline')
  168 + :offline
  169 + else
  170 + false
  171 + end
159 172 end
160 173
161 174 private
162 175
163   - def build_request(type, payload, opts)
164   - request = Request.new(type, payload, opts)
165   - request.from = identity
166   - request.token = Identity.generate
167   - request.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
168   - request
  176 + def build_deliverable(deliverable_type, type, payload, opts)
  177 + deliverable = deliverable_type.new(type, payload, opts)
  178 + deliverable.from = identity
  179 + deliverable.token = Identity.generate
  180 + deliverable.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
  181 + deliverable
169 182 end
170 183
171 184 def setup_queues
@@ -175,14 +188,16 @@ def setup_queues
175 188
176 189 def setup_offline_queue
177 190 offline_queue = amq.queue('mapper-offline', :durable => true)
178   - offline_queue.subscribe(:ack => true) do |info, request|
179   - request = serializer.load(request)
180   - request.reply_to = identity
181   - targets = cluster.targets_for(request)
  191 + offline_queue.subscribe(:ack => true) do |info, deliverable|
  192 + deliverable = serializer.load(deliverable)
  193 + targets = cluster.targets_for(deliverable)
182 194 unless targets.empty?
183 195 info.ack
184   - job = job_warden.new_job(request, targets)
185   - cluster.route(request, job.targets)
  196 + if deliverable.kind_of?(Request)
  197 + deliverable.reply_to = identity
  198 + job_warden.new_job(deliverable, targets)
  199 + end
  200 + cluster.route(deliverable, targets)
186 201 end
187 202 end
188 203
32 lib/nanite/packets.rb
@@ -89,6 +89,38 @@ def self.json_create(o)
89 89 end
90 90 end
91 91
  92 + # packet that means a work push from mapper
  93 + # to actor node
  94 + #
  95 + # type is a service name
  96 + # payload is arbitrary data that is transferred from mapper to actor
  97 + #
  98 + # Options:
  99 + # from is sender identity
  100 + # token is a generated request id that mapper uses to identify replies
  101 + # selector is the selector used to route the request
  102 + # target is the target nanite for the request
  103 + # persistent signifies if this request should be saved to persistent storage by the AMQP broker
  104 + class Push < Packet
  105 + attr_accessor :from, :payload, :type, :token, :selector, :target, :persistent
  106 + DEFAULT_OPTIONS = {:selector => :least_loaded}
  107 + def initialize(type, payload, opts={})
  108 + opts = DEFAULT_OPTIONS.merge(opts)
  109 + @type = type
  110 + @payload = payload
  111 + @from = opts[:from]
  112 + @token = opts[:token]
  113 + @selector = opts[:selector]
  114 + @target = opts[:target]
  115 + @persistent = opts[:persistent]
  116 + end
  117 + def self.json_create(o)
  118 + i = o['data']
  119 + new(i['type'], i['payload'], {:from => i['from'], :token => i['token'], :selector => i['selector'],
  120 + :target => i['target'], :persistent => i['persistent']})
  121 + end
  122 + end
  123 +
92 124 # packet that means a work result notification sent from actor to mapper
93 125 #
94 126 # from is sender identity

0 comments on commit 18a5213

Please sign in to comment.
Something went wrong with that request. Please try again.