From 6ff69423405fb06f1fead9e725c759d5a5af06d0 Mon Sep 17 00:00:00 2001 From: Fido Can Code <190991155+FidoCanCode@users.noreply.github.com> Date: Fri, 17 Apr 2026 12:56:28 +0000 Subject: [PATCH 1/2] Atomic claim-before-reply to close TOCTOU on _replied_comments (closes #566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the module-level set[int] with RepliedComments — a thread-safe class with an atomic claim() that checks-and-adds under a lock in one operation. The claim now happens *before* calling reply_to_comment() / reply_to_issue_comment(), not after: concurrent webhook deliveries for the same comment can no longer both pass the membership check before either adds the ID. On failure, release() clears the claim so a GitHub redelivery can retry. The already_replied parameter on reply_to_review() is removed since that function is a stub that never used it. Co-Authored-By: Claude Sonnet 4.6 --- kennel/events.py | 3 +- kennel/server.py | 95 ++++++++++++++++++++++++++++++++++++-------- tests/test_server.py | 65 +++++++++++++++++++++++++++++- 3 files changed, 144 insertions(+), 19 deletions(-) diff --git a/kennel/events.py b/kennel/events.py index c5c15fbf..0b8aedd1 100644 --- a/kennel/events.py +++ b/kennel/events.py @@ -701,7 +701,6 @@ def reply_to_review( config: Config, repo_cfg: RepoConfig, gh: GitHub, - already_replied: set[int] | None = None, *, agent: ProviderAgent | None = None, prompts: Prompts | None = None, @@ -725,7 +724,7 @@ def reply_to_review( \"Submit review\") still arrives only through this event and is not yet handled. Tracked separately — out of scope for the dedup fix. """ - _ = (action, config, repo_cfg, gh, already_replied, agent, prompts) + _ = (action, config, repo_cfg, gh, agent, prompts) log.debug( "reply_to_review: skipping inline comments — handled per-comment (closes #518)" ) diff --git a/kennel/server.py b/kennel/server.py index 40c0961e..c2955145 100644 --- a/kennel/server.py +++ b/kennel/server.py @@ -53,7 +53,64 @@ log = logging.getLogger(__name__) -_replied_comments: set[int] = set() +class RepliedComments: + """Thread-safe set of already-replied comment IDs with atomic claim. + + Eliminates the TOCTOU window between the membership check and the add + that allowed duplicate webhook deliveries to both pass the dedup guard + and each independently call reply_to_comment() (closes #566). + + Usage pattern:: + + if not _replied_comments.claim(cid): + return # already handled by another delivery + # safe to proceed — this thread holds the exclusive claim + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._ids: set[int] = set() + + def claim(self, cid: int) -> bool: + """Atomically claim *cid*. + + Returns ``True`` if this call is the first to claim *cid* (newly + claimed — caller should proceed with the reply). Returns ``False`` + if *cid* was already present (another thread or a prior delivery + already claimed it — caller should skip the reply). + """ + with self._lock: + if cid in self._ids: + return False + self._ids.add(cid) + return True + + def add(self, cid: int) -> None: + """Non-atomic add — for test pre-seeding only.""" + with self._lock: + self._ids.add(cid) + + def release(self, cid: int) -> None: + """Release a previously claimed *cid* after a failed reply attempt. + + Removes *cid* so a subsequent webhook redelivery can claim it and + retry the reply. Call this in the failure path (exception handler) + after a claim succeeded but the reply call raised. + """ + with self._lock: + self._ids.discard(cid) + + def discard(self, cid: int) -> None: + """Remove *cid* if present — for test cleanup only.""" + with self._lock: + self._ids.discard(cid) + + def __contains__(self, cid: object) -> bool: + with self._lock: + return cid in self._ids + + +_replied_comments = RepliedComments() # Exponential backoff for git pull during self-restart: 10s, 30s, 60s # with a 10-minute total budget. Retries stop once the cumulative delay @@ -628,7 +685,7 @@ def _process_action_inner( if action.reply_to: promise = self._reply_promise(action) cid = action.reply_to.get("comment_id") - if cid and cid in _replied_comments: + if cid is not None and not _replied_comments.claim(cid): log.info("already replied to comment %s — skipping", cid) handled = True category, titles = None, [] @@ -640,17 +697,21 @@ def _process_action_inner( promise[0], promise[1], ) - category, titles = type(self)._fn_reply_to_comment( - action, self.config, repo_cfg, gh - ) + try: + category, titles = type(self)._fn_reply_to_comment( + action, self.config, repo_cfg, gh + ) + except Exception: + # Release the claim so a GitHub redelivery can retry. + if cid is not None: + _replied_comments.release(cid) + raise if promise is not None: reply_promises.remove_reply_promise( repo_cfg.work_dir / ".git" / "fido", promise[0], promise[1], ) - if cid: - _replied_comments.add(cid) handled = True # Create task based on triage result. # DEFER files a GitHub issue (handled in reply_to_comment) — no tasks.json entry. @@ -673,16 +734,14 @@ def _process_action_inner( if action.review_comments: activity.set_description("replying to review thread") - type(self)._fn_reply_to_review( - action, self.config, repo_cfg, gh, already_replied=_replied_comments - ) + type(self)._fn_reply_to_review(action, self.config, repo_cfg, gh) handled = True # inline comments handled individually # Top-level PR comments (issue_comment) — no reply_to, but has comment_body if not handled and action.comment_body: promise = self._reply_promise(action) cid = action.thread.get("comment_id") if action.thread else None - if cid and cid in _replied_comments: + if cid is not None and not _replied_comments.claim(cid): log.info("already replied to comment %s — skipping", cid) category, titles = None, [] else: @@ -693,17 +752,21 @@ def _process_action_inner( promise[0], promise[1], ) - category, titles = type(self)._fn_reply_to_issue_comment( - action, self.config, repo_cfg, gh - ) + try: + category, titles = type(self)._fn_reply_to_issue_comment( + action, self.config, repo_cfg, gh + ) + except Exception: + # Release the claim so a GitHub redelivery can retry. + if cid is not None: + _replied_comments.release(cid) + raise if promise is not None: reply_promises.remove_reply_promise( repo_cfg.work_dir / ".git" / "fido", promise[0], promise[1], ) - if cid: - _replied_comments.add(cid) handled = True # DEFER files a GitHub issue — no tasks.json entry. if category not in ("DUMP", "ANSWER", "ASK", "DEFER"): diff --git a/tests/test_server.py b/tests/test_server.py index ccf8e0e7..63e87845 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -21,7 +21,7 @@ from kennel.events import Action, recover_reply_promises from kennel.infra import Infra from kennel.provider import ProviderID -from kennel.server import PreflightError, WebhookHandler, _repo_status +from kennel.server import PreflightError, RepliedComments, WebhookHandler, _repo_status class RepoConfig(_RepoConfig): @@ -833,6 +833,69 @@ def test_raises_for_non_integer_comment_id(self) -> None: handler._reply_promise(action) +class TestRepliedComments: + """Unit tests for the RepliedComments atomic-claim helper.""" + + def test_claim_returns_true_first_time(self) -> None: + rc = RepliedComments() + assert rc.claim(1) is True + + def test_claim_returns_false_when_already_claimed(self) -> None: + rc = RepliedComments() + rc.claim(1) + assert rc.claim(1) is False + + def test_claim_is_atomic_across_threads(self) -> None: + """Exactly one thread should win the claim under concurrent pressure.""" + rc = RepliedComments() + results: list[bool] = [] + lock = threading.Lock() + + def try_claim() -> None: + result = rc.claim(42) + with lock: + results.append(result) + + threads = [threading.Thread(target=try_claim) for _ in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + assert results.count(True) == 1 + assert results.count(False) == 19 + + def test_add_prevents_subsequent_claim(self) -> None: + rc = RepliedComments() + rc.add(7) + assert rc.claim(7) is False + + def test_discard_allows_reclaim(self) -> None: + rc = RepliedComments() + assert rc.claim(3) is True + rc.discard(3) + assert rc.claim(3) is True + + def test_contains(self) -> None: + rc = RepliedComments() + assert 5 not in rc + rc.add(5) + assert 5 in rc + + def test_independent_ids_dont_interfere(self) -> None: + rc = RepliedComments() + assert rc.claim(10) is True + assert rc.claim(11) is True + assert rc.claim(10) is False + assert rc.claim(11) is False + + def test_release_allows_reclaim(self) -> None: + """release() clears a claimed ID so a redelivery can retry after failure.""" + rc = RepliedComments() + assert rc.claim(99) is True + rc.release(99) + assert rc.claim(99) is True + + class TestProcessAction: """Tests for _process_action — the background thread that dispatches actions.""" From ff80ac1493fba4bd1433c0e7b6d4fd30d96832cd Mon Sep 17 00:00:00 2001 From: Fido Can Code <190991155+FidoCanCode@users.noreply.github.com> Date: Fri, 17 Apr 2026 13:05:09 +0000 Subject: [PATCH 2/2] Filter handle_threads by webhook-claimed comment IDs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move RepliedComments to kennel/claimed.py as a shared module so both the webhook handler (server.py) and the worker (worker.py) can consult the same process-wide claimed set without a circular import. _filter_threads now skips threads whose first_db_id appears in the webhook-claimed set. The last_author == gh_user filter already handles threads where the reply landed before handle_threads fetches the thread list, but it has an inherent race with GitHub API propagation. The in-process claimed-ID check is deterministic and fires immediately when the webhook handler claims a comment — even when the reply is still in flight. Co-Authored-By: Claude Sonnet 4.6 --- kennel/claimed.py | 80 ++++++++++++++++++++++++++++++++++++++++++++ kennel/server.py | 61 +-------------------------------- kennel/worker.py | 16 +++++++-- tests/test_server.py | 3 +- tests/test_worker.py | 29 ++++++++++++++++ 5 files changed, 125 insertions(+), 64 deletions(-) create mode 100644 kennel/claimed.py diff --git a/kennel/claimed.py b/kennel/claimed.py new file mode 100644 index 00000000..e3e6a98a --- /dev/null +++ b/kennel/claimed.py @@ -0,0 +1,80 @@ +"""Process-wide set of webhook-claimed review comment IDs. + +The :class:`RepliedComments` class is a thread-safe owner of a set of +comment database IDs that the webhook handler has already claimed (or is +currently processing a reply for). A single instance lives here so that +both the webhook handler (``server.py``) and the worker (``worker.py``) +can consult it without a circular import. + +``server.py`` claims IDs before calling ``reply_to_comment()`` (atomic +check-and-add via :meth:`RepliedComments.claim`). ``worker.py`` uses the +same set in :meth:`~kennel.worker.Worker._filter_threads` to skip threads +whose first comment was already claimed, preventing the comments sub-agent +from posting a duplicate reply even when the webhook reply is still in +flight and not yet visible in the GitHub API. +""" + +from __future__ import annotations + +import threading + + +class RepliedComments: + """Thread-safe set of already-replied comment IDs with atomic claim. + + Eliminates the TOCTOU window between the membership check and the add + that allowed duplicate webhook deliveries to both pass the dedup guard + and each independently call reply_to_comment() (closes #566). + + Usage pattern:: + + if not _replied_comments.claim(cid): + return # already handled by another delivery + # safe to proceed — this thread holds the exclusive claim + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._ids: set[int] = set() + + def claim(self, cid: int) -> bool: + """Atomically claim *cid*. + + Returns ``True`` if this call is the first to claim *cid* (newly + claimed — caller should proceed with the reply). Returns ``False`` + if *cid* was already present (another thread or a prior delivery + already claimed it — caller should skip the reply). + """ + with self._lock: + if cid in self._ids: + return False + self._ids.add(cid) + return True + + def add(self, cid: int) -> None: + """Non-atomic add — for test pre-seeding only.""" + with self._lock: + self._ids.add(cid) + + def release(self, cid: int) -> None: + """Release a previously claimed *cid* after a failed reply attempt. + + Removes *cid* so a subsequent webhook redelivery can claim it and + retry the reply. Call this in the failure path (exception handler) + after a claim succeeded but the reply call raised. + """ + with self._lock: + self._ids.discard(cid) + + def discard(self, cid: int) -> None: + """Remove *cid* if present — for test cleanup only.""" + with self._lock: + self._ids.discard(cid) + + def __contains__(self, cid: object) -> bool: + with self._lock: + return cid in self._ids + + +#: Process-wide singleton shared by the webhook handler and the worker. +replied_comments = RepliedComments() diff --git a/kennel/server.py b/kennel/server.py index c2955145..ed7f327a 100644 --- a/kennel/server.py +++ b/kennel/server.py @@ -19,6 +19,7 @@ from xml.etree.ElementTree import Element, SubElement, register_namespace, tostring from kennel import provider, reply_promises +from kennel.claimed import replied_comments as _replied_comments from kennel.claude import kill_active_children from kennel.config import Config, RepoConfig, RepoMembership from kennel.events import ( @@ -52,66 +53,6 @@ log = logging.getLogger(__name__) - -class RepliedComments: - """Thread-safe set of already-replied comment IDs with atomic claim. - - Eliminates the TOCTOU window between the membership check and the add - that allowed duplicate webhook deliveries to both pass the dedup guard - and each independently call reply_to_comment() (closes #566). - - Usage pattern:: - - if not _replied_comments.claim(cid): - return # already handled by another delivery - # safe to proceed — this thread holds the exclusive claim - """ - - def __init__(self) -> None: - self._lock = threading.Lock() - self._ids: set[int] = set() - - def claim(self, cid: int) -> bool: - """Atomically claim *cid*. - - Returns ``True`` if this call is the first to claim *cid* (newly - claimed — caller should proceed with the reply). Returns ``False`` - if *cid* was already present (another thread or a prior delivery - already claimed it — caller should skip the reply). - """ - with self._lock: - if cid in self._ids: - return False - self._ids.add(cid) - return True - - def add(self, cid: int) -> None: - """Non-atomic add — for test pre-seeding only.""" - with self._lock: - self._ids.add(cid) - - def release(self, cid: int) -> None: - """Release a previously claimed *cid* after a failed reply attempt. - - Removes *cid* so a subsequent webhook redelivery can claim it and - retry the reply. Call this in the failure path (exception handler) - after a claim succeeded but the reply call raised. - """ - with self._lock: - self._ids.discard(cid) - - def discard(self, cid: int) -> None: - """Remove *cid* if present — for test cleanup only.""" - with self._lock: - self._ids.discard(cid) - - def __contains__(self, cid: object) -> bool: - with self._lock: - return cid in self._ids - - -_replied_comments = RepliedComments() - # Exponential backoff for git pull during self-restart: 10s, 30s, 60s # with a 10-minute total budget. Retries stop once the cumulative delay # exceeds _PULL_BUDGET_SECONDS, even if a retry window remains. diff --git a/kennel/worker.py b/kennel/worker.py index 019db95d..2163dee1 100644 --- a/kennel/worker.py +++ b/kennel/worker.py @@ -17,6 +17,7 @@ from typing import IO, Any, Protocol from kennel import hooks, tasks +from kennel.claimed import replied_comments as _webhook_claimed from kennel.claude import ClaudeCode from kennel.config import Config, RepoConfig, RepoMembership from kennel.github import GitHub @@ -1487,8 +1488,14 @@ def _filter_threads( A thread is included when: - it is not resolved, - it has at least one comment, - - the last commenter is not *gh_user* (awaiting a response), and - - the last commenter is either in *collaborators* or ends with ``[bot]``. + - the last commenter is not *gh_user* (awaiting a response), + - the last commenter is either in *collaborators* or ends with ``[bot]``, and + - the first comment's ID has not been claimed by the webhook handler. + + The last rule prevents the comments sub-agent from posting a duplicate + reply to a thread that the webhook handler already claimed — even if + the reply is still in-flight and not yet visible in the GitHub API + (which ``last_author == gh_user`` alone cannot catch). """ result = [] for node in nodes: @@ -1504,13 +1511,16 @@ def _filter_threads( continue if last_author not in collaborators and not last_author.endswith("[bot]"): continue + first_db_id = first_comment.get("databaseId") + if first_db_id is not None and first_db_id in _webhook_claimed: + continue first_login = (first_comment.get("author") or {}).get("login", "") result.append( { "id": node["id"], "is_bot": first_login.endswith("[bot]"), "first_author": first_login, - "first_db_id": first_comment.get("databaseId"), + "first_db_id": first_db_id, "first_body": first_comment["body"], "last_author": last_author, "last_body": last_comment["body"], diff --git a/tests/test_server.py b/tests/test_server.py index 63e87845..49fa0e1a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -16,12 +16,13 @@ import pytest from kennel import provider +from kennel.claimed import RepliedComments from kennel.config import Config from kennel.config import RepoConfig as _RepoConfig from kennel.events import Action, recover_reply_promises from kennel.infra import Infra from kennel.provider import ProviderID -from kennel.server import PreflightError, RepliedComments, WebhookHandler, _repo_status +from kennel.server import PreflightError, WebhookHandler, _repo_status class RepoConfig(_RepoConfig): diff --git a/tests/test_worker.py b/tests/test_worker.py index 86e0e745..c567779e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5441,6 +5441,35 @@ def test_total_counts_all_comments(self, tmp_path: Path) -> None: ) assert result[0]["total"] == 3 + def test_excludes_webhook_claimed_thread(self, tmp_path: Path) -> None: + """Threads whose first comment ID was claimed by the webhook handler are skipped. + + This prevents the comments sub-agent from posting a duplicate reply even + when the webhook's reply is still in flight and not yet visible via the + GitHub API (which the last_author == gh_user guard cannot catch). + """ + import kennel.claimed as kc + + w = self._make_worker(tmp_path) + node = self._make_node(first_db_id=700) + kc.replied_comments.add(700) + try: + result = w._filter_threads([node], "fido-bot", frozenset({"owner"})) + assert result == [] + finally: + kc.replied_comments.discard(700) + + def test_includes_unclaimed_thread(self, tmp_path: Path) -> None: + """Threads whose first comment ID is not in the webhook-claimed set are included.""" + import kennel.claimed as kc + + w = self._make_worker(tmp_path) + kc.replied_comments.discard(800) # ensure clean state + result = w._filter_threads( + [self._make_node(first_db_id=800)], "fido-bot", frozenset({"owner"}) + ) + assert len(result) == 1 + class TestResolveAddressedThreads: """Tests for Worker.resolve_addressed_threads."""