Conversation
@@ -235,9 +250,9 @@ def get_should_continue(self, response_json) -> bool:
             " been fetched. Consider reducing the ingestion interval."
         )
         if self.should_raise_error:
-            raise AirflowException(error_message)
+            raise Exception(error_message)
It's necessary to change this to an `Exception`, because `AirflowException`s are specifically not handled by our ingestion error handling logic (meaning they can never be skipped).
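To illustrate the distinction, here's a minimal sketch of error-skipping logic along these lines (illustrative only, not the actual Openverse implementation):

```python
# A minimal sketch (not the actual Openverse code) of error handling that can
# skip ordinary Exceptions but deliberately lets AirflowException propagate.
from airflow.exceptions import AirflowException


def process_batch_with_skipping(process, batch, skip_ingestion_errors=True):
    try:
        process(batch)
    except AirflowException:
        raise  # orchestration-level failure: never swallowed, so never skipped
    except Exception as err:
        if not skip_ingestion_errors:
            raise
        print(f"Skipping batch after error: {err}")
```

Because `AirflowException` subclasses `Exception`, it has to be caught first for this carve-out to work; raising a plain `Exception` is what makes the error skippable at all.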
@stacimc Is this feedback worth adding to the code?
# Must be an `Exception` instead of an `AirflowException` to allow skipping.
Or something like that.
Or potentially even a custom, `Exception`-derived exception subclass?
Our own error handling in the superclass will re-raise this as an `IngestionError` already.
This isn't a problem with this specific exception so much as a general point about error handling in the ingester classes: we probably shouldn't be raising `AirflowException`s ourselves anyway.
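For context, the re-raise pattern being referenced looks roughly like this (a sketch with assumed names, not the actual superclass code):

```python
# Sketch of the superclass re-raise pattern described above (names assumed).
class IngestionError(Exception):
    """Wraps errors raised during ingestion so they can be handled uniformly."""

    def __init__(self, error: Exception, query_params: dict):
        super().__init__(str(error))
        self.query_params = query_params


def ingest(process_batch, batch, query_params):
    try:
        process_batch(batch)
    except Exception as error:
        # Any Exception -- including a custom subclass -- gets wrapped, so a
        # dedicated subclass would mainly improve readability at the raise site.
        raise IngestionError(error, query_params) from error
```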
Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR: @AetherUnbound. Excluding weekend days, this PR was updated 2 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekdays. @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
logger.error(
    f"{detected_count} records retrieved, but there is a"
    f" limit of {self.max_unique_records}."
)
This log was so useful when observing this PR in Airflow. Before even reading the code, it gave a great understanding of what was happening 👍
Really cool. On my local run I picked up 46,732 records. As always, phenomenal documentation 😍
This is such a clever idea! Hope we can get most of the images now.
Thank you for the amazing documentation and tests.
Incredible documentation, thanks for getting this all down in the various docstrings/comments! I hope this will be useful for other folks who encounter issues with the Flickr API 😄
Fixes
Fast follow for Flickr backfill.
Related to WordPress/openverse#1285.
Description
This PR attempts to handle situations where the Flickr API returns excessively large batches of data. The logic should be documented pretty thoroughly in the code, but here's a refresher:
The Flickr API will only return 4,000 unique records for any given set of query params; after that, it will just infinitely return duplicates. Consequently, we have to try to query the API in such a way that we get batches with fewer than 4,000 records each. Up until now, we have been doing this using the `TimeDelineatedProviderDataIngester` to break the day into small time intervals for ingestion.
There is a limit to the granularity of data by time interval, though -- once you reduce the interval size to about 5 minutes, the number of records stays the same. For example, given a 5-min interval with > 4k records, if you search any 5-second interval within this range, you will still be returned > 4k records. So reducing the timespan only works up to a certain point. However, we can still try to reduce the size of the result set by querying for one license type at a time, instead of all 8 license types at once.
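As a rough illustration of the per-license fallback (a sketch; the parameter names come from the Flickr API, but the license IDs and function are assumptions, not code from this PR):

```python
# Sketch: split one time-interval query into eight per-license queries when
# the interval reports more unique records than Flickr will actually return.
MAX_UNIQUE_RECORDS = 4_000
# The 8 CC license IDs Flickr supports (assumed values, for illustration).
LICENSE_IDS = ["1", "2", "3", "4", "5", "6", "9", "10"]


def build_queries(start_ts: int, end_ts: int, total_found: int) -> list[dict]:
    base = {"min_upload_date": start_ts, "max_upload_date": end_ts}
    if total_found <= MAX_UNIQUE_RECORDS:
        return [base]  # one query covering all licenses at once
    # Too many records: query one license at a time to shrink each result set.
    return [{**base, "license": license_id} for license_id in LICENSE_IDS]
```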
This PR detects these large batch intervals during ingestion, adds them to an array for later processing, and skips ingestion for that batch. After 'regular' ingestion completes, each of these large intervals is reprocessed 8 times, once for each license type. It's still possible for a 5-min interval to contain more than 4k records for a single license type, but in that case there's nothing more we can do, so we process the first 4,000 results and then continue.
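Put together, the detect-skip-reprocess flow might look like this sketch (function and variable names are illustrative, not the PR's actual implementation):

```python
# Sketch of the large-batch handling flow described above (names assumed).
MAX_UNIQUE_RECORDS = 4_000
LICENSE_IDS = ["1", "2", "3", "4", "5", "6", "9", "10"]  # assumed, see above
large_intervals: list[tuple[int, int]] = []


def should_continue(interval: tuple[int, int], detected_count: int) -> bool:
    if detected_count > MAX_UNIQUE_RECORDS:
        # Skip this batch now; queue the interval for per-license reprocessing.
        large_intervals.append(interval)
        return False
    return True


def reprocess_large_intervals(ingest) -> None:
    for start_ts, end_ts in large_intervals:
        for license_id in LICENSE_IDS:
            # A single license can still exceed the cap; in that case we just
            # take the first 4,000 results and move on.
            ingest(start_ts, end_ts, license=license_id)
```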
Notes
Testing Instructions
Try running the Flickr DAG locally. In particular, run it for one of the days that failed in production, using the DagRun conf options.
I tried a manual run with the conf:
This day failed in production after ingesting 7,253 records. When tested locally against this branch, the run succeeded after about 10 minutes and ingested 56,927 records.
Checklist
My pull request has a descriptive title (not a vague title like `Update index.md`).
My pull request targets the default branch of the repository (`main`) or a parent feature branch.
I tried running the project locally and verified that there are no visible errors.
Developer Certificate of Origin