feat(subscribers): add JSONL event log subscriber #6420
Conversation
Greptile Summary

Confidence Score: 3/5

Important Files Changed
Sequence Diagram

sequenceDiagram
participant App
participant Context as DaftContext
participant Sub as EventLogSubscriber
participant FS as FileSystem (per-query dir)
App->>Context: enable_event_log(log_dir)
Context->>Sub: __init__(log_dir)
Context->>Context: attach_subscriber(_EVENT_LOG_ALIAS, sub)
Note over App,FS: Query execution begins
Context->>Sub: on_query_start(query_id, metadata)
Sub->>FS: mkdir(log_dir/query_id)
Sub->>FS: open events.jsonl (append, line-buffered)
Sub->>FS: write event_log_started {daft_version}
Sub->>FS: write query_started {}
Sub->>FS: write plan_unoptimized {plan}
Context->>Sub: on_optimization_start(query_id)
Sub->>FS: write optimization_started {}
Context->>Sub: on_optimization_end(query_id, optimized_plan)
Sub->>FS: write optimization_ended {duration_ms}
Sub->>FS: write plan_optimized {plan}
Context->>Sub: on_exec_start(query_id, physical_plan)
Sub->>FS: write execution_started {}
Sub->>FS: write plan_physical {plan}
loop Per operator (concurrent)
Context->>Sub: on_exec_operator_start(query_id, node_id)
Sub->>FS: write operator_started {node_id}
Context->>Sub: on_exec_emit_stats(query_id, stats)
Sub->>FS: write stats {node_id, metrics}
Context->>Sub: on_exec_operator_end(query_id, node_id)
Sub->>FS: write operator_ended {node_id, duration_ms}
end
Context->>Sub: on_result_out(query_id, partition)
Sub->>FS: write result_out {rows}
Context->>Sub: on_exec_end(query_id)
Sub->>FS: write execution_ended {duration_ms}
Context->>Sub: on_query_end(query_id, result)
Sub->>FS: write query_ended {status, duration_ms}
Sub->>FS: close events.jsonl
App->>Context: disable_event_log()
Context->>Context: detach_subscriber(_EVENT_LOG_ALIAS)
Context->>Sub: close()
Sub->>FS: close any remaining open file handles
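The lifecycle above can be sketched as a minimal stand-in subscriber that mirrors the callback order and the `<log_dir>/<query_id>/events.jsonl` layout. The class and helper names here are hypothetical, not Daft's actual implementation:

```python
# Minimal sketch of the per-query JSONL event flow shown in the diagram.
# Stand-in only: the real EventLogSubscriber handles many more callbacks.
import json
import tempfile
import time
from pathlib import Path


class SketchEventLog:
    def __init__(self, log_dir):
        self._log_dir = Path(log_dir)
        self._files = {}  # query_id -> open file handle

    def _write(self, query_id, event, payload=None):
        record = {"event": event, "ts": time.time(), **(payload or {})}
        self._files[query_id].write(json.dumps(record) + "\n")

    def on_query_start(self, query_id):
        query_dir = self._log_dir / query_id
        query_dir.mkdir(parents=True, exist_ok=True)
        # Line-buffered append, as in the diagram.
        self._files[query_id] = open(query_dir / "events.jsonl", "a", buffering=1)
        self._write(query_id, "event_log_started")
        self._write(query_id, "query_started")

    def on_query_end(self, query_id, status="success"):
        self._write(query_id, "query_ended", {"status": status})
        self._files.pop(query_id).close()


# Demo: one query lifecycle produces <log_dir>/q1/events.jsonl.
tmp = tempfile.mkdtemp()
sub = SketchEventLog(tmp)
sub.on_query_start("q1")
sub.on_query_end("q1")
lines = (Path(tmp) / "q1" / "events.jsonl").read_text().splitlines()
event_names = [json.loads(line)["event"] for line in lines]
```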
Last reviewed commit: "log per query id"
daft/subscribers/events.py
Outdated
```python
return time.monotonic() * 1000


def _generate_run_id() -> str:
```
We gave query ids a nice human-readable form that shows up in the dashboard (#6310). Would it be possible to reuse that so users can quickly correlate runs in the dashboard with these events?
That's going to be tricky. Those names are associated with a query, and a script can execute many queries, which means we would need to maintain and write multiple files. I thought for the first version it would be easier to write to a single file per script execution. Thoughts?
I am of the opinion that it makes the most sense to have the query be the atomic unit as opposed to the script. @rchowell and I had a lengthy discussion about this the other day actually, and he may have some opinions on it as well. I think there are pros/cons to both sides, but ultimately I think having everything 1:1 with the query (not the script) will make things easier to reason about. Additionally, considering the dashboard uses queries as the atomic unit, it would be odd to me if we introduced an additional grouping here that does not exist there.
Happy to have further discussions as well.
My 2-cents is log to a file whose name is the query id.
Put it in some conventional location, then it's trivial to show logs in the dashboard for each query.
Discussed with @universalmind303 offline and will go with a directory per query id to start. So the format will be `<log_dir>/<query_id>/events.jsonl`.
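Under that layout, discovering which queries have logs is a single glob over the log directory. A small sketch (the helper name is hypothetical):

```python
# Sketch: enumerate query ids under <log_dir>/<query_id>/events.jsonl.
import tempfile
from pathlib import Path


def list_query_logs(log_dir):
    """Return the query ids that have an events.jsonl under log_dir."""
    return sorted(p.parent.name for p in Path(log_dir).glob("*/events.jsonl"))


# Demo with two fake query directories.
tmp = Path(tempfile.mkdtemp())
for qid in ("query-a", "query-b"):
    (tmp / qid).mkdir()
    (tmp / qid / "events.jsonl").write_text("")
found = list_query_logs(tmp)
```

This is also what makes the dashboard integration cheap: given a query id, the log path is fully determined by convention.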
daft/subscribers/events.py
Outdated
```python
def _write_event(self, event_name: str, payload: dict[str, Any]) -> None:
    if self._closed:
        return
    record: dict[str, Any] = {"event": event_name, "ts": _iso_now()}
    record.update(payload)
    try:
        self._file.write(json.dumps(record, default=_json_default) + "\n")
    except OSError:
        pass  # Don't let logging failures affect query execution


def close(self) -> None:
    if self._closed:
        return
    self._closed = True
    self._file.close()
```
Shared mutable state and file writes are not thread-safe
The sample log in the PR description shows concurrent operator_started events just 2ms apart (node IDs 1–4 at 23:33:54.901, .903, .905, .906), confirming that subscriber callbacks are invoked from multiple threads simultaneously.
The current implementation has several unprotected shared-state operations:

- `self._operator_starts[(query_id, node_id)] = _mono_ms()` and `self._operator_starts.pop(...)` in `on_exec_operator_start`/`on_exec_operator_end` are called concurrently from different threads
- `self._closed` is checked and `self._file.write(...)` is called non-atomically in `_write_event`: a thread could pass the `if self._closed: return` guard just before another thread runs `close()`, then attempt a write on a closed file (though `OSError` is caught)
- With Python 3.13+ free-threaded mode (`--disable-gil`) or non-CPython runtimes, individual dict operations are also no longer atomic
A `threading.Lock` should guard both the file write and any mutation of the timing-state dicts:

```python
import threading


def __init__(self, log_dir: str | Path, run_id: str | None = None) -> None:
    ...
    self._lock = threading.Lock()
    ...


def _write_event(self, event_name: str, payload: dict[str, Any]) -> None:
    if self._closed:
        return
    record: dict[str, Any] = {"event": event_name, "ts": _iso_now()}
    record.update(payload)
    try:
        with self._lock:
            self._file.write(json.dumps(record, default=_json_default) + "\n")
    except OSError:
        pass
```

And similarly wrap mutations to `_operator_starts` / `_exec_starts` / etc. in the same lock (or a separate state lock).
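A standalone sketch of the same locking pattern (the class name is hypothetical, not Daft code): with the lock held around each write, concurrently emitted records land as whole JSONL lines:

```python
# Demonstrates lock-guarded JSONL writes from multiple threads.
import json
import tempfile
import threading


class LockedJsonlWriter:
    """Sketch of a lock-guarded JSONL writer, as suggested above."""

    def __init__(self, path):
        self._file = open(path, "a")
        self._lock = threading.Lock()

    def write_event(self, event, payload):
        record = {"event": event, **payload}
        with self._lock:  # one writer at a time: lines stay intact
            self._file.write(json.dumps(record) + "\n")

    def close(self):
        with self._lock:
            self._file.close()


# Demo: 8 "operator" threads each emit 50 stats events concurrently.
path = tempfile.mkdtemp() + "/events.jsonl"
writer = LockedJsonlWriter(path)
threads = [
    threading.Thread(
        target=lambda i=i: [
            writer.write_event("stats", {"node_id": i, "n": n}) for n in range(50)
        ]
    )
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
writer.close()
records = [json.loads(line) for line in open(path)]
```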
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #6420      +/-   ##
==========================================
+ Coverage   74.79%   74.80%   +0.01%
==========================================
  Files        1021     1023       +2
  Lines      136569   137193     +624
==========================================
+ Hits       102142   102630     +488
- Misses      34427    34563     +136
==========================================
```
Cool! Looking at the log output, is the plan to add the operator multiline displays in the output too? E.g. "Filter: col(x) > 5", "Project: col(x) + 1, col(y) * 2"? Would be super useful.

Yes, the plan is to add things like this in additional PRs. The goal of this one is to output what we currently expose and then iterate.
```python
def close(self) -> None:
    if self._closed:
        return
    self._closed = True
    for query_file in self._query_files.values():
        query_file.close()
    self._query_files.clear()
```
Exception in `close()` leaks remaining file handles
If query_file.close() raises an OSError for one of the open handles (e.g., a network-backed filesystem flushing buffered data), the for loop exits immediately and all remaining file handles in self._query_files are never closed. self._query_files.clear() is also never reached, leaving the dict populated with dead handles.
Wrapping each close in a try/except ensures the rest of the handles are always released:
Suggested change:

```python
def close(self) -> None:
    if self._closed:
        return
    self._closed = True
    for query_file in self._query_files.values():
        try:
            query_file.close()
        except OSError:
            pass
    self._query_files.clear()
```
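The effect of the per-handle try/except can be seen with stub handles, one of which fails on close (a standalone sketch, not Daft code):

```python
# Shows that per-handle exception handling closes the remaining handles
# even when one close() raises (e.g. a network-backed FS flush failure).
class FlakyHandle:
    """Stub file handle whose close() may raise."""

    def __init__(self, fail=False):
        self.fail = fail
        self.closed = False

    def close(self):
        if self.fail:
            raise OSError("flush failed")
        self.closed = True


def close_all(handles):
    """Close every handle, tolerating individual OSError failures."""
    for handle in handles.values():
        try:
            handle.close()
        except OSError:
            pass
    handles.clear()


handles = {"q1": FlakyHandle(), "q2": FlakyHandle(fail=True), "q3": FlakyHandle()}
survivors = list(handles.values())
close_all(handles)
```

Without the inner try/except, the `q2` failure would abort the loop and leave `q3` open and the dict populated.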
tests/test_events.py
Outdated
```python
def test_event_log_subscriber_writes_session_header(tmp_path):
    subscriber = EventLogSubscriber(tmp_path)
    subscriber.close()

    assert list(tmp_path.rglob("events.jsonl")) == []
```
Test name contradicts its assertion
test_event_log_subscriber_writes_session_header implies that the test verifies a session header is written to disk. However the only assertion is assert list(tmp_path.rglob("events.jsonl")) == [] — i.e., it verifies that no file is created when the subscriber is closed without processing any queries. The name is the opposite of what the test actually checks, which will confuse future readers.
A clearer name would be something like test_event_log_subscriber_creates_no_files_on_close_without_queries.
```python
def _clear_query_state(self, query_id: str) -> None:
    """Remove any leftover timing state for the given query."""
    self._optimization_starts.pop(query_id, None)
    self._exec_starts.pop(query_id, None)

    stale_operator_keys = [key for key in self._operator_starts if key[0] == query_id]
    for key in stale_operator_keys:
        self._operator_starts.pop(key, None)
```
_clear_query_state silently omits _query_starts
The docstring says "Remove any leftover timing state for the given query," yet _query_starts is not cleared here. It works today because every caller (on_query_end) pops from _query_starts before calling this method. However, the omission is a subtle footgun: if _clear_query_state is ever called from a new code path (e.g., a forced cleanup or a new on_query_cancel hook), the query-start timestamp will silently linger, causing the next on_query_end for a recycled query_id to compute a wildly wrong duration_ms.
Consider adding self._query_starts.pop(query_id, None) to _clear_query_state and removing the manual pop in on_query_end, so all timing cleanup is centralised in one place.
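A sketch of the centralised version, with state dicts mirroring the ones quoted above (a stand-in class, not the real subscriber):

```python
# Sketch: all per-query timing cleanup in one method, including _query_starts.
class TimingState:
    def __init__(self):
        self._query_starts = {}
        self._optimization_starts = {}
        self._exec_starts = {}
        self._operator_starts = {}  # keyed by (query_id, node_id)

    def _clear_query_state(self, query_id):
        """Remove ALL leftover timing state for the given query."""
        self._query_starts.pop(query_id, None)
        self._optimization_starts.pop(query_id, None)
        self._exec_starts.pop(query_id, None)
        for key in [k for k in self._operator_starts if k[0] == query_id]:
            self._operator_starts.pop(key, None)


# Demo: clearing q1 leaves q2's state untouched.
state = TimingState()
state._query_starts = {"q1": 1.0, "q2": 2.0}
state._operator_starts = {("q1", 1): 1.5, ("q2", 1): 2.5}
state._clear_query_state("q1")
```

With this shape, `on_query_end` no longer needs its own manual pop, and a future `on_query_cancel` path gets correct cleanup for free.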
desmondcheongzx left a comment
Thanks Chris! I think the per-query directory approach is the right call.
Overall looks good to me! Let's merge and keep iterating.
Changes Made
EventLogSubscriber writes query lifecycle events (start, optimization, execution, operator, stats) to a per-query JSONL file, under the ~/.daft/events/ directory by default. This is configurable. Includes enable_event_log()/disable_event_log() helpers for global attachment with atexit cleanup.
Each query log is contained in a directory with the following format:
`<log_dir>/<query_id>/events.jsonl`. This is experimental and the API will probably change in future PRs.
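For illustration, a consumer-side sketch of the format; the event names and the epoch-seconds `ts` field used here are assumptions based on this PR's description, not a stable schema:

```python
# Sketch: derive a query's duration from its events.jsonl records.
import json
import tempfile
from pathlib import Path


def query_duration_ms(events_path):
    """Compute duration from query_started/query_ended 'ts' fields (epoch seconds)."""
    started = ended = None
    with open(events_path) as f:
        for line in f:
            record = json.loads(line)
            if record["event"] == "query_started":
                started = record["ts"]
            elif record["event"] == "query_ended":
                ended = record["ts"]
    if started is None or ended is None:
        return None
    return (ended - started) * 1000


# Demo with a synthetic per-query log: <log_dir>/<query_id>/events.jsonl.
log_dir = Path(tempfile.mkdtemp())
qdir = log_dir / "query-demo"
qdir.mkdir()
lines = [
    {"event": "query_started", "ts": 10.0},
    {"event": "query_ended", "ts": 10.25},
]
(qdir / "events.jsonl").write_text("\n".join(json.dumps(r) for r in lines) + "\n")
duration = query_duration_ms(qdir / "events.jsonl")
```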
Related Issues
sample log