Skip to content

Commit

Permalink
Merge pull request #20 from MITLibraries/etd-445-confirm-result-message
Browse files Browse the repository at this point in the history
Verify message sent to result queue
  • Loading branch information
hakbailey committed Dec 6, 2021
2 parents 9909bce + 4ec1b85 commit e8fd716
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 11 deletions.
30 changes: 30 additions & 0 deletions submitter/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,36 @@ def __init__(
)


class SQSMessageSendError(Exception):
"""Exception raised when a message sent to an SQS result queue cannot be verified.
Args:
message_attributes: The attributes of the message that was not successfully sent
message_body: The body of the message that was not succesfully sent
result_queue: The name of the result queue that the message was sent to
submit_message_id: The SQS ID of the corresponding submit message
Attributes:
message(str): Explanation of the error
"""

def __init__(
self,
message_attributes: dict,
message_body: dict,
result_queue: str,
submit_message_id: str,
):
self.message = (
f"Message was not successfully sent to result queue '{result_queue}', "
"aborting DSpace Submission Service processing until this can be "
"investigated. NOTE: The submit message is likely still in the submission "
"queue and may need to be manually deleted before processing "
f"resumes. Submit message ID: {submit_message_id}. Result message "
f"attributes: {message_attributes}. Result message body: {message_body}"
)


class SubmitMessageInvalidResultQueueError(Exception):
"""Exception raised due to an invalid result queue name in a submission message.
Expand Down
36 changes: 26 additions & 10 deletions submitter/sqs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import hashlib
import json
import logging

import boto3
from dspace.client import DSpaceClient

from submitter import CONFIG
from submitter import CONFIG, errors
from submitter.submission import Submission

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -48,13 +49,23 @@ def process(msgs):
submission = Submission.from_message(message)
if not submission.result_message:
submission.submit(client)
write_message_to_queue(
response = write_message_to_queue(
submission.result_attributes,
submission.result_message,
submission.result_queue,
)
# TODO: probs better to confirm the write to the output was good
# before cleanup but for now yolo it
if not verify_sent_message(submission.result_message, response):
raise errors.SQSMessageSendError(
submission.result_attributes,
submission.result_message,
submission.result_queue,
response["MessageId"],
)
logger.debug(
"Wrote message to queue '%s' with message body: %s",
submission.result_queue,
json.dumps(submission.result_message),
)
message.delete()
logger.info("Deleted message '%s' from input queue", message_id)

Expand All @@ -79,18 +90,23 @@ def retrieve_messages_from_queue(input_queue, wait, visibility=30):
def write_message_to_queue(attributes: dict, body: dict, output_queue: str):
sqs = sqs_client()
queue = sqs.get_queue_by_name(QueueName=output_queue)
queue.send_message(
response = queue.send_message(
MessageAttributes=attributes,
MessageBody=json.dumps(body),
)
logger.debug(
"Wrote message to queue '%s' with message body: %s",
output_queue,
json.dumps(body, indent=2),
)
return response


def create(name):
sqs = sqs_client()
queue = sqs.create_queue(QueueName=name)
return queue


def verify_sent_message(
sent_message_body: dict, sqs_send_message_response: dict
) -> bool:
body_md5 = hashlib.md5( # nosec
json.dumps(sent_message_body).encode("utf-8")
).hexdigest()
return body_md5 == sqs_send_message_response["MD5OfMessageBody"]
26 changes: 25 additions & 1 deletion tests/test_sqs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import pytest
from botocore.exceptions import ClientError

Expand All @@ -7,6 +9,7 @@
process,
retrieve_messages_from_queue,
sqs_client,
verify_sent_message,
write_message_to_queue,
)

Expand Down Expand Up @@ -48,7 +51,8 @@ def test_write_message_to_queue(mocked_sqs, raw_attributes, raw_body):
assert len(msgs) == 0

# write to queue
write_message_to_queue(raw_attributes, raw_body, "empty_result_queue")
response = write_message_to_queue(raw_attributes, raw_body, "empty_result_queue")
assert "MD5OfMessageBody" in response

# confirm queue has a message
msgs = retrieve_messages_from_queue("empty_result_queue", 0)
Expand Down Expand Up @@ -106,3 +110,23 @@ def test_message_loop(mocked_sqs, mocked_dspace):
# confirm output queue is populated
output_msgs = retrieve_messages_from_queue("empty_result_queue", 0)
assert len(output_msgs) == 10


def test_verify_returns_true(mocked_sqs, raw_attributes, raw_body):
sqs = sqs_client()
queue = sqs.get_queue_by_name(QueueName="empty_result_queue")
response = queue.send_message(
MessageAttributes=raw_attributes,
MessageBody=json.dumps(raw_body),
)
assert verify_sent_message(raw_body, response) is True


def test_verify_returns_false(mocked_sqs, raw_attributes, raw_body):
sqs = sqs_client()
queue = sqs.get_queue_by_name(QueueName="empty_result_queue")
response = queue.send_message(
MessageAttributes=raw_attributes,
MessageBody=json.dumps(raw_body),
)
assert verify_sent_message({"body": "a different message body"}, response) is False

0 comments on commit e8fd716

Please sign in to comment.