Skip to content

fix(telemetry): flush events when called from a running event loop#161

Merged
lipikaramaswamy merged 2 commits into
mainfrom
lipikaramaswamy/bugfix/telemetry-flush-running-loop
May 18, 2026
Merged

fix(telemetry): flush events when called from a running event loop#161
lipikaramaswamy merged 2 commits into
mainfrom
lipikaramaswamy/bugfix/telemetry-flush-running-loop

Conversation

@lipikaramaswamy
Copy link
Copy Markdown
Collaborator

Summary

TelemetryHandler.flush() was silently dropping every event when called from a thread with a running asyncio loop — i.e. every Jupyter notebook run, every async test, and any caller invoking Anonymizer.run() from within asyncio.run(...). CLI runs (no pre-existing loop) were unaffected, which is why this surfaced as "my notebook events don't reach the dashboard, but my colleague's CLI events do."

Root cause. flush() called asyncio.run(self._flush_events()) directly. asyncio.run raises RuntimeError: asyncio.run() cannot be called from a running event loop when invoked on a thread with an active loop. The surrounding except Exception: pass swallowed it and the coroutine was garbage-collected with a RuntimeWarning: coroutine '_flush_events' was never awaited, so the queued batch never left the process.

Fix. Adopt the same _run_sync pattern DataDesigner already uses (data_designer/engine/models/telemetry.py:283-296): detect a running loop, and if one is active off-load the coroutine to a one-shot ThreadPoolExecutor (which gets its own fresh loop); otherwise plain asyncio.run. Our handler was deliberately stripped down from DD's (no background timer task — Anonymizer runs are fire-and-flush, see TelemetryHandler docstring + AGENTS.md), but in flattening we kept the bare asyncio.run and dropped the _run_sync indirection that protected DD from this exact case. Re-adopting it keeps the idiom consistent across NeMo.

Also upgrades the swallowed-error arm in flush() from pass to logger.debug("Telemetry flush failed", exc_info=True), so future debugging of telemetry issues is a logging.getLogger("anonymizer").setLevel(DEBUG) away instead of patching telemetry.py with raise.

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Refactoring

Testing

  • make test passes locally (80/80 in the telemetry suites; full suite green)
  • make check passes locally (format + lint)
  • Added/updated tests for changes

New regression coverage in tests/test_telemetry.py::TestFlushFromRunningLoop:

  • flush() from inside asyncio.run(...) drives _flush_events to completion
  • the full with TelemetryHandler(...) as h: shape (what _emit_telemetry_event uses at the call site) actually POSTs from inside a running loop
  • _run_sync re-raises so the new logger.debug arm in flush() is reachable

Manual verification. Repro'd the bug on main by running a notebook cell that constructed a TelemetryHandler and enqueued one event — got RuntimeWarning: coroutine 'TelemetryHandler._flush_events' was never awaited. Pulled this branch, re-ran the same cell, no warning, event POSTed successfully (verified by pointing NEMO_TELEMETRY_ENDPOINT at https://httpbin.org/post and observing the request body). Also ran Anonymizer().preview() end-to-end from a notebook — clean exit, no warning, no Telemetry flush failed debug log.

Documentation

  • No docs changes needed (internal handler implementation, behavior unchanged for healthy CLI path)

Related Issues

`TelemetryHandler.flush()` called `asyncio.run(self._flush_events())`
directly. `asyncio.run` raises `RuntimeError` when invoked on a thread
that already has a running event loop — which is the case in every
Jupyter kernel, async test runner, and any caller wrapping
`Anonymizer.run()` in `asyncio.run(...)`. The surrounding
`except Exception: pass` silently swallowed the error and the coroutine
was garbage-collected with a `RuntimeWarning: coroutine
'_flush_events' was never awaited`, so the entire queued batch never
left the process. CLI runs (no pre-existing loop) were unaffected,
which is why this surfaced as "my notebook events don't reach the
dashboard, but my colleague's CLI events do."

Fix mirrors DataDesigner's `TelemetryHandler._run_sync`
(`packages/data-designer-engine/src/data_designer/engine/models/telemetry.py:283-296`):
detect a running loop, and if one is active off-load the coroutine to
a one-shot `ThreadPoolExecutor` (which gets its own fresh loop);
otherwise plain `asyncio.run`. Our handler was deliberately stripped
down from DD's (no background timer task — Anonymizer runs are
fire-and-flush), but in flattening we kept bare `asyncio.run` and
dropped the `_run_sync` indirection that protected DD from this exact
case. Re-adopting it keeps the idiom consistent across NeMo.

