
Stdout and stderr from tasks are not routed to the correct graph nodes #1380

Closed
cjao opened this issue Oct 24, 2022 · 0 comments · Fixed by #1391

cjao commented Oct 24, 2022

The method Covalent currently uses to retrieve the stdout and stderr logs of each task is unreliable. When multiple tasks are run concurrently, the messages output by one task may be inadvertently attributed to another task and stored in the wrong graph node.

The following workflow reproduces the problem on my machine.

import covalent as ct
from covalent.executor import DaskExecutor
from dask.distributed import LocalCluster

lc = LocalCluster()
dask_exec = DaskExecutor(lc.scheduler_address)

@ct.electron(executor=dask_exec)
def task_0():
    print("Hello from Task 0")
    return 0
    
@ct.electron(executor=dask_exec)
def task_1():
    import time
    time.sleep(2)    
    print("Hello from Task 1")
    return 1    
    
@ct.lattice
def workflow():
    task_0()
    task_1()

After dispatching this workflow, I retrieved the result object and inspected the stdout property of each graph node. Here is what I get:

dispatch_id = ct.dispatch(workflow)()
res = ct.get_result(dispatch_id, wait=True)
print("Task 0 stdout:", res.lattice.transport_graph.get_node_value(0, "stdout"))
print("Task 1 stdout:", res.lattice.transport_graph.get_node_value(1, "stdout"))
Task 0 stdout: 
Task 1 stdout: Hello from Task 0

If I interchange the sleep statements, the output of each task is correctly retrieved and persisted:

@ct.electron(executor=dask_exec)
def task_0():
    import time
    time.sleep(2)
    print("Hello from Task 0")
    return 0

@ct.electron(executor=dask_exec)
def task_1():
    print("Hello from Task 1")
    return 1

dispatch_id = ct.dispatch(workflow)()
res = ct.get_result(dispatch_id, wait=True)
print("Task 0 stdout:", res.lattice.transport_graph.get_node_value(0, "stdout"))
print("Task 1 stdout:", res.lattice.transport_graph.get_node_value(1, "stdout"))
Task 0 stdout: Hello from Task 0

Task 1 stdout: Hello from Task 1

Why this happens

Each executor's implementation of run() is currently expected to retrieve the stdout and stderr from the executor backend after a task completes and print those strings to sys.stdout and sys.stderr, respectively. A context manager in the base executor's implementation of execute() redirects these streams, and the captured text is returned to the dispatcher. The Dask executor, for example, captures a task's stdout in exactly this way.

Since all executor instances monitor the same sys.stdout file descriptor, this technique breaks down when multiple tasks are writing to that file descriptor. The context manager for one task could inadvertently capture the output printed by the run() method for another task.
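
The same failure mode can be reproduced outside Covalent. Below is a minimal sketch (my own toy code, not Covalent's) in which two concurrent coroutines each wrap contextlib.redirect_stdout around the one shared sys.stdout; on a typical run, the message printed by the first coroutine lands in the second coroutine's buffer, mirroring the behaviour above.

import asyncio
import contextlib
import io
import sys

async def fake_run(name, delay):
    buf = io.StringIO()
    # Each "task" redirects the single global sys.stdout into its own buffer.
    with contextlib.redirect_stdout(buf):
        await asyncio.sleep(delay)   # yield so the other task can install its redirect
        print(f"Hello from {name}")  # lands in whichever redirect is active right now
    return buf.getvalue()

async def main():
    return await asyncio.gather(fake_run("task_0", 0), fake_run("task_1", 0.5))

out0, out1 = asyncio.run(main())
# The overlapping redirects can also leave sys.stdout pointing at a task buffer,
# so report the results on the real stream.
print("task_0 captured:", repr(out0), file=sys.__stdout__)  # typically ''
print("task_1 captured:", repr(out1), file=sys.__stdout__)  # typically 'Hello from task_0\n'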

It would be better for each task to print to its own "stdout" and "stderr" streams, which are not shared with any other task.
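
Continuing the toy example above, the fix amounts to giving each task a private io.StringIO and writing to it directly, so that concurrency no longer matters (again a sketch, not Covalent code):

import asyncio
import io

async def fake_run(name, delay):
    task_stdout = io.StringIO()   # private to this task, never shared
    await asyncio.sleep(delay)
    print(f"Hello from {name}", file=task_stdout)
    return task_stdout.getvalue()

async def main():
    return await asyncio.gather(fake_run("task_0", 0), fake_run("task_1", 0.5))

out0, out1 = asyncio.run(main())
print("task_0 captured:", repr(out0))  # 'Hello from task_0\n'
print("task_1 captured:", repr(out1))  # 'Hello from task_1\n'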

Design

Acceptance Criteria

  • For BaseExecutor and AsyncBaseExecutor

    • The redirection logic for sys.stdout and sys.stderr is removed.
    • Task-specific streams self._task_stdout and self._task_stderr are instantiated at the beginning of execute(). These attributes should be defined only when execute() starts, not in the executor constructor, so that they don't appear during workflow construction. They should be made accessible to run() as the properties self.task_stdout and self.task_stderr, respectively (a rough sketch follows this list).
  • Local and Dask executors should be adjusted:

    • The stdout and stderr generated by the task are printed to self.task_stdout and self.task_stderr, respectively.
    • When there are concurrent tasks, the stdout and stderr for each task are stored in the correct transport graph node; the test cases in #1380 should pass.
  • Add a functional test involving a workflow with multiple electrons concurrently printing to stdout and stderr. Verify that the messages generated by each task are stored in the correct transport graph node.
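
A rough sketch of how these criteria might look in a stripped-down executor. The class and the run_on_backend helper are hypothetical stand-ins for illustration only, not Covalent's actual BaseExecutor API:

import contextlib
import io

def run_on_backend(function, args, kwargs):
    # Hypothetical stand-in for an executor backend that runs the task and
    # captures whatever it printed.
    out, err = io.StringIO(), io.StringIO()
    with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
        result = function(*args, **kwargs)
    return result, out.getvalue(), err.getvalue()

class SketchExecutor:
    @property
    def task_stdout(self):
        return self._task_stdout   # exists only once execute() has started

    @property
    def task_stderr(self):
        return self._task_stderr

    def run(self, function, args, kwargs):
        # A concrete executor writes whatever it retrieves from its backend to
        # the task-specific streams, never to the shared sys.stdout / sys.stderr.
        result, out, err = run_on_backend(function, args, kwargs)
        self.task_stdout.write(out)
        self.task_stderr.write(err)
        return result

    def execute(self, function, args=(), kwargs=None):
        # Streams are created here rather than in __init__ so the attributes
        # don't appear during workflow construction.
        self._task_stdout = io.StringIO()
        self._task_stderr = io.StringIO()
        result = self.run(function, args, kwargs or {})
        return result, self._task_stdout.getvalue(), self._task_stderr.getvalue()

As long as each task gets its own executor instance (or otherwise its own stream objects), concurrent tasks never touch a shared stream, and the dispatcher can attach the returned stdout and stderr to the correct transport graph node.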
