Skip to content
Browse files

Catch exceptions in AMQP subscription so it doesn't kill EM

  • Loading branch information...
1 parent c542f6b commit 41be60abaed544da02d1ed7de6bc230d36585c59 @MarkMenard committed Aug 2, 2011
Showing with 9 additions and 6 deletions.
  1. +0 −1 examples/producer_example_2.rb
  2. +9 −5 lib/consumption_junction/em_runner.rb
View
1 examples/producer_example_2.rb
@@ -1,6 +1,5 @@
require 'rubygems'
require 'carrot'
-require 'celluloid'
MESSAGE_COUNT = 20_000
QUEUE_NAME = "example.queue.2"
View
14 lib/consumption_junction/em_runner.rb
@@ -32,14 +32,18 @@ def create_subscriptions
queue_channel = AMQP::Channel.new(amqp_connection)
queue = queue_channel.queue(worker_config.queue, :durable => true)
- queue.subscribe(:ack => worker_config.ack) do |metadata, payload|
+ queue.subscribe(:ack => worker_config.ack) do |header, payload|
operation = lambda do
- message_processor_supervisor.actor.process_message(payload)
+ begin
+ message_processor_supervisor.actor.process_message(payload)
+ rescue Exception => e
+ header.reject :requeue => true
+ end
end
callback = lambda do |result|
- metadata.ack
+ header.ack
end
EventMachine.defer(operation, callback)
@@ -63,7 +67,7 @@ def worker_configs
# queue_channel = AMQP::Channel.new(connection)
# queue = queue_channel.queue(QUEUE_NAME, :auto_delete => true)
#
-# queue.subscribe(:ack => true) do |metadata, payload|
+# queue.subscribe(:ack => true) do |header, payload|
#
# operation = lambda do
# sleep_time = rand(10) > 5 ? 5 : 0
@@ -74,7 +78,7 @@ def worker_configs
#
# callback = lambda do |result|
# puts "Acking for listener #{i}"
-# metadata.ack
+# header.ack
# if counter.count == MESSAGE_COUNT
# EventMachine.stop { exit }
# end

0 comments on commit 41be60a

Please sign in to comment.
Something went wrong with that request. Please try again.