-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix incremental hitting end_value throwing out whole batches #495
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
@@ -236,7 +240,8 @@ def transform(self, row: TDataItem) -> bool: | |||
if self.end_value is not None and ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what I would do instead of this PR:
# Check whether end_value has been reached
if self.end_value is not None and (
self.last_value_func((row_value, self.end_value)) != self.end_value
):
return False
so we just return False when we are out of range. we do the same with the start_value - we do not close the gen.
why
- we do not assume that list is ordered
- if someone wants to exit earlier s/he can request data properly from the endpoint
- I removed
row_value == self.end_value
because we should not compare row_value with end/start values. that should happen only after processing bylast_value_func
. as a result we process data inclusively. which IMO is OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add some special exception to close pipe outside form the generator but when we have a first real case :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yes, maybe the "stop generator" thing could be better thought out, and should st least be opt-in for sources we know are ordered.
It makes a lot of assumptions the way it is now, also assumes that you have no steps that need to execute after yielding the records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it for endpoints that don't have any "end value" filter. A "start" param and ascending order is pretty common, but there's not always an end param.
So this "stop" logic is code that doesn't need to be added to sources specifically, you can just throw this into most existing incremental sources.
What if we have something like an incremental.end_value_reached
property?
Resources that know better can ignore it, but ones that are ordered can check and make use of it as a "stop" signal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm interesting. this way we could help the resources that have ordered data. I actually have this case and it is github which is incremental decreasing so I could use some kind of incremental.is_out_of_range
.
for page in _get_rest_pages(access_token, repos_path + "?per_page=100"):
yield page
# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if page and page[-1]["created_at"] < last_created_at.initial_value:
# do not get more pages, we overlap with previous run
print(
f"Overlap with previous run created at {last_created_at.initial_value}"
)
break
still I'm not sure that is is really worth it:
- case with
min
function andstart_value
- case with
max
function andend_value
- any other function - we do not know what kind of order it represents so probably we cant set it
we possibly are adding a lot of code that will be executed for each single item in the iterator, not sure we gain so much to really bother
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, similar to what I did with pipedrive leads as well. Fetch in descending order and check every page, https://github.com/dlt-hub/verified-sources/blob/master/sources/pipedrive/__init__.py#L173-L199
Imo best would be to add both start/end_out_of_range
flags and set them anyway on the first out of range item (the places we return False
from the filter). The only extra cost is one self.something = True
assignment. Wouldn't add any other checks for which last value func, etc.
Just document what they mean and that they're invalid for unordered results.
But for now, should we put a pin in this? Maybe best to revisit after we implement a few more use cases with end value?
(will remove the stop generator stuff too)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK! please set this flag whenever an item was filtered out due to being out of range so it works both for end and start value (I can use it for github then) - I hope this makes sense. and drop the StopIteration. please document it in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added both flags and wrote a section in the docs too. Hope it's not too confusing.
I kept last_value_func((row_value, )) == self.end_value
check as well, so the range is exclusive at the end. Imo that makes this lot more convenient for chunked loading, so you can chain start, end
ranges: (a, b), (b, c), (c, d), ...
with no overlap
Raise a custom StopGenerator exception after filter so whole batch isn't thrown oout. StopIteration turns into a RuntimeError when raised from generator so use custom exception instead.
4f8b48c
to
cca6d6b
Compare
4e6d38a
to
eb15918
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I hope our Incremental class is (almost) complete now - it is becoming really complex :)
@steinitzu I will do a pre-release of dlt when this is merged so we can update the zenpy source |
…#495) * Fix incremental hitting end_value throwing out whole batches Raise a custom StopGenerator exception after filter so whole batch isn't thrown oout. StopIteration turns into a RuntimeError when raised from generator so use custom exception instead. * Test with 2 runs * Remove "stop generator" exception, add start/end_out_of_range flags * Document start/end_of_range usage and add + backloading info * Test out_of_range flags * Typo * Range inclusive at start, more tests
Raise a custom StopGenerator exception after filter so whole batch isn't thrown out.
StopIteration turns into a RuntimeError when raised from generator so use custom exception instead.