Also upgrades the swallowed-error arm from `pass` to
`logger.debug("Telemetry flush failed", exc_info=True)` so future
debugging is a `logging.getLogger("anonymizer").setLevel(DEBUG)` away
instead of patching `telemetry.py` with `raise`.

Regression coverage in `tests/test_telemetry.py::TestFlushFromRunningLoop`:
- `flush()` from inside `asyncio.run(...)` drives `_flush_events` to
  completion
- the full `with TelemetryHandler(...) as h:` shape (what
  `_emit_telemetry_event` uses) actually POSTs from inside a running
  loop
- `_run_sync` re-raises so the new `logger.debug` arm in `flush()` is
  reachable
@lipikaramaswamy lipikaramaswamy requested a review from a team as a code owner May 18, 2026 18:26
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 18, 2026

Greptile Summary

This PR fixes a silent event-drop bug in TelemetryHandler.flush(): previously asyncio.run(self._flush_events()) raised RuntimeError when a loop was already running (Jupyter, async tests, asyncio.run(Anonymizer.run(...))), which was caught by the bare except Exception: pass and discarded. The fix adopts the same _run_sync pattern from DataDesigner — detect a running loop via asyncio.get_running_loop(), and if one is active, submit the coroutine to a one-shot ThreadPoolExecutor (its own fresh loop); otherwise fall back to plain asyncio.run.

  • TelemetryHandler._run_sync is a new @staticmethod that cleanly handles both sync and async caller contexts with correct exception propagation.
  • The except Exception: pass arm is upgraded to logger.debug(\"Telemetry flush failed\", exc_info=True), making future flush failures surfaceable without patching the source.
  • Three regression tests cover the running-loop flush path, the full context-manager shape, and exception propagation through _run_sync.

Confidence Score: 5/5

Safe to merge — the change is a well-scoped, self-contained fix to a silent failure path with no impact on the healthy CLI path.

The _run_sync implementation correctly handles both the no-loop (plain asyncio.run) and running-loop (thread-pool delegation) cases, exception propagation is preserved, and the fix is narrowly targeted to flush(). The logic mirrors the DataDesigner pattern that is already proven in production. Three focused regression tests cover the new code paths, and the existing 80-test suite is reported green. No data loss, no behavioral change for sync callers.

No files require special attention.

Important Files Changed

Filename Overview
src/anonymizer/telemetry.py Adds _run_sync static method that off-loads coroutines to a ThreadPoolExecutor when a running loop is detected, replacing the bare asyncio.run call in flush(); also upgrades the swallowed-error arm to logger.debug.
tests/test_telemetry.py Adds TestFlushFromRunningLoop with three regression tests covering flush from a running loop, the full context-manager shape, and exception propagation through _run_sync.

Sequence Diagram

sequenceDiagram
    participant Caller as Caller (sync or async)
    participant flush as TelemetryHandler.flush()
    participant run_sync as _run_sync()
    participant TPE as ThreadPoolExecutor (worker thread)
    participant flush_events as _flush_events() coroutine

    Caller->>flush: flush()
    flush->>run_sync: _run_sync(self._flush_events())
    run_sync->>run_sync: asyncio.get_running_loop()

    alt No running loop (CLI / plain sync)
        run_sync->>flush_events: asyncio.run(coro) — current thread
        flush_events-->>run_sync: done
    else Running loop exists (Jupyter / async test)
        run_sync->>TPE: pool.submit(asyncio.run, coro)
        Note over TPE: Fresh event loop in worker thread
        TPE->>flush_events: asyncio.run(coro)
        flush_events-->>TPE: done
        TPE-->>run_sync: future.result()
    end

    run_sync-->>flush: return (or raise → logger.debug)
    flush-->>Caller: return
Loading

Reviews (2): Last reviewed commit: "Apply suggestion from @greptile-apps[bot..." | Re-trigger Greptile

Comment thread src/anonymizer/telemetry.py Outdated
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@lipikaramaswamy lipikaramaswamy merged commit c41a756 into main May 18, 2026
11 checks passed
@lipikaramaswamy lipikaramaswamy deleted the lipikaramaswamy/bugfix/telemetry-flush-running-loop branch May 18, 2026 18:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants