feat: add live job log streaming#4454
Conversation
Introduces LogStreamer, JobLogStreamer, and JobLogReceiver for real-time log tailing from clients to the server during a federated job. Deprecates ErrorLogSender and LogReceiver in favour of the new live-streaming components. Key design points: - _LogTailProducer tails a growing log file, survives log rotation, sends liveness heartbeats, and does a drain retry after stop_event fires to capture bytes written by cleanup code. - JobLogStreamer uses a two-phase stop: ABOUT_TO_END_RUN sets stop_event only (returns immediately so framework log lines land in the file during the drain window); END_RUN joins the thread to keep client_run() alive until EOF is acknowledged by the server. - A fresh Signal is injected into the streaming FLContext via put() (not set_prop()) to bypass the mask-consistency check that would otherwise leave the triggered run_abort_signal in place and cause the sender to abort mid-stream. - JobLogReceiver writes chunks directly to a file as they arrive so the log can be followed with tail -f; hands the file to the job manager when the stream closes. - Includes hello-log-streaming example and unit tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
SystemLogStreamer is a system-level widget (resources.json) that automatically injects a JobLogStreamer into every job that doesn't already declare one. It hooks BEFORE_JOB_LAUNCH to modify the deployed config_fed_client.json before the job subprocess starts — no duplicate streaming code needed. Provisioning template changes: - Client: ErrorLogSender → SystemLogStreamer (streams error_log.txt) - Server: LogReceiver → JobLogReceiver - Fixed _modify_error_sender to match the new component id Also downgraded the "file not created" log from WARNING to INFO in JobLogStreamer since error_log.txt not existing is expected for successful jobs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Greptile SummaryThis PR replaces the deprecated All previously flagged P0/P1 concerns have been addressed: the Confidence Score: 5/5Safe to merge; all previously flagged P0/P1 issues are resolved and only minor P2 style/polish items remain. All blocking issues from the prior review round (path traversal, stale fl_ctx, watchdog early-crash, liveness/idle constraint, wire-protocol rename documentation) have been addressed. The two remaining comments are cosmetic: a misleading 'retained at None' warning message and a non-configurable 60 s join timeout — neither affects correctness or security. No files require special attention; the minor log-message fix in job_log_receiver.py is optional polish. Important Files Changed
Sequence DiagramsequenceDiagram
participant SLS as SystemLogStreamer(CLIENT_PARENT)
participant JLS as JobLogStreamer(CLIENT_JOB)
participant SR as stream_runner(Server)
participant JLR as JobLogReceiver(Server)
SLS->>SLS: BEFORE_JOB_LAUNCH patch config_fed_client.json
Note over JLS: Job subprocess starts with patched config
JLS->>JLS: START_RUN spawn streaming thread with fresh FLContext
JLS->>SR: open stream seq=0 with stream_ctx
SR->>SR: inject END_STREAM hook into stream_ctx
SR->>JLR: get_consumer creates LogChunkConsumer and watchdog thread
loop Live tail loop
JLS->>SR: data chunk
SR->>JLR: consume writes to file and flushes
JLR-->>SR: OK continue
Note over JLS,SR: If log quiet beyond liveness interval
JLS->>SR: heartbeat message
SR->>JLR: consume resets idle clock
end
JLS->>JLS: ABOUT_TO_END_RUN set stop_event
JLS->>JLS: drain remaining bytes
JLS->>SR: EOF chunk
SR->>JLR: consume returns stop
JLR->>JLR: finalize stops watchdog
SR->>JLR: dispatch stream done callback
JLR->>JLR: close file store via job_manager
JLS->>JLS: END_RUN join streaming thread up to 60s
Note over SLS: JOB_COMPLETED error log only
SLS->>SR: upload snapshot with pre-set stop_event
Reviews (10): Last reviewed commit: "Document FileStreamer wire-key namespace..." | Re-trigger Greptile |
Documents the architecture, two-phase stop strategy, drain retry, fresh abort signal, and rationale for running streaming inside the job subprocess (Docker/K8s launcher compatibility). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The streamed log always misses the last few framework teardown lines written after END_RUN completes — document this as a known limitation. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Match the provisioning template by setting error_log.txt as the streamed file name. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix watchdog to detect dead senders that crash before sending any data: seed fl_ctx from get_consumer() instead of waiting for first consume(), and stop resetting the idle clock when no message has arrived. - Clean up partial log file on disk when stream ends with non-OK return code (e.g. TIMEOUT, TASK_ABORTED). - Rename _modify_error_sender to _modify_log_streamer to match the current component name. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Retain the partial file for debugging instead of deleting it. Log a warning with the file path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The watchdog polled every 1.0s regardless of idle_timeout, making sub-second timeouts indistinguishable. Now polls at min(1.0, idle_timeout/3) so the watchdog resolution matches the configured threshold. Updated test to stagger heartbeats and verify independent timeout ordering. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Reject absolute paths and path traversal (..) to prevent streaming files outside the job's log directory. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Apply os.path.basename() to job_id, client name, and log file name from the stream context before constructing the destination path. This prevents directory traversal via '..' or absolute paths in client-supplied values. Also corrected docstring that falsely claimed the liveness_interval < idle_timeout constraint is enforced at call time — the values are configured on different sites. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
chesterxgchen
left a comment
There was a problem hiding this comment.
add a few comments
|
/build |
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/build |
chesterxgchen
left a comment
There was a problem hiding this comment.
LGTM overall
a few minor comments ( FLARE 2.6.0 release notes change)
and a few questions u can address later
Summary
ErrorLogSender/LogReceiverwhich only sent static snapshots after job completion.resources.jsonthat automatically injectsJobLogStreamerinto jobs that don't already declare one, by modifying the deployedconfig_fed_client.jsonatBEFORE_JOB_LAUNCH.SystemLogStreamer(streamserror_log.txt), server usesJobLogReceiver.hello-log-streamingexample and unit tests.Why streaming runs inside the job, not in CLIENT_PARENT
The streamer must run inside the job subprocess rather than being managed from the parent process. With Docker or Kubernetes job launchers, the job may run in a different container or pod that the parent process has no filesystem access to. Streaming from CLIENT_PARENT would only work with the local process launcher. By injecting
JobLogStreamerinto the job config, the streamer always runs where the log file lives — regardless of the launch mechanism.Key design points
LogStreamertails growing log files, survives log rotation, and sends liveness heartbeats so the receiver can detect dead senders via idle timeout.JobLogStreameruses a two-phase stop:ABOUT_TO_END_RUNsetsstop_eventonly (returns immediately so framework log lines land in the file during the drain window);END_RUNjoins the thread to keepclient_run()alive until EOF is acknowledged.Signalis injected into the streamingFLContextviaput()(notset_prop()) to bypass the mask-consistency check that would otherwise leave the triggeredrun_abort_signalin place.JobLogReceiverwrites chunks directly to a file as they arrive so the log can be followed withtail -f.Test plan
examples/hello-world/hello-log-streaming/job.pyin simulator — verify streamed log matches originalSystemLogStreamerinjectsJobLogStreamerinto jobs without oneALLOW_ERROR_SENDING=falsein project.yml removesSystemLogStreamerfrom provisionedresources.jsonpytest tests/unit_test/app_common/streamers/log_streamer_test.py🤖 Generated with Claude Code