Add STDERR response type for per-task stderr attribution #27

@ctrueden

Problem

Worker stderr is a process-level stream, not a task-level one. When a script (or library it calls) writes to sys.stderr, the service reads it in a dedicated stderrLoop thread that has no visibility into which task is currently running. The line is appended to service.errorLines() / service.error_lines with no task UUID attached.

This means:

  • Task listeners (task.listen(...)) never receive stderr content while the task is running — only as part of the crash dump if the whole process dies.
  • With multiple concurrent tasks it is impossible to know which task produced a given stderr line.
  • The only workaround today is task.update(message=...), which requires the script author to opt in and gives up idiomatic Python logging/warnings/tracebacks.

Proposed Solution

Add a new STDERR response type to the worker protocol. The worker intercepts per-task stderr at the Python layer, re-emits it as a tagged JSON message on stdout, and the service dispatches it to the correct task listener.

Protocol change

New response type:

{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "STDERR",
  "line": "DeprecationWarning: use foo() instead of bar()\n"
}

STDERR is non-terminal (like UPDATE): it can fire any number of times between LAUNCH and the terminal response.
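
As a rough illustration of the message shape above, a worker-side helper could serialize a STDERR response roughly as follows. This is only a sketch: the names `encode_stderr_response`, `is_terminal`, and `NON_TERMINAL` are hypothetical, and it assumes each protocol message is written as a single line of JSON. Only the field names and the LAUNCH / UPDATE / STDERR response types come from this issue.

```python
import json

# Response types that this issue describes as non-terminal.
# Hypothetical constant; the real worker may organize its types differently.
NON_TERMINAL = {"LAUNCH", "UPDATE", "STDERR"}


def encode_stderr_response(task_id: str, line: str) -> str:
    """Serialize one chunk of stderr text as a single-line JSON message."""
    message = {"task": task_id, "responseType": "STDERR", "line": line}
    # json.dumps escapes embedded newlines, so the message stays on one line.
    return json.dumps(message)


def is_terminal(response_type: str) -> bool:
    """Return True if a response of this type would end the task."""
    return response_type not in NON_TERMINAL
```

Because the newline inside `line` is escaped by the JSON encoder, a multi-line traceback could be sent either as several STDERR messages or as one message with embedded `\n` characters without breaking line-based framing.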

Python worker changes (python_worker.py)

Install a thread-dispatching stderr wrapper once at worker startup. In Task._run(), before calling _report_launch(), register the task as the owner of the current thread. The wrapper's write() method checks which task owns the current thread and emits the text as a STDERR response. Every write is also forwarded to the real sys.stderr, so the service's stderrLoop / error_lines keeps working (useful for service.debug() and crash post-mortems).

Sketch:

import io
import sys
import threading

_task_local = threading.local()

class _TaskStderr(io.TextIOBase):
    def __init__(self, real_stderr):
        self._real = real_stderr

    def write(self, s):
        self._real.write(s)   # still visible to stderrLoop / error_lines
        task = getattr(_task_local, "current_task", None)
        # Skip whitespace-only writes, such as the bare "\n" that print() sends separately.
        if task is not None and s.strip():
            task._respond(ResponseType.STDERR, {"line": s})
        return len(s)

    def flush(self):
        self._real.flush()

Install once at worker startup:

sys.stderr = _TaskStderr(sys.stderr)

At the start of Task._run():

_task_local.current_task = self

Clear it on exit (in the finally block):

_task_local.current_task = None
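
To make the sketch above easier to try out, here is a self-contained variant that can be run outside the worker. It is an illustration under assumptions, not the proposed implementation: `FakeTask`, `LineBufferedTaskStderr`, `run_task`, and `demo` are hypothetical names, the response type is a plain string, and the per-thread line buffering is an addition that the sketch above does not have. The buffering is there because `print()` calls `write()` once for the text and once for the trailing newline, so an unbuffered wrapper sees partial lines.

```python
import io
import sys
import threading

_task_local = threading.local()


class FakeTask:
    """Hypothetical stand-in for the worker's Task; it only records responses."""

    def __init__(self, name):
        self.name = name
        self.responses = []

    def _respond(self, response_type, args):
        self.responses.append((response_type, args))


class LineBufferedTaskStderr(io.TextIOBase):
    """Forwards every write to the real stream and emits whole lines per task."""

    def __init__(self, real_stderr):
        self._real = real_stderr

    def write(self, s):
        self._real.write(s)  # the process-level stream still sees everything
        task = getattr(_task_local, "current_task", None)
        if task is not None:
            # Accumulate text per thread until at least one full line is available.
            pending = getattr(_task_local, "pending", "") + s
            *lines, rest = pending.split("\n")
            _task_local.pending = rest
            for line in lines:
                task._respond("STDERR", {"line": line + "\n"})
        return len(s)

    def flush(self):
        self._real.flush()


def run_task(task, text):
    """Mimics the start and end of Task._run() around a stderr write."""
    _task_local.current_task = task
    _task_local.pending = ""
    try:
        print(text, file=sys.stderr)
    finally:
        _task_local.current_task = None


def demo():
    """Runs two tasks on separate threads and returns what each one captured."""
    real = io.StringIO()  # stands in for the real stderr to keep the demo quiet
    saved = sys.stderr
    sys.stderr = LineBufferedTaskStderr(real)
    try:
        a, b = FakeTask("a"), FakeTask("b")
        threads = [
            threading.Thread(target=run_task, args=(t, "hello from " + t.name))
            for t in (a, b)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        sys.stderr = saved
    return a.responses, b.responses, real.getvalue()
```

Calling `demo()` should show each task receiving only its own line, while the shared stream receives both. One caveat of any `threading.local` approach: threads that a task spawns itself do not inherit `current_task`, so their stderr output would stay unattributed unless the owner is propagated explicitly.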

Java service changes (Service.java)

  1. Add STDERR to ResponseType enum (it is non-terminal, like UPDATE).
  2. In Task.handle(), handle the STDERR case: extract the "line" field and deliver a TaskEvent to registered listeners.
  3. TaskEvent can reuse its existing message field to carry the line.

case STDERR:
    String line = (String) response.get("line");
    TaskEvent event = new TaskEvent(this, responseType, line, ...);
    listeners.forEach(l -> l.accept(event));
    return; // Don't fall through to the general notifyAll path
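
The same dispatch logic can be sketched in Python, which keeps it in one language with the worker examples and may also be a starting point for a Python-side service. Everything here is a simplified assumption rather than the actual service code: the `TaskEvent` and `Task` classes, the `status` field, and the `dispatch` helper are hypothetical stand-ins, and only the STDERR branch is shown.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class TaskEvent:
    """Hypothetical event object; `message` carries the stderr text."""
    task_id: str
    response_type: str
    message: Optional[str] = None


@dataclass
class Task:
    """Hypothetical service-side task with a list of listeners."""
    uuid: str
    status: str = "RUNNING"
    listeners: List[Callable[[TaskEvent], None]] = field(default_factory=list)

    def listen(self, listener: Callable[[TaskEvent], None]) -> None:
        self.listeners.append(listener)

    def handle(self, response: dict) -> None:
        response_type = response["responseType"]
        if response_type == "STDERR":
            event = TaskEvent(self.uuid, response_type, response.get("line"))
            for listener in self.listeners:
                listener(event)
            return  # non-terminal: the task status is left untouched
        # Other response types are omitted from this sketch.


def dispatch(tasks: Dict[str, Task], raw_line: str) -> bool:
    """Route one JSON message from worker stdout to the task it names."""
    response = json.loads(raw_line)
    task = tasks.get(response.get("task"))
    if task is None:
        return False  # unknown task UUID; the caller decides what to do
    task.handle(response)
    return True
```

A listener registered with `task.listen(...)` would then see each stderr line while the task is still running, which is the behavior the acceptance criteria ask for.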

Limitations

C extensions writing directly to fd 2. Python's sys.stderr replacement only intercepts Python-level writes. Native code (e.g. some NumPy/BLAS builds) can bypass it. These lines still go to service.errorLines() unattributed.
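
The limitation can be reproduced without any C extension, since `os.write(2, ...)` goes straight to the file descriptor just as native code would. The sketch below is illustrative only: `demo_fd_bypass` is a hypothetical helper, and it temporarily points fd 2 at a pipe so that the raw write can be observed.

```python
import io
import os
import sys


def demo_fd_bypass():
    """Returns (text seen by the sys.stderr replacement, text seen on fd 2)."""
    read_fd, write_fd = os.pipe()
    saved_fd = os.dup(2)        # remember where fd 2 pointed
    saved_stderr = sys.stderr
    captured = io.StringIO()    # stands in for the per-task wrapper
    try:
        os.dup2(write_fd, 2)    # fd 2 now feeds the pipe
        sys.stderr = captured
        print("python-level warning", file=sys.stderr)  # intercepted
        os.write(2, b"native-level warning\n")          # bypasses sys.stderr
    finally:
        sys.stderr = saved_stderr
        os.dup2(saved_fd, 2)    # restore the original fd 2
        os.close(saved_fd)
        os.close(write_fd)
    raw = os.read(read_fd, 4096).decode()
    os.close(read_fd)
    return captured.getvalue(), raw
```

The Python-level write lands in the replacement object and the fd-level write lands only on the descriptor, which is why such lines can reach the service's process-level stderr reader but cannot be tagged with a task UUID by this approach.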

Groovy worker

The same approach applies in principle — redirect System.err to a PrintStream that dispatches to the current task.

Acceptance criteria

  • STDERR is documented in worker-protocol.rst alongside UPDATE
  • Python and Groovy workers capture per-task stderr and emit STDERR responses in real time
  • Java Service.ResponseType includes STDERR (non-terminal)
  • TaskEvent exposes stderr line content
  • Task listeners receive STDERR events during task execution
  • service.errorLines() / error_lines still accumulates all stderr lines (unchanged) (?)
  • service.debug() still shows [WORKER-N] lines for all stderr (unchanged)
  • capturing-output.rst updated to reflect the new capability and note limitations

Labels: enhancement