-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Local Retries + OneTimeSchedule #680
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 love the functionality this adds, have a few comments about simplifying the implementation just to reduce complexity.
src/prefect/core/flow.py
Outdated
|
||
while True: # run indefinitely | ||
end = self.schedule.next(1)[0] | ||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like there's some complexity in this and the subsequent white/sleep
blocks -- why not just sleep until the next schedule time, or the earliest task time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've read things that suggest time.sleep
isn't very accurate and can easily end too soon / too late depending on other system activity, so this implementation let me not worry too much about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to implement this as a utility long_sleep
or something if you'd prefer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha. I'm tempted to say that in the name of simplicity: just time.sleep()
until the appointed time, and repeat the sleep if we wake up too early or just go on if we wake up too late. I think guarantees are generally much looser in this method since it's for primarily for local testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know, an alternate implementation could begin the Flow with a Scheduled state, and our hooks would prevent it from running prior to the set start_time. That might simplify things quite a bit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good feedback --> I updated the implementation; should be much simpler to reason about now.
src/prefect/core/flow.py
Outdated
|
||
kwargs["return_tasks"] = self.tasks | ||
|
||
while True: # run indefinitely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like the break condition for this while loop is the same as that of the while loop on 876. I think you might be able to remove one loop entirely by creating the flow_state
before entering the while loop, and then just saying while not flow_state.is_finished()
.
That plus the adjustment in my previous comment would go from (3 while loops that depend on a break
, and one that has a terminal condition) to just (a single while loop with a terminal condition).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the break for the loop on 876 is that a single flow run has complete; the break condition for this while loop is that there are no more flow runs left to schedule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha -- I didn't realize. Would you mind just expanding the comment a little bit? I thought "run indefinitely" implicitly mean "...until the break below stops it".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated implementation, with a few comments. Should be much simpler now.
src/prefect/schedules.py
Outdated
raise TypeError("`start_date` must be a datetime.") | ||
super().__init__(start_date=start_date, end_date=start_date) | ||
|
||
def next(self, n: int, after: datetime = None) -> List[datetime]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the use case for a one-time schedule! However I think you could get to the same place with less code by subclassing IntervalSchedule
and setting end_date = start_date
in your init, with any interval (say, one day). You wouldn't need a next
method since the IntervalSchedule
logic would do the right thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah interesting, I'll give that a shot!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
tests/test_schedules.py
Outdated
s = schedules.OneTimeSchedule(start_date=start_date) | ||
assert s.next(0) == [] | ||
|
||
def test_onetime_schedule_n_equals_0(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small thing -- this test has the same name as the previous test, so it shadows it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Cool! LGTM -- I don't have a strong opinion on the while loop it just "feels" complicated and potentially an issue (also for some reason the potential - though impossible reality - of moving closer to the target by 50% and never reaching it is scary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! Thanks for simplifying -- I think it will save future headache without compromising functionality.
Collect task run inputs to subflows
Thanks for contributing to Prefect!
Please describe your work and make sure your PR:
CHANGELOG.md
(if appropriate)docs/outline.toml
for API reference docs (if appropriate)What does this PR change?
This PR adds two new features (which are related in my mind):
flow.run(on_schedule=True)
OneTimeSchedule
for one-off, scheduled executionI imagine the
OneTimeSchedule
to mainly be used during local hacking, for testing retries, etc. (sinceon_schedule=True
requires the Flow to actually have a schedule)Why is this PR important?
The ability for tasks to Retry in local execution is important for open source users, as well as Cloud users who are hacking locally on their Flows and want to test Retries. This doesn't bring Core into feature parity with Core + Cloud, but it does make Core immensely more useful as a standalone tool, which is still important.