Skip to content
Browse files

added prefetch option to agent so as to avoid slamming agents process…

…ing long running tasks
  • Loading branch information...
1 parent 8dd09d3 commit 439649c86348175dc0f6e7ec233f933af7987af4 Cullen King committed
Showing with 10 additions and 1 deletion.
  1. +3 −0 bin/nanite-agent
  2. +6 −0 lib/nanite/agent.rb
  3. +1 −1 lib/nanite/config.rb
View
3 bin/nanite-agent
@@ -42,6 +42,9 @@ opts = OptionParser.new do |opts|
options[:threadpool_size] = tps
end
+ opts.on("--prefetch COUNT", Integer, "The number of messages stuffed into the queue at anytime. Set this to a value of 1 or so for longer running jobs (1 or more seconds), so the agent does not get overwhelmed. Default is unlimited.") do |pref|
+ options[:prefetch] = pref
+ end
end
opts.parse!
View
6 lib/nanite/agent.rb
@@ -55,6 +55,9 @@ class Agent
# services : list of services provided by this agent, by default
# all methods exposed by actors are listed
#
+ # prefetch : Sets prefetch (only supported in RabbitMQ >= 1.6). Use value of 1 for long
+ # running jobs (greater than a second) to avoid slamming/stalling your agent.
+ #
# single_threaded: Run all operations in one thread
#
# threadpool_size: Number of threads to run operations in
@@ -208,6 +211,9 @@ def tag(*tags)
end
def setup_queue
+ if amq.respond_to?(:prefetch) && @options.has_key?(:prefetch)
+ amq.prefetch(@options[:prefetch])
+ end
amq.queue(identity, :durable => true).subscribe(:ack => true) do |info, msg|
begin
info.ack
View
2 lib/nanite/config.rb
@@ -31,7 +31,7 @@ def setup_mapper_options(opts, options)
opts.on("--offline-failsafe", "Store messages in an offline queue when all the nanites are offline. Messages will be redelivered when nanites come online. Can be overriden on a per-message basis using the request methods.") do
options[:offline_failsafe] = true
end
-
+
opts.on("--redis HOST_PORT", "Use redis as the agent state storage in the mapper: --redis 127.0.0.1:6379; missing host and/or port will be filled with defaults if colon is present") do |redis|
redishost, redisport = redis.split(':')
redishost = '127.0.0.1' if (redishost.nil? || redishost.empty?)

0 comments on commit 439649c

Please sign in to comment.
Something went wrong with that request. Please try again.