Gdt 275 sqs missing file #286
Conversation
Why these changes are being introduced:
* Adding exception handling for SQS messages that refer to non-existent files in the S3 bucket so that they are skipped and subsequent SQS messages are processed.

How this addresses that need:
* Add a try/except block to handle the OSError that results from a missing file in S3, as well as a corresponding unit test
* Update and add fixtures to support the new unit test

Side effects of this change:
* SQS messages without a corresponding file in the S3 bucket will remain in the queue

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/GDT-275
Overall, the location of the try/except logic and the test fixtures all look really good.
My reason for requesting changes -- whether or not adopted -- is to consider yielding a Record + exception from the incremental_harvest_get_source_records() method instead of logging and skipping the record entirely. As discussed in the comment, I think we might benefit from utilizing the error handling built into the pipeline where possible.
Counterpoint is that other harvesters generally don't have this level of sophistication for error handling for source records. Counter-counterpoint, perhaps they should?
Open to anything here, just wanted to propose a good faith example of how yielding Records + exception will get handled in the pre-existing error handling in the pipeline.
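For reference, here is a minimal sketch of the Record shape this proposal implies, inferred from the fields referenced later in this review (identifier, source_record, exception_stage, exception); the actual class in this repo likely differs:

from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Record:
    identifier: str
    source_record: Optional[Any] = None  # a SourceRecord, or None when retrieval failed
    exception_stage: Optional[str] = None  # pipeline step where the failure occurred
    exception: Optional[Exception] = None  # the exception itself, for later reporting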
def test_mit_harvester_incremental_continues_after_missing_zip_file(
    caplog,
    mock_sqs_queue,
    mocked_sqs_topic_name,
    mocked_restricted_bucket_one_legacy_fgdc_zip,
):
    harvester = MITHarvester(
        harvest_type="incremental",
        input_files=mocked_restricted_bucket_one_legacy_fgdc_zip,
        sqs_topic_name=mocked_sqs_topic_name,
    )
    records = harvester.incremental_harvest_get_source_records()
    assert len(list(records)) == 1
    assert (
        "OSError: unable to access bucket: 'mocked_cdn_restricted' "
        "key: 'cdn/geo/restricted/DEF456.zip'" in caplog.text
    )
For this test, I'll admit I had to do some digging to understand why it worked. What's not immediately obvious to me is that the mocked queue contains two messages: one referencing a file that is in the mocked S3 bucket, and one that is not.
Instead of a docstring, what about renaming the fixture mock_sqs_queue to something like mock_sqs_queue_two_messages? I feel like the "two" in that fixture name and the "one" from mocked_restricted_bucket_one_legacy_fgdc_zip would be enough to understand the final assertion that the results have length 1 --> one failed, but one still succeeded.
Not a blocking request, but just sharing my experience unpacking this test (which is good, BTW).
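To make the renaming suggestion concrete, here is a rough sketch of what such a fixture could look like, assuming a moto (>= 5) mocked FIFO queue; the queue name and message body shape are illustrative guesses, not the repo's actual fixture:

import json

import boto3
import pytest
from moto import mock_aws


@pytest.fixture
def mock_sqs_queue_two_messages():
    """Mocked FIFO queue holding two zip file event messages: one whose file
    exists in the mocked S3 bucket, and one (DEF456.zip) that is missing."""
    with mock_aws():
        sqs = boto3.resource("sqs", region_name="us-east-1")
        queue = sqs.create_queue(
            QueueName="mocked-geo-harvester-input.fifo",
            Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
        )
        for zip_key in (
            "cdn/geo/restricted/SDE_DATA_AE_A8GNS_2003.zip",  # present in mocked bucket
            "cdn/geo/restricted/DEF456.zip",  # deliberately absent
        ):
            queue.send_message(
                MessageBody=json.dumps({"zip_file": zip_key, "event": "created"}),
                MessageGroupId="geo-harvester",
            )
        yield queue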
Update: see the other comment about yielding a Record + exception from the incremental_harvest_get_source_records() method. If that approach is used, this test would need a couple of updates, and maybe then the fixture name is more inconsequential.
harvester/harvest/mit.py
Outdated
try:
    source_record = self.create_source_record_from_zip_file(
        identifier=identifier,
        zip_file=zip_file_event_message.zip_file,
        event=zip_file_event_message.event,
        sqs_message=zip_file_event_message,
    )
except OSError:
    logger.exception("File not found")
    continue
yield Record(
    identifier=identifier,
    source_record=source_record,
)
My knee-jerk reaction was that this is a simple and elegant solution to this issue. And I think that's true, but I think it also short-circuits the failed-records handling aspects of the pipeline.
Notice here in the base Harvester.harvest method, where Records that encountered an error are filtered out (with the same pattern applied to each step):
records = self.filter_failed_records(self.get_source_records())
As such, if we fail to find and parse a source record (e.g. the file is not found in S3), we could yield a Record instance with Record.exception_stage="incremental_harvest_get_source_records" and Record.exception = <EXCEPTION OBJECT>; we'd then get the same error handling and reporting as other parts of the pipeline where something fails.
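As a rough illustration of that filtering pattern (not the repo's actual implementation), filter_failed_records presumably partitions on whether a Record carries an exception, something like the following, reusing the Record sketch above:

from typing import Iterator


class Harvester:
    """Illustrative stub; the real base Harvester does much more."""

    def __init__(self) -> None:
        self.failed_records: list["Record"] = []

    def filter_failed_records(self, records: Iterator["Record"]) -> Iterator["Record"]:
        """Pass successful Records through; hold back failed ones for reporting."""
        for record in records:
            if record.exception:
                # Failed records drop out of the pipeline here and can be
                # reported (e.g. to Sentry) with identifier and exception_stage.
                self.failed_records.append(record)
            else:
                yield record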
To do so, we could modify the try/except block here to still yield a Record, just one without a SourceRecord attached, and with the encountered exception attached:
try:
    source_record = self.create_source_record_from_zip_file(
        identifier=identifier,
        zip_file=zip_file_event_message.zip_file,
        event=zip_file_event_message.event,
        sqs_message=zip_file_event_message,
    )
    yield Record(
        identifier=identifier,
        source_record=source_record,
    )
except OSError as exc:  # <------- new logic starts here
    yield Record(
        identifier=identifier,
        source_record=None,  # <-------- Note this is None
        exception_stage="incremental_harvest_get_source_records",
        exception=exc,
    )
This has a couple of effects:
- the total number of records yielded from this method would actually be two (one success, one fail)
- the test test_mit_harvester_incremental_continues_after_missing_zip_file could then interrogate the records and assert that one has an exception (and therefore won't carry on) and one does not (and will continue as normal).
Example of updates we could apply to that test:
records = list(records)
assert len(records) == 2
fail_record, success_record = records
assert fail_record.identifier == "DEF456"
assert fail_record.exception_stage == "incremental_harvest_get_source_records"
assert isinstance(fail_record.exception, OSError)
assert (
    str(fail_record.exception)
    == "unable to access bucket: 'mocked_cdn_restricted' key: "
    "'cdn/geo/restricted/DEF456.zip' version: None error: An error occurred ("
    "NoSuchKey) when calling the GetObject operation: The specified key does not "
    "exist."
)
assert success_record.identifier == "SDE_DATA_AE_A8GNS_2003"
assert success_record.exception is None
If we lean into the failed-records handling by yielding a Record + exception, then we'll get the same Sentry reporting for the Record's identifier and why it failed (to the degree that is set up and configured).
A much better approach, updating! Thanks!
As mentioned in the PR-level comment, inside of a specific harvester it's not obvious that this failure handling is happening at the orchestration level. And while other low-level methods may utilize it, I don't think any of the get_source_records() methods (full and incremental) do. So this is kind of a first for them to utilize it.
* Refactor incremental_harvest_get_source_records method to populate Record objects with exceptions rather than skipping entirely
* Update corresponding unit test
Looks good to me! Thanks for considering the failed records handling approach.
Purpose and background context
Adding exception handling for SQS messages that refer to non-existent files in the S3 bucket so that they are skipped and subsequent SQS messages are processed.
How can a reviewer manually see the effects of these changes?
foo.zip was uploaded to geo-upload-dev-222053980223, which triggered an SQS message in the geo-harvester-input-dev.fifo queue and copied the file to cdn-origin-dev-222053980223/cdn/geo/restricted/. The file was then deleted from cdn-origin-dev-222053980223/cdn/geo/restricted/ to trigger the error.
Set Dev1 credentials and the following variables in .env:
Run an incremental harvest to see the error and confirm that the message remains in the geo-harvester-input-dev.fifo queue.
queueIncludes new or updated dependencies?
YES
Changes expectations for external applications?
NO
What are the relevant tickets?
GDT-275: https://mitlibraries.atlassian.net/browse/GDT-275