Permalink
Browse files

Added support for batch message processing. Support ruby 1.8.7. Re-im…

…plement message chainings. Add Message class to segregate meta-data from content.
  • Loading branch information...
1 parent 6cb0fef commit bb50d1f03cc1c2ebee1c2fc78e5a788dcecf0c14 @spiegela spiegela committed Oct 23, 2011
View
@@ -2,17 +2,20 @@ PATH
remote: .
specs:
minion (0.2.0)
+ activesupport (>= 3.1)
amqp (~> 0.7.4)
bunny (~> 0.7.4)
json (>= 1.2.0)
GEM
remote: http://rubygems.org/
specs:
+ activesupport (3.1.1)
+ multi_json (~> 1.0)
amqp (0.7.4)
eventmachine
archive-tar-minitar (0.5.2)
- bunny (0.7.4)
+ bunny (0.7.5)
columnize (0.3.2)
diff-lcs (1.1.2)
eventmachine (0.12.10)
@@ -22,6 +25,7 @@ GEM
ruby_core_source (>= 0.1.4)
mocha (0.9.8)
rake
+ multi_json (1.0.3)
rake (0.8.7)
rspec (2.5.0)
rspec-core (~> 2.5.0)
@@ -1,22 +1,22 @@
-= Minion: super simple job queue over amqp
+# Minion: super simple job queue over amqp #
Minion makes processing jobs over AMQP simple and easy.
-== Setup
+## Setup ##
Minion pulls the AMQP credentials out the environment via AMQP_URL.
-$ export AMQP_URL="amqp://johndoe:abc123@localhost/my_vhost"
+ $ export AMQP_URL="amqp://johndoe:abc123@localhost/my_vhost"
Alternativly you can explicitly set it programmatically like this:
-Minion.amqp_url = "amqp://johndoe:abc123@localhost/my_vhost"
+ Minion.amqp_url = "amqp://johndoe:abc123@localhost/my_vhost"
If no URL is supplied, Minion defaults to "amqp://guest:guest@localhost/" which
is the default credentials for Rabbitmq running locally.
-== Principles
+## Principles ##
Minion treats your jobs with respect. The queues are durable and not
autodelete. When popping jobs off the queue, they will not receive an ack
@@ -32,76 +32,103 @@ eventmachine.
Message processing is done one at a time (prefetch 1). If you want tasks done
in parallel, run two minions.
-== Push a job onto the queue
+## Push a job onto the queue ##
Its easy to push a job onto the queue.
-Minion.enqueue("make.sandwich", { "for" => "me", "with" => "bread" })
+ Minion.enqueue("make.sandwich", { "for" => "me", "with" => "bread" })
Minion expects a queue name (and will create it if needed). The second argument
needs to be a hash.
-== Processing a job
+## Processing a job ##
-require 'minion'
+ require 'minion'
-include Minion
+ include Minion
-job "make.sandwich" do |args|
-Sandwich.make(args["for"],args["with"])
-end
+ job "make.sandwich" do |args|
+ Sandwich.make(args["for"],args["with"])
+ end
-== Chaining multiple steps
+## Chaining multiple steps ##
If you have a task that requires more than one step just pass an array of
queues when you enqueue.
-Minion.enqueue([ "make.sandwich", "eat.sandwich" ], "for" => "me")
+ Minion.enqueue([ "make.sandwich", "eat.sandwich" ], "for" => "me")
-job "make.sandwich" do
-## this return value is merged with for => me and sent to the next queue
-{ "type" => "ham on rye" }
-end
+ job "make.sandwich" do
+ # this return value is merged with for => me and sent to the next queue
+ { "type" => "ham on rye" }
+ end
-job "eat.sandwich" do |args|
-puts "I have #{args["type"]} sandwich for #{args["me"]}"
-end
+ job "eat.sandwich" do |msg|
+ puts "I have #{msg.content["type"]} sandwich for #{msg.content["me"]}"
+ end
-== Conditional Processing
+## Conditional Processing ##
If you want a minion worker to only subscribe to a queue under specific
conditions there is a :when parameter that takes a lambda as an argument. For
example, if you had a queue that makes sandwiches but only if there is bread
on hand, it would be.
-job "make.sandwich", :when => lambda { not Bread.out? } do
-Sandwich.make
-end
+ job "make.sandwich", :when => lambda { not Bread.out? } do
+ Sandwich.make
+ end
-== Error handling
+## Batch Processing ##
+
+If you want a minion worker to subscribe to a queue and batch messages together
+you can use the "batch" options. This will group messages into groups of
+"batch_size" unless there are too few messages available.
+
+ job "make.sandwich", :batch_size => 5 do |msg|
+ Sandwich.make_a_bunch msg.batch
+ end
+
+If you want your worker to wait until the exact batch_size is reached, then tell
+it so:
+
+ job "make.sandwich", :batch_size => 5, :wait => true do |msg|
+ Sandwich.make_a_bunch msg.batch
+ end
+
+That'll wait indefinitely, but maybe you'll give up after a bit. Just tell it
+how many seconds:
+
+ job "make.sandwich", :batch_size => 5, :wait => 5 do |msg|
+ Sandwich.make_a_bunch msg.batch
+ end
+
+This is especially helpful since any short delay can create some weird batch sizes.
+See the examples for more information.
+
+## Error handling ##
When an error is thrown in a job handler, the job is requeued to be done later
and the minion process exits. If you define an error handler, however, the
error handler is run and the job is removed from the queue.
-error do |e|
-puts "got an error! #{e}"
-end
+ error do |e|
+ puts "got an error! #{e}"
+ end
-== Logging
+## Logging ##
Minion logs to stdout via "puts". You can specify a custom logger like this:
-logger do |msg|
-puts msg
-end
+ logger do |msg|
+ puts msg
+ end
-== Testing
+## Testing ##
When running the Minion test suite you will need to have a RabbitMQ instance
running locally with the default admin user "guest" still intact.
-== Meta
+## Meta ##
Created by Orion Henry
View
@@ -0,0 +1,31 @@
+#!/usr/bin/env ruby
+
+#
+# This example illustrates the use of batching
+# messages together. This way work can be
+# distributed in small chunks, but worked on
+# in larger groups
+#
+
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'rubygems'
+require 'minion'
+
+include Minion
+
+error do |exception,queue,message,headers|
+ puts "got an error processing queue #{queue}"
+ puts exception.message
+ puts exception.backtrace
+end
+
+logger do |msg|
+ puts "--> #{msg}"
+end
+
+
+job "do.many", :batch_size => 10 do |msg|
+ puts "got #{msg.batch.size} messages"
+end
+
+27.times{ Minion.enqueue 'do.many', {"something" => true} }
View
@@ -0,0 +1,36 @@
+#!/usr/bin/env ruby
+
+#
+# This example illustrates the use batching
+# when we want to wait until the hit an exact
+# number of messages before running
+#
+
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'rubygems'
+require 'minion'
+
+include Minion
+
+error do |exception,queue,message,headers|
+ puts "got an error processing queue #{queue}"
+ puts exception.message
+ puts exception.backtrace
+end
+
+logger do |msg|
+ puts "--> #{msg}"
+end
+
+
+job "do.only_in_tens", :batch_size => 10, :wait => true do |msg|
+ puts "got #{msg.batch.size} messages"
+end
+
+# We won't run this, but just so you know, this is the
+# same result as the "batch.rb" example
+job "do.in_tens_or_emtpy", :batch_size => 10, :wait => false do |msg|
+ puts "got #{msg.batch.size} messages"
+end
+
+27.times{ Minion.enqueue 'do.only_in_tens', {"something" => true} }
View
@@ -0,0 +1,52 @@
+#!/usr/bin/env ruby
+
+#
+# This example illustrates the use batching
+# when we want to wait some # of seconds for an
+# exact number of messages before running,
+# but if we don't get that many, we'll go ahead
+# anyways.
+#
+# This prevents some odd batch counts, but allows
+# for flexibility of the queue size
+#
+
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'rubygems'
+require 'minion'
+
+include Minion
+
+error do |exception,queue,message,headers|
+ puts "got an error processing queue #{queue}"
+ puts exception.message
+ puts exception.backtrace
+end
+
+logger do |msg|
+ puts "--> #{msg}"
+end
+
+Thread.new do
+ puts "------------------------------------------"
+ puts "First, no waiting with artificial delays"
+ puts "------------------------------------------"
+ 3.times{ Minion.enqueue 'do.no_waiting', {"something" => true} }
+ sleep 0.2
+ 4.times{ Minion.enqueue 'do.no_waiting', {"something" => true} }
+ sleep 5
+ puts "------------------------------------------"
+ puts "Now if we give it a second, all is well"
+ puts "------------------------------------------"
+ 3.times{ Minion.enqueue 'do.in_tens_or_wait_2', {"something" => true} }
+ sleep 0.2
+ 4.times{ Minion.enqueue 'do.in_tens_or_wait_2', {"something" => true} }
+end
+
+job "do.no_waiting", :batch_size => 10 do |msg|
+ puts "got #{msg.batch.size} messages"
+end
+
+job "do.in_tens_or_wait_2", :batch_size => 10, :wait => 2 do |msg|
+ puts "got #{msg.batch.size} messages"
+end
View
@@ -1,5 +1,11 @@
#!/usr/bin/env ruby
+#
+# This example illustrates the use of chaining
+# callbacks to create building blocks with the
+# message content being just an integer
+#
+
$:.unshift File.dirname(__FILE__) + '/../lib'
require 'rubygems'
require 'minion'
@@ -16,21 +22,21 @@
puts "--> #{msg}"
end
-job "math.incr" do |args|
- { "number" => (1 + args["number"].to_i) }
+job "math.incr" do |msg|
+ msg.content.to_i + 1
end
-job "math.double" do |args|
- { "number" => (2 * args["number"].to_i) }
+job "math.double" do |msg|
+ msg.content.to_i * 2
end
-job "math.square" do |args|
- { "number" => (args["number"].to_i * args["number"].to_i) }
+job "math.square" do |msg|
+ msg.content.to_i * msg.content.to_i
end
-job "math.print" do |args|
- puts "NUMBER -----> #{args["number"]}"
+job "math.print" do |msg|
+ puts "NUMBER -----> #{msg.content}"
end
-enqueue([ "math.incr", "math.double", "math.square", "math.incr", "math.double", "math.print" ], { :number => 3 })
+enqueue([ "math.incr", "math.double", "math.square", "math.incr", "math.double", "math.print" ], 3)
Oops, something went wrong.

0 comments on commit bb50d1f

Please sign in to comment.