Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions app/routers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,28 @@ async def _run_long_poll_wait(
so tests can monkeypatch shorter values without function
signature churn.
"""
# P0 fix (2026-05-22): release any implicit transaction held by
# the caller's initial pull_events() SELECT BEFORE entering the
# sleep loop. Without this, the SQLAlchemy session keeps the
# Postgres connection in `idle in transaction` state for the full
# ~30s wait window. At ~15 concurrent long-pollers (SQLAlchemy
# default pool_size=5 + max_overflow=10 = 15) the entire app-side
# pool is held by waiting listeners + new requests time out
# waiting for a free connection. Empirical evidence: a downstream
# consumer (cue.dock.svc on Govind's org) tipped over at ~15
# concurrent listeners; pg_stat_activity showed 15 connections
# holding `SELECT events.id ...` for 14-20s each, matching the
# default pool ceiling.
#
# AsyncSession.commit() empirically verified to BOTH end the
# Postgres txn AND return the pool slot. The session re-acquires
# from the pool transparently on next operation. So this +2-line
# fix addresses both the Postgres idle-in-transaction symptom AND
# the SQLAlchemy app-pool exhaustion. The caller's post-helper
# _advance_ack_after_pull(db, ...) keeps working because the
# session itself remains usable across commits.
await db.commit()

deadline = asyncio.get_event_loop().time() + LONG_POLL_MAX_SECONDS
events: list = []
next_cursor: Optional[int] = None
Expand All @@ -457,6 +479,12 @@ async def _run_long_poll_wait(
)
if events:
break
# P0 fix continued: release the implicit transaction opened by
# the just-completed empty pull_events() SELECT so the next
# asyncio.sleep() window doesn't sit `idle in transaction`.
# Next iteration's pull_events() re-acquires from the pool
# transparently.
await db.commit()

return events, next_cursor, has_more

Expand Down
117 changes: 117 additions & 0 deletions tests/test_events_long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,120 @@ async def test_helper_respects_since_cursor(
elapsed = asyncio.get_event_loop().time() - started
assert events == []
assert 0.8 < elapsed < 2.0


# ───────────────────────────────────────────────────────────────────────
# P0 fix (2026-05-22) β€” pool exhaustion regression guard. The long-poll
# helper MUST commit/release its transaction at entry AND between every
# empty iteration so it doesn't hold an `idle in transaction` connection
# for the full wait window. A downstream consumer (cue.dock.svc) tipped
# over at ~15 concurrent listeners with pg_stat_activity showing 15
# connections holding SELECT for 14-20s each (matching SQLAlchemy
# default pool_size=5 + max_overflow=10). Pin the invariant here as a
# regression guard.
# ───────────────────────────────────────────────────────────────────────


async def test_helper_commits_at_entry_and_between_empty_iterations(
db_session: AsyncSession,
lp_agent: Agent,
fast_long_poll,
):
"""Pin the P0 invariant: ``_run_long_poll_wait`` must call
``await db.commit()`` at entry + after every empty
``pull_events()`` so the long-poll session doesn't sit
`idle in transaction` for the full window.

Counts commit calls via a thin spy wrapper. With LONG_POLL_MAX_SECONDS
= 1.0s and LONG_POLL_INTERNAL_INTERVAL_SECONDS = 0.2s (set by
fast_long_poll), the loop fires ~5 sleep+pull iterations. Expect
at least 1 commit at entry + 1 per empty iteration.
"""
commit_count = 0
original_commit = db_session.commit

async def counting_commit():
nonlocal commit_count
commit_count += 1
await original_commit()

db_session.commit = counting_commit # type: ignore[assignment]

events, cursor, has_more = await _run_long_poll_wait(
db_session,
recipient_agent_id=lp_agent.id,
since=0,
limit=100,
event_type=None,
)

assert events == []
assert cursor is None
assert has_more is False
# At least 2 commits expected: one at entry + at least one
# between empty iterations. In practice with the 1.0s window
# and 0.2s interval it's ~6 (1 entry + ~5 per iter).
assert commit_count >= 2, (
f"Expected β‰₯2 commits (1 at entry + β‰₯1 between empty iterations) "
f"to release pool slot during long-poll wait β€” got {commit_count}. "
f"This is the P0 pool-exhaustion regression guard (2026-05-22)."
)


async def test_helper_commits_at_entry_even_when_event_arrives_immediately(
db_session: AsyncSession,
lp_agent: Agent,
fast_long_poll,
):
"""Regression guard: even when an event arrives on the first
sleep+poll iteration (before any empty iterations fire), the
entry-commit MUST have run to release the caller's initial-pull
implicit txn. Counts commit calls; expects β‰₯1 (the entry commit)
regardless of how many iterations the loop completes.
"""
async def insert_after_delay():
await asyncio.sleep(0.1)
# Use a fresh session β€” concurrent use of the helper's
# db_session would race with the helper's own commits.
from app.database import async_session
async with async_session() as fresh:
fresh.add(
Event(
event_type="message.delivered",
recipient_agent_id=lp_agent.id,
payload={"fast": True},
emitted_at=datetime.now(timezone.utc),
)
)
await fresh.commit()

commit_count = 0
original_commit = db_session.commit

async def counting_commit():
nonlocal commit_count
commit_count += 1
await original_commit()

db_session.commit = counting_commit # type: ignore[assignment]

# Race: helper sleeps 0.2s; inserter writes at 0.1s. Helper picks up
# the event on its first poll iteration.
inserter = asyncio.create_task(insert_after_delay())
events, cursor, has_more = await _run_long_poll_wait(
db_session,
recipient_agent_id=lp_agent.id,
since=0,
limit=100,
event_type=None,
)
await inserter

assert len(events) >= 1
# Entry-commit MUST have fired at least once even though the loop
# exited on the first iteration via `if events: break`.
assert commit_count >= 1, (
f"Expected β‰₯1 commit (entry-commit to release caller's initial-"
f"pull implicit txn) even when event arrives on first iteration. "
f"Got {commit_count}. P0 pool-exhaustion regression guard."
)