Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: batch processing exceptions #276

Merged
merged 5 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def success_handler(self, record: Any, result: Any):
self.success_messages.append(record)
return entry

def failure_handler(self, record: Any, exception: Exception):
def failure_handler(self, record: Any, exception: Tuple):
to-mc marked this conversation as resolved.
Show resolved Hide resolved
"""
Failure callback

Expand All @@ -94,8 +94,9 @@ def failure_handler(self, record: Any, exception: Exception):
tuple
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
logger.debug(f"Record processing exception: {exception}")
exception_string = f"{exception[0]}:{exception[1]}"
entry = ("fail", exception_string, record)
logger.debug(f"Record processing exception: {exception_string}")
self.exceptions.append(exception)
self.fail_messages.append(record)
return entry
Expand Down
18 changes: 18 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
"""
Batch processing exceptions
"""
import traceback


class SQSBatchProcessingError(Exception):
"""When at least one message within a batch could not be processed"""

def __init__(self, msg="", child_exceptions=()):
super().__init__(msg)
self.msg = msg
self.child_exceptions = child_exceptions

# Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent
# errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275
def __str__(self):
parent_exception_str = super(SQSBatchProcessingError, self).__str__()
to-mc marked this conversation as resolved.
Show resolved Hide resolved
exception_list = [f"{parent_exception_str}\n"]
for exception in self.child_exceptions:
extype, ex, tb = exception
formatted = "".join(traceback.format_exception(extype, ex, tb))
exception_list.append(formatted)

return "\n".join(exception_list)
15 changes: 10 additions & 5 deletions aws_lambda_powertools/utilities/batch/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Batch SQS utilities
"""
import logging
import sys
from typing import Callable, Dict, List, Optional, Tuple

import boto3
Expand Down Expand Up @@ -90,10 +91,10 @@ def _process_record(self, record) -> Tuple:
An object to be processed.
"""
try:
result = self.handler(record)
return self.success_handler(record, result)
except Exception as exc:
return self.failure_handler(record, exc)
result = self.handler(record=record)
return self.success_handler(record=record, result=result)
except Exception:
return self.failure_handler(record=record, exception=sys.exc_info())

def _prepare(self):
"""
Expand Down Expand Up @@ -123,7 +124,11 @@ def _clean(self):
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
else:
logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
raise SQSBatchProcessingError(list(self.exceptions))
raise SQSBatchProcessingError(
msg=f"Not all records processed succesfully. {len(self.exceptions)} individual errors logged "
f"separately below.",
child_exceptions=self.exceptions,
)

return delete_message_response

Expand Down
10 changes: 5 additions & 5 deletions tests/functional/test_utilities_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha
with partial_processor(records, record_handler) as ctx:
ctx.process()

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
stubber.assert_no_pending_responses()


Expand Down Expand Up @@ -144,7 +144,7 @@ def lambda_handler(event, context):
with pytest.raises(SQSBatchProcessingError) as error:
lambda_handler(event, {})

assert len(error.value.args[0]) == 2
assert len(error.value.child_exceptions) == 2
stubber.assert_no_pending_responses()


Expand All @@ -171,7 +171,7 @@ def lambda_handler(event, context):
with pytest.raises(SQSBatchProcessingError) as error:
lambda_handler(event, {})

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
stubber.assert_no_pending_responses()


Expand Down Expand Up @@ -203,7 +203,7 @@ def lambda_handler(event, context):

stubber.assert_no_pending_responses()

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
assert capsys.readouterr().out == "Oh no ! It's a failure.\n"


Expand Down Expand Up @@ -289,4 +289,4 @@ def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_ha
with partial_processor(records, record_handler) as ctx:
ctx.process()

assert len(error.value.args[0]) == 2
assert len(error.value.child_exceptions) == 2
12 changes: 8 additions & 4 deletions tests/unit/test_utilities_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def test_partial_sqs_process_record_success(mocker, partial_sqs_processor):

result = partial_sqs_processor._process_record(record)

handler_mock.assert_called_once_with(record)
success_handler_mock.assert_called_once_with(record, success_result)
handler_mock.assert_called_once_with(record=record)
success_handler_mock.assert_called_once_with(record=record, result=success_result)

assert result == expected_value

Expand All @@ -98,9 +98,13 @@ def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor):

result = partial_sqs_processor._process_record(record)

handler_mock.assert_called_once_with(record)
failure_handler_mock.assert_called_once_with(record, failure_result)
handler_mock.assert_called_once_with(record=record)

_, failure_handler_called_with_args = failure_handler_mock.call_args
failure_handler_mock.assert_called_once()
assert (failure_handler_called_with_args["record"]) == record
assert isinstance(failure_handler_called_with_args["exception"], tuple)
assert failure_handler_called_with_args["exception"][1] == failure_result
assert result == expected_value


Expand Down