Bug summary
I'm writing a scientific workflow that includes optimization loops. Multiple optimization loops and cost-function evaluations run in parallel with the ThreadPoolTaskRunner.
After a certain number of loop iterations, I encounter

```
Task run failed with exception: OSError(24, 'Too many open files') - Retries are exhausted
```

which indicates that the process has hit its soft limit on open file descriptors.
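For context, that soft limit can be inspected from Python with the standard library's `resource` module (a minimal sketch, POSIX only):

```python
import resource

# Query the per-process limits on open file descriptors.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file soft limit: {soft}, hard limit: {hard}")
```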
The simplified structure of my program looks like:
```python
from prefect import flow, task, tags


@task
def inner_task():
    # do something
    pass


@task
def outer_task():
    for i in range(5):
        with tags(f"loop {i}"):
            futures = []
            for j in range(4):
                with tags(f"instance {j}"):
                    fut = inner_task.submit()
                    futures.append(fut)
            for fut in futures:
                fut.result()


@flow
def main():
    fut1 = outer_task.submit()
    ...
    fut1.wait()


if __name__ == "__main__":
    main()
```

Here I assume `inner_task` is a time-consuming, IO-bound subroutine of the cost function. For better traceability and visibility into the cost-function execution, I want to tag the runs recursively; this is why the `tags` context manager is helpful here. As a side effect, each cost-function evaluation spawns a new thread, and that thread stays pooled in the thread pool executor even after execution finishes.
The file descriptors associated with those threads therefore accumulate across loop iterations, and at some point the workflow crashes when it reaches the soft limit. Of course I can raise the soft limit, but that only postpones the problem.
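For completeness, raising the soft limit up to the hard limit is possible from within the process (a stopgap sketch, POSIX only; it does not fix the underlying leak):

```python
import resource

# Raise the soft limit on open file descriptors up to the hard limit.
# This only delays the crash; the threads still accumulate.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```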
To confirm the description above, you can run the following complete script.
```python
import os

import psutil

from prefect import flow, task, tags
from prefect.logging import get_run_logger


@task
def inner_task():
    # do something
    pass


@task
def outer_task():
    logger = get_run_logger()
    for i in range(5):
        with tags(f"loop {i}"):  # loop tag
            num_fd = psutil.Process(os.getpid()).num_fds()
            num_th = psutil.Process(os.getpid()).num_threads()
            logger.info(f"Open FDs at loop {i} begin: {num_fd}")
            logger.info(f"Threads at loop {i} begin: {num_th}")
            futures = []
            for j in range(4):
                with tags(f"instance {j}"):  # instance tag
                    fut = inner_task.submit()
                    futures.append(fut)
            for fut in futures:
                fut.result()
            num_fd_after = psutil.Process(os.getpid()).num_fds()
            num_th_after = psutil.Process(os.getpid()).num_threads()
            logger.info(f"Open FDs at loop {i} end: {num_fd_after} (+ {num_fd_after - num_fd})")
            logger.info(f"Threads at loop {i} end: {num_th_after} (+ {num_th_after - num_th})")


@flow
def main():
    fut = outer_task.submit()
    fut.wait()


if __name__ == "__main__":
    main()
```

Version info
```
Version:             3.4.2
API version:         0.8.4
Python version:      3.11.11
Git commit:          c3c1c119
Built:               Mon, May 19, 2025 04:04 PM
OS/Arch:             darwin/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.11.4
```
Additional context
I ran a small experiment with different tag settings:
As-is (every task call spawns a new thread; 4 instances per loop):

```
...
Threads at loop 4 end: 32 (+ 4)
Open FDs at loop 4 end: 33 (+ 5)
...
```

With the instance tag removed (some threads are reused; the behavior is somewhat stochastic):

```
...
Threads at loop 4 end: 17 (+ 1)
Open FDs at loop 4 end: 18 (+ 2)
...
```

With the loop tag removed (no threads accumulate; 8 threads are added at loop 0):

```
...
Threads at loop 4 end: 16 (+ 0)
Open FDs at loop 4 end: 17 (+ 1)
...
```

With both tags removed (no threads accumulate; 4 threads are added at loop 0):

```
...
Threads at loop 4 end: 12 (+ 0)
Open FDs at loop 4 end: 12 (+ 0)
...
```
These results indicate that the `tags` context manager does not work well with the thread pool executor: the more fine-grained the tag contexts around submissions, the more threads and file descriptors accumulate.
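As a possible workaround to test (a sketch only, not verified against this bug): Prefect tasks also accept tags via `Task.with_options`, which avoids entering a `tags()` context around each submission. Whether this sidesteps the thread accumulation is an open question.

```python
from prefect import flow, task


@task
def inner_task():
    # do something
    pass


@task
def outer_task():
    for i in range(5):
        futures = []
        for j in range(4):
            # Attach the tags to the task itself instead of wrapping the
            # submission in nested tags() contexts.
            tagged = inner_task.with_options(tags={f"loop {i}", f"instance {j}"})
            futures.append(tagged.submit())
        for fut in futures:
            fut.result()
```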