Skip to content

Commit

Permalink
Add configurable ack/responses on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
erithmetic committed Sep 2, 2015
1 parent 055ad56 commit e9e3a1f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
18 changes: 18 additions & 0 deletions lib/hutch/acknowledgements/nack_on_all_failures.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
require 'hutch/logging'

module Hutch
module Acknowledgements
class NackOnAllFailures
include Logging

def handle(delivery_info, properties, broker, ex)
prefix = "message(#{properties.message_id || '-'}): "
logger.debug "#{prefix} nacking message"

broker.nack delivery_info.delivery_tag

true
end
end
end
end
1 change: 1 addition & 0 deletions lib/hutch/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def self.initialize(params={})
require_paths: [],
autoload_rails: true,
error_handlers: [Hutch::ErrorHandlers::Logger.new],
error_acknowledgements: [],
tracer: Hutch::Tracers::NullTracer,
namespace: nil,
daemonise: false,
Expand Down
15 changes: 14 additions & 1 deletion lib/hutch/worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'hutch/message'
require 'hutch/logging'
require 'hutch/broker'
require 'hutch/acknowledgements/nack_on_all_failures'
require 'carrot-top'

module Hutch
Expand Down Expand Up @@ -114,7 +115,7 @@ def handle_message(consumer, delivery_info, properties, payload)
with_tracing(consumer_instance).handle(message)
broker.ack(delivery_info.delivery_tag)
rescue StandardError => ex
broker.nack(delivery_info.delivery_tag)
acknowledge_error(delivery_info, properties, broker, ex)
handle_error(properties.message_id, payload, consumer, ex)
end
end
Expand All @@ -129,11 +130,23 @@ def handle_error(message_id, payload, consumer, ex)
end
end

def acknowledge_error(delivery_info, properties, broker, ex)
acks = error_acknowledgements +
[Hutch::Acknowledgements::NackOnAllFailures.new]
acks.find do |backend|
backend.handle(delivery_info, properties, broker, ex)
end
end

def consumers=(val)
if val.empty?
logger.warn "no consumer loaded, ensure there's no configuration issue"
end
@consumers = val
end

def error_acknowledgements
Hutch::Config[:error_acknowledgements]
end
end
end
58 changes: 47 additions & 11 deletions spec/hutch/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,25 @@
worker.handle_message(consumer, delivery_info, properties, payload)
end

context 'when the consumer requeues a message' do
class Rejecter
include Hutch::Consumer

def process(message)
requeue!
end
end

it 'requeues the message', :focus do
context 'when the consumer fails and a requeue is configured' do

it 'requeues the message' do
allow(consumer_instance).to receive(:process).and_raise('failed')
requeuer = double
allow(requeuer).to receive(:handle).ordered { |delivery_info, properties, broker, e|
broker.requeue delivery_info.delivery_tag
true
}
allow(worker).to receive(:error_acknowledgements).and_return([requeuer])
expect(broker).to_not receive(:ack)
expect(broker).to_not receive(:nack)
expect(broker).to receive(:requeue)

worker.handle_message(Rejecter, delivery_info, properties, payload)
worker.handle_message(consumer, delivery_info, properties, payload)
end
end


context 'when the consumer raises an exception' do
before { allow(consumer_instance).to receive(:process).and_raise('a consumer error') }

Expand Down Expand Up @@ -112,5 +113,40 @@ def process(message)
end
end
end


describe '#acknowledge_error' do
let(:delivery_info) { double('Delivery Info', routing_key: '',
delivery_tag: 'dt') }
let(:properties) { double('Properties', message_id: 'abc123') }

subject { worker.acknowledge_error delivery_info, properties, broker, StandardError.new }

it 'stops when it runs a successful acknowledgement' do
skip_ack = double handle: false
always_ack = double handle: true
never_used = double handle: true

allow(worker).
to receive(:error_acknowledgements).
and_return([skip_ack, always_ack, never_used])

expect(never_used).to_not receive(:handle)

subject
end

it 'defaults to nacking' do
skip_ack = double handle: false

allow(worker).
to receive(:error_acknowledgements).
and_return([skip_ack, skip_ack])

expect(broker).to receive(:nack)

subject
end
end
end

0 comments on commit e9e3a1f

Please sign in to comment.