Skip to content

Commit

Permalink
Use a dead letter queue to handle mapping errors. (elastic#583)
Browse files Browse the repository at this point in the history
* Use a dead letter queue to handle mapping errors.

Also retry policy has been changed. This is mostly motivated from elastic#572 and the use of DLQ.
We now retry everything except for 400 and 429.

Fixes elastic#572
  • Loading branch information
Suyog Rao committed Apr 7, 2017
1 parent c7fcd75 commit 0529411
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
29 changes: 22 additions & 7 deletions lib/logstash/outputs/elasticsearch/common.rb
Expand Up @@ -4,7 +4,7 @@ module LogStash; module Outputs; class ElasticSearch;
module Common
attr_reader :client, :hosts

RETRYABLE_CODES = [429, 503]
DLQ_CODES = [400, 404]
SUCCESS_CODES = [200, 201]
CONFLICT_CODE = 409

Expand All @@ -16,6 +16,8 @@ module Common

def register
@stopping = Concurrent::AtomicBoolean.new(false)
# To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
@dlq_writer = respond_to?(:execution_context) ? execution_context.dlq_writer : nil
setup_hosts # properly sets @hosts
build_client
install_template
Expand Down Expand Up @@ -128,16 +130,29 @@ def submit(actions)
action = actions[idx]
action_params = action[1]

# Retry logic: If it is success, we move on. If it is a failure, we have 3 paths:
# - For 409, we log and drop. there is nothing we can do
# - For a mapping error, we send to dead letter queue for a human to intervene at a later point.
# - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues
if SUCCESS_CODES.include?(status)
next
elsif CONFLICT_CODE == status && VERSION_TYPES_PERMITTING_CONFLICT.include?(action_params[:version_type])
@logger.debug "Ignoring external version conflict: status[#{status}] failure[#{failure}] version[#{action_params[:version]}] version_type[#{action_params[:version_type]}]"
elsif CONFLICT_CODE == status
@logger.warn "Failed action.", status: status, action: action, response: response if !failure_type_logging_whitelist.include?(failure["type"])
next
elsif RETRYABLE_CODES.include?(status)
@logger.info "retrying failed action with response code: #{status} (#{failure})"
elsif DLQ_CODES.include?(status)
action_event = action[2]
# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
if @dlq_writer
# TODO: Change this to send a map with { :status => status, :action => action } in the future
@dlq_writer.write(event, "Could not index event to Elasticsearch. status: #{status}, action: #{action}, response: #{response}")
else
@logger.warn "Could not index event to Elasticsearch.", status: status, action: action, response: response
end
next
else
# only log what the user whitelisted
@logger.info "retrying failed action with response code: #{status} (#{failure})" if !failure_type_logging_whitelist.include?(failure["type"])
actions_to_retry << action
elsif !failure_type_logging_whitelist.include?(failure["type"])
@logger.warn "Failed action.", status: status, action: action, response: response
end
end

Expand Down
20 changes: 12 additions & 8 deletions spec/integration/outputs/retry_spec.rb
Expand Up @@ -88,14 +88,18 @@ def mock_actions_with_response(*resp)
subject.multi_receive([event1, event1, event1, event2])
end

it "should retry actions with response status of 429" do
subject.register
retryable_codes = [429, 502, 503]

mock_actions_with_response({"errors" => true, "statuses" => [429]},
{"errors" => false})
expect(subject).to receive(:submit).with([action1]).twice.and_call_original
retryable_codes.each do |code|
it "should retry actions with response status of #{code}" do
subject.register

subject.multi_receive([event1])
mock_actions_with_response({"errors" => true, "statuses" => [code]},
{"errors" => false})
expect(subject).to receive(:submit).with([action1]).twice.and_call_original

subject.multi_receive([event1])
end
end

it "should retry an event infinitely until a non retryable status occurs" do
Expand All @@ -107,7 +111,7 @@ def mock_actions_with_response(*resp)
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [500]})
{"errors" => true, "statuses" => [400]})

subject.multi_receive([event1])
end
Expand All @@ -126,7 +130,7 @@ def mock_actions_with_response(*resp)
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [500]})
{"errors" => true, "statuses" => [400]})

subject.multi_receive([event1])
end
Expand Down

0 comments on commit 0529411

Please sign in to comment.