From 4ec1b85e980117ecad2506163717da996e9bdcce Mon Sep 17 00:00:00 2001 From: Helen Bailey Date: Mon, 29 Nov 2021 09:18:22 -0500 Subject: [PATCH] Verify message sent to result queue Why these changes are being introduced: We want to confirm that a message has been successfully sent to a result queue before deleting its original submission message from the submit queue. How this addresses that need: * Adds a function to verify the message was successfully written to a result queue, with corresponding test * Calls the verify message during processing and raises an error if the result message can't be verified Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/ETD-445 More --- submitter/errors.py | 30 ++++++++++++++++++++++++++++++ submitter/sqs.py | 36 ++++++++++++++++++++++++++---------- tests/test_sqs.py | 26 +++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/submitter/errors.py b/submitter/errors.py index 5ef56fa..7539584 100644 --- a/submitter/errors.py +++ b/submitter/errors.py @@ -131,6 +131,36 @@ def __init__( ) +class SQSMessageSendError(Exception): + """Exception raised when a message sent to an SQS result queue cannot be verified. + + Args: + message_attributes: The attributes of the message that was not successfully sent + message_body: The body of the message that was not succesfully sent + result_queue: The name of the result queue that the message was sent to + submit_message_id: The SQS ID of the corresponding submit message + + Attributes: + message(str): Explanation of the error + """ + + def __init__( + self, + message_attributes: dict, + message_body: dict, + result_queue: str, + submit_message_id: str, + ): + self.message = ( + f"Message was not successfully sent to result queue '{result_queue}', " + "aborting DSpace Submission Service processing until this can be " + "investigated. NOTE: The submit message is likely still in the submission " + "queue and may need to be manually deleted before processing " + f"resumes. Submit message ID: {submit_message_id}. Result message " + f"attributes: {message_attributes}. Result message body: {message_body}" + ) + + class SubmitMessageInvalidResultQueueError(Exception): """Exception raised due to an invalid result queue name in a submission message. diff --git a/submitter/sqs.py b/submitter/sqs.py index 67dbbc2..91bd6e3 100644 --- a/submitter/sqs.py +++ b/submitter/sqs.py @@ -1,10 +1,11 @@ +import hashlib import json import logging import boto3 from dspace.client import DSpaceClient -from submitter import CONFIG +from submitter import CONFIG, errors from submitter.submission import Submission logger = logging.getLogger(__name__) @@ -48,13 +49,23 @@ def process(msgs): submission = Submission.from_message(message) if not submission.result_message: submission.submit(client) - write_message_to_queue( + response = write_message_to_queue( submission.result_attributes, submission.result_message, submission.result_queue, ) - # TODO: probs better to confirm the write to the output was good - # before cleanup but for now yolo it + if not verify_sent_message(submission.result_message, response): + raise errors.SQSMessageSendError( + submission.result_attributes, + submission.result_message, + submission.result_queue, + response["MessageId"], + ) + logger.debug( + "Wrote message to queue '%s' with message body: %s", + submission.result_queue, + json.dumps(submission.result_message), + ) message.delete() logger.info("Deleted message '%s' from input queue", message_id) @@ -79,18 +90,23 @@ def retrieve_messages_from_queue(input_queue, wait, visibility=30): def write_message_to_queue(attributes: dict, body: dict, output_queue: str): sqs = sqs_client() queue = sqs.get_queue_by_name(QueueName=output_queue) - queue.send_message( + response = queue.send_message( MessageAttributes=attributes, MessageBody=json.dumps(body), ) - logger.debug( - "Wrote message to queue '%s' with message body: %s", - output_queue, - json.dumps(body, indent=2), - ) + return response def create(name): sqs = sqs_client() queue = sqs.create_queue(QueueName=name) return queue + + +def verify_sent_message( + sent_message_body: dict, sqs_send_message_response: dict +) -> bool: + body_md5 = hashlib.md5( # nosec + json.dumps(sent_message_body).encode("utf-8") + ).hexdigest() + return body_md5 == sqs_send_message_response["MD5OfMessageBody"] diff --git a/tests/test_sqs.py b/tests/test_sqs.py index 355f035..3906f2a 100644 --- a/tests/test_sqs.py +++ b/tests/test_sqs.py @@ -1,3 +1,5 @@ +import json + import pytest from botocore.exceptions import ClientError @@ -7,6 +9,7 @@ process, retrieve_messages_from_queue, sqs_client, + verify_sent_message, write_message_to_queue, ) @@ -48,7 +51,8 @@ def test_write_message_to_queue(mocked_sqs, raw_attributes, raw_body): assert len(msgs) == 0 # write to queue - write_message_to_queue(raw_attributes, raw_body, "empty_result_queue") + response = write_message_to_queue(raw_attributes, raw_body, "empty_result_queue") + assert "MD5OfMessageBody" in response # confirm queue has a message msgs = retrieve_messages_from_queue("empty_result_queue", 0) @@ -106,3 +110,23 @@ def test_message_loop(mocked_sqs, mocked_dspace): # confirm output queue is populated output_msgs = retrieve_messages_from_queue("empty_result_queue", 0) assert len(output_msgs) == 10 + + +def test_verify_returns_true(mocked_sqs, raw_attributes, raw_body): + sqs = sqs_client() + queue = sqs.get_queue_by_name(QueueName="empty_result_queue") + response = queue.send_message( + MessageAttributes=raw_attributes, + MessageBody=json.dumps(raw_body), + ) + assert verify_sent_message(raw_body, response) is True + + +def test_verify_returns_false(mocked_sqs, raw_attributes, raw_body): + sqs = sqs_client() + queue = sqs.get_queue_by_name(QueueName="empty_result_queue") + response = queue.send_message( + MessageAttributes=raw_attributes, + MessageBody=json.dumps(raw_body), + ) + assert verify_sent_message({"body": "a different message body"}, response) is False