-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[task scheduling] scope TaskRunner
to TaskServer
#11806
Conversation
…-server-task-runner
The goal here is for each subscriber to be able to filter to a subset of the tasks they are interested in (i.e. which tasks a task server is serving). This required routing task runs into queues and monitoring multiple queues for tasks. This was a good opportunity to refactor some of the mechanics here to capture the idea that a `TaskQueue` is really two queues (a "regular" queue and a high-priority retry queue). This led me down a path of tidying up the implementation to remove globals and encapsulate more of the behavior.
✅ Deploy Preview for prefect-docs-preview ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
…-server-task-runner
…Q/prefect into task-server-task-runner
…ectHQ/prefect into task-server-task-runner
…Q/prefect into task-server-task-runner
create_autonomous_task_run | ||
) | ||
else: | ||
return from_sync.wait_for_call_in_loop_thread( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this part handles the sync within async correctly where sync_compatible
does not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noice!
src/prefect/context.py
Outdated
@@ -234,9 +233,9 @@ class EngineContext(RunContext): | |||
flow: Optional["Flow"] = None | |||
flow_run: Optional[FlowRun] = None | |||
autonomous_task_run: Optional[TaskRun] = None | |||
task_runner: BaseTaskRunner | |||
task_runner: Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really Any
or is it Optional[BaseTaskRunner]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i was slightly over-complicating this in the case where we want to re-use a task runner, we can just pass the task runner instance along instead of entering a null context and then the typing stays the same
updated in 1a5a047
src/prefect/task_server.py
Outdated
- tags: A list of tags to apply to the task server. Defaults to `["autonomous"]`. | ||
- task_runner: The task runner to use for executing the tasks. Defaults to | ||
`ConcurrentTaskRunner`. | ||
- tags: A list of tags to add to task runs submitted by the task server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we still doing the tags thing? The TaskServer doesn't submit tasks anymore, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh i see what you mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm'd in 9eab2be
this PR:
task_runner
viaserve
(to be used for all task runs picked up by thatTaskServer
)TaskServer
context management, in particular to make sure we only start / stop aTaskRunner
with the lifetime of theTaskServer
(i.e. not once per task submission)stacked on top of #11805, so will not merge until that is