Add capacity limiter to ConcurrentTaskRunner #7013
Conversation
src/prefect/task_runners.py (outdated)

    if self._max_workers is not None:
        self._limiter = CapacityLimiter(self._max_workers)
Could we set this to an `asyncnullcontext` when `_max_workers` is null, so that the code in `_run_and_store_result` doesn't need a branch?
src/prefect/task_runners.py (outdated)

    import anyio
    from anyio import CapacityLimiter
Can you use `anyio.CapacityLimiter` instead of importing the name? It's nice to have the clarity of where the implementation is.
Thanks for contributing! I've been meaning to add something like this. A couple comments on the implementation. We'll also need some sort of test before we can accept it.
Looking at test_task_runners, I'm not sure of the best way to test this. Do we just need an additional test suite that yields (for example) …?
That'd be a good start! It seems quite challenging to "prove" that it's being limited to some number of workers. Maybe it would be useful to look at the AnyIO capacity limiter tests and see how they check it.
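One way such a test could check the limit (a sketch of the general pattern, not the actual AnyIO test suite; `asyncio.Semaphore` again stands in for `anyio.CapacityLimiter`) is to record the peak number of tasks that were ever inside the guarded section at once:

```python
import asyncio


async def _probe(limiter, state):
    async with limiter:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # hold the slot so probes overlap
        state["active"] -= 1


async def peak_concurrency(max_workers, n_tasks):
    # If the limiter works, the peak can never exceed max_workers,
    # no matter how many tasks are launched.
    limiter = asyncio.Semaphore(max_workers)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(_probe(limiter, state) for _ in range(n_tasks)))
    return state["peak"]


print(asyncio.run(peak_concurrency(2, 8)))  # -> 2
```

A test would then assert `peak_concurrency(n, many) == n`, which both proves overlap happens and that it is capped.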
    # Runtime attributes
    self._task_group: anyio.abc.TaskGroup = None
    self._result_events: Dict[UUID, anyio.abc.Event] = {}
    self._results: Dict[UUID, Any] = {}
    self._keys: Set[UUID] = set()
    self._max_workers = max_workers
    self._limiter = asyncnullcontext
Suggested change:

    - self._limiter = asyncnullcontext
    + self._limiter = asyncnullcontext()
I'm also curious: is there a reason we need to instantiate the limiter in `_start` instead of `__init__`? Does it need the event loop?
We might be able to set …
Thanks Michael. I've pushed some test ideas, but I can't get the tests to run with … One thing I'm not sure of is if …
tests/test_task_runners.py (outdated)

    class TestConcurrentTaskRunnerSingleThreaded(TaskRunnerStandardTestSuite):
        @pytest.fixture
        def task_runner(self):
            yield ConcurrentTaskRunner(max_workers=1)
You may need to (or be able to) change the `concurrency_type` on this instance of the task runner from `TaskConcurrencyType.CONCURRENT` to `TaskConcurrencyType.SEQUENTIAL` so it goes through the sequential standard tests.
Sounds like you're missing some test requirements. Did you install with the …
Also not sure what will happen there :) but that seems like a possibility. If it's really a pain, we can explore a different test pattern.
Ok, I see that about the tests. I adjusted it to simply override the concurrency type, which will hopefully let it pass the tests. But I'm having trouble getting a dev environment working on my computer, and unfortunately I have to focus on some other high-priority work for now. Short of tinkering with the tests, is there anything else I can do to help? I've added you as a collaborator on my fork.
I've run the tests in CI and it looks like there are some issues. We don't have the bandwidth to prioritize this right now, but if it languishes we'll eventually get an engineer to get it across the line. Futzing with the tests is definitely the biggest part of this change :)
Some more testing on my end shows that with more complex flows, this method clogs up and results in flows stalling indefinitely. I think it's because pending tasks count toward the thread limit. For my use case, I ended up using task tag concurrency limits: for each flow run I generate a unique tag to limit all tasks of that flow run, then delete it when I'm done. I'm not sure what the Prefect solution should be, but maybe it needs to follow a pattern similar to the one used by the task tags. In any case, I'm going to close this. Thank you!
Thanks for investigating! It seems like we'd need to take the concurrency limit right before calling the user's function, which is indeed trickier.
Adds a `CapacityLimiter` to the `ConcurrentTaskRunner` to limit the number of concurrent threads. This is helpful for flows that would otherwise spawn hundreds of threads.