Conversation

@jonavellecuerdo (Contributor) commented Jul 3, 2025

Purpose and background context

This work is an important part of ensuring that running DSC is idempotent by tracking the state of item submissions in a DynamoDB table. Since 'finalize' is the last step of the DSC workflow, it is during this step that the table is updated with the outcome of the attempted deposit into DSpace, including whether the item submission was ingested.

NOTE: The last two commits do somewhat significant cleanup:

  • 7a7aa47: Proposes renaming methods on the Workflow class to emphasize their relation to the finalize CLI command / DSC step.
  • 4ca339c: Without an example of Workflow.workflow_specific_processing, it's hard to picture what parameters this method might require. The idea behind this change, though, is that it can pull information from either WorkflowEvents or the DynamoDB table to identify the item submissions it needs (see the sketch below).
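
As a rough sketch of that idea (hypothetical body; the real signature is still open for discussion):

def workflow_specific_processing(self) -> None:
    """Hook for workflow-specific steps during 'finalize'.

    Hypothetical sketch: rather than receiving items as a parameter,
    subclasses could look up the item submissions they need from the
    DynamoDB table (or from WorkflowEvents).
    """
    for item_submission_record in ItemSubmissionDB.query(self.batch_id):
        ...  # e.g. derive workflow-specific reporting from each record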

How can a reviewer manually see the effects of these changes?

Recommend reviewing by commit!

For now the updated unit tests should be sufficient.

Includes new or updated dependencies?

NO

Changes expectations for external applications?

NO

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed or provided examples verified
  • New dependencies are appropriate or there were no changes

Why these changes are being introduced:
* This work is an important part of ensuring that running
DSC is idempotent via tracking the state of item submissions
using a DynamoDB table. With 'finalize' being the last step
of the DSC workflow, it is during this step that the table
is updated with information regarding the attempted deposit
into DSpace and whether the item submission was ingested.

How this addresses that need:
* Break up now-deprecated result message parsing method into two
methods for 'MessageAttributes' and 'Body' content
  - Each method raises an 'InvalidSQSMessageError' exception if invalid
* Rework logic in Workflow.process_sqs_queue to use new parsing methods
* Include calls to update records in DynamoDB table
* Update language in 'sqs_results_summary' count variable

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IN-1318
@coveralls commented Jul 3, 2025

Pull Request Test Coverage Report for Build 16121218445

Details

  • 61 of 61 (100.0%) changed or added relevant lines in 4 files are covered.
  • 1 unchanged line in 1 file lost coverage.
  • Overall coverage increased (+0.3%) to 95.713%

Files with Coverage Reduction    New Missed Lines    %
dsc/config.py                    1                   94.52%

Totals Coverage Status
Change from base Build 16055691110: 0.3%
Covered Lines: 893
Relevant Lines: 933

💛 - Coveralls

@jonavellecuerdo force-pushed the IN-1318-update-finalize-command-to-use-dynamodb branch 2 times, most recently from e6c05cc to 66c9a44 on July 3, 2025 18:56
@jonavellecuerdo force-pushed the IN-1318-update-finalize-command-to-use-dynamodb branch from 66c9a44 to 7a7aa47 on July 3, 2025 19:07
Comment on lines 741 to 786
 @final
 @staticmethod
-def _parse_result_message(message_attributes: dict, message_body: str) -> dict:
-    """Parse content of result message.
-
-    This method will validate the content of the result message and return
-    a dict summarizing the outcome of the attempted submission via DSS:
-
-    1. Verify that 'message_attributes' adheres to
-       dsc.utilities.validate.schemas.RESULT_MESSAGE_ATTRIBUTES JSON schema.
-    2. Verify that 'message_body' is a valid JSON string.
-    3. Verify that the parsed 'message_body' adheres to
-       dsc.utilities.validate.schemas.RESULT_MESSAGE_BODY JSON schema.
-
-    Args:
-        message_attributes: Content of 'MessageAttributes' in result message.
-        message_body (str): Content of 'Body' in result message.
-
-    Returns:
-        dict: Result of attempted submission via DSS.
-    """
-    result_info: dict = {
-        "item_identifier": None,
-        "ingested": None,
-        "dspace_handle": None,
-        "error": None,
-        "result_message_body": message_body,
-    }
-
-    # validate content of 'MessageAttributes'
-    try:
-        jsonschema.validate(
-            instance=message_attributes,
-            schema=RESULT_MESSAGE_ATTRIBUTES,
-        )
-    except jsonschema.exceptions.ValidationError:
-        error_message = "Content of 'MessageAttributes' is invalid"
-        logger.exception(error_message)
-        result_info["error"] = error_message
-        return result_info
-
-    result_info["item_identifier"] = message_attributes["PackageID"]["StringValue"]
-
-    # validate content of 'Body'
-    try:
-        parsed_message_body = json.loads(message_body)
-        jsonschema.validate(instance=parsed_message_body, schema=RESULT_MESSAGE_BODY)
-    except json.JSONDecodeError:
-        error_message = "Failed to parse content of 'Body'"
-        logger.exception(error_message)
-        result_info["error"] = error_message
-    except jsonschema.exceptions.ValidationError:
-        error_message = "Content of 'Body' is invalid"
-        logger.exception(error_message)
-        result_info["error"] = error_message
-    else:
-        result_info["ingested"] = bool(parsed_message_body["ResultType"] == "success")
-        result_info["result_message_body"] = parsed_message_body
-        result_info["dspace_handle"] = parsed_message_body.get("ItemHandle")
-    return result_info
-
-def workflow_specific_processing(self, items: list[dict]) -> None:
+def _parse_result_message_attrs(message_attributes: dict) -> dict:
+    """Parse and validate content of 'MessageAttributes' in result message.
+
+    If the content passes schema validation, the content is returned.
+
+    Raises:
+        InvalidSQSMessageError
+    """
+    try:
+        jsonschema.validate(
+            instance=message_attributes,
+            schema=RESULT_MESSAGE_ATTRIBUTES,
+        )
+    except jsonschema.exceptions.ValidationError as exception:
+        raise InvalidSQSMessageError(
+            "Content of 'MessageAttributes' failed schema validation"
+        ) from exception
+    return message_attributes
+
+@final
+@staticmethod
+def _parse_result_message_body(message_body: str) -> dict:
+    """Parse and validate content of 'Body' in result message.
+
+    If the JSON string can be deserialized to a Python dictionary
+    and it passes schema validation, the parsed content is returned.
+
+    Raises:
+        InvalidSQSMessageError
+    """
+    try:
+        parsed_message_body = json.loads(message_body)
+        jsonschema.validate(instance=parsed_message_body, schema=RESULT_MESSAGE_BODY)
+    except json.JSONDecodeError as exception:
+        raise InvalidSQSMessageError(
+            "Failed to parse content of 'Body'"
+        ) from exception
+    except jsonschema.exceptions.ValidationError as exception:
+        raise InvalidSQSMessageError(
+            "Content of 'Body' failed schema validation"
+        ) from exception
+    return parsed_message_body
jonavellecuerdo (Contributor, Author):

Breaking up the now-deprecated _parse_result_message method was a means to move away from the created result_info dict. This moves a good chunk of code into Workflow.process_result_messages, but I'd argue that it's more explicit about what is expected of the result message content.

@jonavellecuerdo marked this pull request as ready for review July 3, 2025 19:23
@jonavellecuerdo requested a review from a team as a code owner July 3, 2025 19:23
@ehanson8 ehanson8 left a comment

This is looking great!!! A few comments and questions

)

items = []
sqs_processed_items_ids = []
ehanson8 (Contributor):

Not sure we need sqs_ in var name but it's not that big of a deal

elif result_info["ingested"]:
logger.info(f"Item was ingested: {item_identifier}")
processing_summary["ingested"] += 1
logger.info("Unable to determine if ingest status for item.")
ehanson8 (Contributor):

Unnecessary if?

jonavellecuerdo (Contributor, Author):

Hmm, I think the if statement you're referencing is removed by a later commit. See final version of line in Files changed

ehanson8 (Contributor):

Unnecessary if in the logger.info message, apologies for not being clearer because if is a word that more often has the connotation you assumed 🙂

jonavellecuerdo (Contributor, Author):

OH. 😅

Comment on lines +760 to +763
except json.JSONDecodeError as exception:
raise InvalidSQSMessageError(
"Failed to parse content of 'Body'"
) from exception
ehanson8 (Contributor):

Should this check be in _parse_result_message_attrs as well? It seems just as likely to raise a JSONDecodeError and it is run first so we'd get a quicker indication that the whole message might be garbage

jonavellecuerdo (Contributor, Author):

By the time MessageAttributes is passed into Workflow._parse_result_message_attrs, it is already a Python dictionary so there isn't a need to use the json.loads method.
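
For reference, a raw result message (as returned by boto3's receive_message) roughly takes this shape; the values below are illustrative, with field names matching the schemas in this PR:

# Illustrative only: 'MessageAttributes' arrives as a nested dict, while
# 'Body' is a JSON string that still needs json.loads before validation.
sqs_message = {
    "MessageId": "abc-123",
    "ReceiptHandle": "opaque-receipt-handle",
    "MessageAttributes": {
        "PackageID": {"DataType": "String", "StringValue": "item-001"},
    },
    "Body": '{"ResultType": "success", "ItemHandle": "1721.1/12345"}',
}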

"""Process the result messages from the DSS output queue according the workflow."""
workflow = ctx.obj["workflow"]
workflow.process_ingest_results()
workflow.finalize_items()
ehanson8 (Contributor):

You're on a roll with the renames! 🎉

jonavellecuerdo (Contributor, Author):

Shoutout to @ghukill for encouraging the rename 🤓 I think it's been something we've been touching on with the recent PRs and t'was time.

     )

-    def process_result_messages(self) -> list[str]:
+    def process_result_messages(self) -> None:
ehanson8 (Contributor):

Not sure I fully grasp why this is being changed. Originally, I thought DynamoDB would just be part of Wiley workflow-specific processing, but now that every workflow has it, the best example would be the DDC workflow, where an ArchivesSpace URI is included in the metadata but not put into the DSpace metadata. It would carry over, though (I'm just now thinking maybe through DynamoDB?), so that a report of ArchivesSpace URIs and their corresponding DSpace handles can be produced after finalize is run. This report would be used in a separate process from DSC to update ArchivesSpace.

jonavellecuerdo (Contributor, Author):

Hmm, the reason for removing was that we can retrieve all the information for the items in a batch by querying the DynamoDB table 🤔

Since Workflow.process_result_messages is focused on consuming the messages from the output queue, it would technically only retrieve the items that are associated with those result messages.

Re: how we can store the ArchivesSpace URI, since DynamoDB tables are "schemaless", it would be very easy to add a column to hold ArchivesSpace URIs.

Can you add some of this information to https://mitlibraries.atlassian.net/browse/IN-1100?
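
For example, a minimal sketch of what that could look like, assuming ItemSubmissionDB is a pynamodb model (the table name, range key, and new attribute name here are illustrative):

from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model


class ItemSubmissionDB(Model):
    class Meta:
        table_name = "dsc-item-submissions"  # assumed name

    batch_id = UnicodeAttribute(hash_key=True)
    item_identifier = UnicodeAttribute(range_key=True)  # assumed range key
    # Hypothetical new column: because DynamoDB is schemaless, existing rows
    # simply lack this attribute until it is written, so no migration is needed.
    archivesspace_uri = UnicodeAttribute(null=True)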

ehanson8 (Contributor):

Makes sense, and will do on updating the Jira ticket!

@ghukill ghukill left a comment

I'm going to go out on a limb and approve straight-away!

First, agree with all of @ehanson8's comments about renames being good.

Second, I'm going into these PRs right now with the assumption there might be a refactor pass in the near future. I'm thrilled to see the reconcile + submit + finalize functionality getting established with Dynamo backing it all, and I think some reps of actually using it will reveal things that work well and possible bugs.

In the spirit of refactors, I left a comment about possible SQS message specific classes that could take a lot of the complexity and burden out of the Workflow class. I think, at a glance, it may even address some of the other comments in this PR about message validation and parsing.

All in all, looking good, and full support for moving forward towards end-to-end functionality for testing, with the request that some time be allotted for a little big-picture refactoring once all these foundational pieces are established.

self.workflow_specific_processing()

# update WorkflowEvents with batch-level ingest results
for item_submission_record in ItemSubmissionDB.query(self.batch_id):
ghukill:

Reading this with fresh eyes today, my immediate knee-jerk was that a method on ItemSubmissionDB like get_batch_items(batch_id: str) could be a nice convenience method.

I don't know if we would use it anywhere else, but feels like a method that DB model could/would have.

jonavellecuerdo (Contributor, Author):

Hmm, if I define a method like so:

@classmethod
def get_batch_items(cls, batch_id: str): 
   return cls.query(batch_id)

It would be used like

for item_submission_record in ItemSubmissionDB.get_batch_items(self.batch_id):
 ...

Will it make much of a difference? 🤔

ghukill:

I think it abstracts away -- again, from the high-level POV of the Workflow -- what it takes to only get rows from the Dynamo table that share a batch_id.

For example, just a .query() method with a single passed string to it makes me kind of nervous. Is that fulltext searching all columns? or are pynamodb models opinionated such that .query() is only searching a specific column?

What happens if the batch is empty? Should that raise an exception? Related, is there value in such a proposed method first retrieving all of the results and then returning that? That would support things like logging how many it found before you even started to process.

Not at all required, and my initial approval still stands; these are just observations.
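
For what it's worth, a sketch of that method with those observations folded in (assuming pynamodb, where Model.query matches on the hash key rather than fulltext searching, and a module-level logger):

@classmethod
def get_batch_items(cls, batch_id: str) -> list["ItemSubmissionDB"]:
    """Retrieve all item submission records that share the given batch_id.

    pynamodb's Model.query is keyed, not fulltext: the positional argument
    is matched against the table's hash key (batch_id here), so only rows
    in that batch are returned.
    """
    items = list(cls.query(batch_id))  # materialize so results can be counted
    if not items:
        logger.warning(f"No item submissions found for batch '{batch_id}'")
    else:
        logger.info(f"Retrieved {len(items)} item submission(s) for batch '{batch_id}'")
    return items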

jonavellecuerdo (Contributor, Author):

Thank you for expanding further! Will continue to think about this as part of future refactor.

Comment on lines 640 to +661
 message_id = sqs_message["MessageId"]
 message_body = sqs_message["Body"]
 receipt_handle = sqs_message["ReceiptHandle"]

 logger.debug(f"Processing result message: {message_id}")

-result_info = self._parse_result_message(
-    sqs_message["MessageAttributes"], message_body=sqs_message["Body"]
-)
+try:
+    message_attributes = self._parse_result_message_attrs(
+        sqs_message["MessageAttributes"]
+    )
+except InvalidSQSMessageError as exception:
+    logger.error(  # noqa: TRY400
+        f"Failed to parse 'MessageAttributes' from {message_id}: {exception}"
+    )
+    sqs_results_summary["ingest_unknown"] += 1
+
+    # delete message from the queue
+    sqs_client.delete(
+        receipt_handle=receipt_handle,
+        message_id=message_id,
+    )
+    continue
ghukill:

I'm electing to locate this comment here, but I have a feeling it may apply elsewhere and/or be an option for future refactor work.

As I look at this method and code block with fresh eyes today, while also having ItemSubmissionDB fresh in my brain from a previous comment, I think there could be an opportunity for classes like:

class SQSMessage:
    pass

class SubmitSQSMessage(SQSMessage):
    pass

class ResultSQSMessage(SQSMessage):
    pass

Unsure at the moment what the base SQSMessage class might have that is shared between the other two, or if it's needed at all. But it feels like these classes could handle just about all of this loading, validating, and parsing of attributes from the message payload.

Perhaps one attribute they all share would be an instantiated SQSClient that they could use to, for example, delete themselves.

So much of these code blocks are dictionary key:value accessing, validating when not present, etc., and it kind of obscures the method itself, which is about processing those messages.

I feel like at one point the act of "processing" these messages was just making sure they were valid and then deleting them, but we've started to layer on more, like updating Dynamo and workflow events.

ghukill:

Update: as I look at the base Workflow class, unless I'm missing something, these classes may gobble up other methods like:

  • _parse_result_message_attrs
  • _parse_result_message_body

This would remove a lot of the data parsing from the Workflow class and let it focus more on the "big picture" of a workflow by orchestrating these SQS, Dynamo, and reporting things but not concerned with their nitty-gritty implementation.
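
For instance, a rough sketch of how a ResultSQSMessage class might absorb those two methods, reusing the schemas and exception from this PR (class name, attribute names, and the import path for the exception are illustrative only):

import json

import jsonschema

from dsc.exceptions import InvalidSQSMessageError  # assumed import path
from dsc.utilities.validate.schemas import (
    RESULT_MESSAGE_ATTRIBUTES,
    RESULT_MESSAGE_BODY,
)


class ResultSQSMessage:
    """Wraps a single result message from the DSS output queue."""

    def __init__(self, sqs_message: dict):
        self.message_id = sqs_message["MessageId"]
        self.receipt_handle = sqs_message["ReceiptHandle"]
        # validation happens once, at construction time
        self.attributes = self._validate_attrs(sqs_message["MessageAttributes"])
        self.body = self._parse_body(sqs_message["Body"])

    @staticmethod
    def _validate_attrs(message_attributes: dict) -> dict:
        try:
            jsonschema.validate(
                instance=message_attributes, schema=RESULT_MESSAGE_ATTRIBUTES
            )
        except jsonschema.exceptions.ValidationError as exception:
            raise InvalidSQSMessageError(
                "Content of 'MessageAttributes' failed schema validation"
            ) from exception
        return message_attributes

    @staticmethod
    def _parse_body(message_body: str) -> dict:
        try:
            parsed_message_body = json.loads(message_body)
            jsonschema.validate(
                instance=parsed_message_body, schema=RESULT_MESSAGE_BODY
            )
        except json.JSONDecodeError as exception:
            raise InvalidSQSMessageError(
                "Failed to parse content of 'Body'"
            ) from exception
        except jsonschema.exceptions.ValidationError as exception:
            raise InvalidSQSMessageError(
                "Content of 'Body' failed schema validation"
            ) from exception
        return parsed_message_body

    @property
    def item_identifier(self) -> str:
        return self.attributes["PackageID"]["StringValue"]

    @property
    def ingested(self) -> bool:
        return self.body["ResultType"] == "success"

With something like this, Workflow.process_result_messages could construct one ResultSQSMessage per raw message and catch a single InvalidSQSMessageError, keeping the nitty-gritty parsing out of the Workflow class.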

ghukill:

Lastly: those classes may have poor names. And, any work on this front should probably be in the context of a larger touch. We don't want to overcorrect and have Workflow calling a million classes that have leaky abstractions with each other.

But I think the comment stands: it feels like the base Workflow class could benefit from farming out some of this data parsing + validating + structuring to a more data-focused class.

jonavellecuerdo (Contributor, Author):

@ghukill Thank you for the ideas! Do you feel there is enough information to create a new ticket? If so, can you add one to the epic?

ehanson8 (Contributor):

@ghukill's suggestions sound good to explore!

@ehanson8 ehanson8 left a comment

Great work!

@jonavellecuerdo merged commit ca7cd0d into support-for-dynamodb Jul 7, 2025
2 checks passed
@jonavellecuerdo deleted the IN-1318-update-finalize-command-to-use-dynamodb branch July 7, 2025 16:30