Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions kennel/claimed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Process-wide set of webhook-claimed review comment IDs.

The :class:`RepliedComments` class is a thread-safe owner of a set of
comment database IDs that the webhook handler has already claimed (or is
currently processing a reply for). A single instance lives here so that
both the webhook handler (``server.py``) and the worker (``worker.py``)
can consult it without a circular import.

``server.py`` claims IDs before calling ``reply_to_comment()`` (atomic
check-and-add via :meth:`RepliedComments.claim`). ``worker.py`` uses the
same set in :meth:`~kennel.worker.Worker._filter_threads` to skip threads
whose first comment was already claimed, preventing the comments sub-agent
from posting a duplicate reply even when the webhook reply is still in
flight and not yet visible in the GitHub API.
"""

from __future__ import annotations

import threading


class RepliedComments:
"""Thread-safe set of already-replied comment IDs with atomic claim.

Eliminates the TOCTOU window between the membership check and the add
that allowed duplicate webhook deliveries to both pass the dedup guard
and each independently call reply_to_comment() (closes #566).

Usage pattern::

if not _replied_comments.claim(cid):
return # already handled by another delivery
# safe to proceed — this thread holds the exclusive claim
"""

def __init__(self) -> None:
self._lock = threading.Lock()
self._ids: set[int] = set()

def claim(self, cid: int) -> bool:
"""Atomically claim *cid*.

Returns ``True`` if this call is the first to claim *cid* (newly
claimed — caller should proceed with the reply). Returns ``False``
if *cid* was already present (another thread or a prior delivery
already claimed it — caller should skip the reply).
"""
with self._lock:
if cid in self._ids:
return False
self._ids.add(cid)
return True

def add(self, cid: int) -> None:
"""Non-atomic add — for test pre-seeding only."""
with self._lock:
self._ids.add(cid)

def release(self, cid: int) -> None:
"""Release a previously claimed *cid* after a failed reply attempt.

Removes *cid* so a subsequent webhook redelivery can claim it and
retry the reply. Call this in the failure path (exception handler)
after a claim succeeded but the reply call raised.
"""
with self._lock:
self._ids.discard(cid)

def discard(self, cid: int) -> None:
"""Remove *cid* if present — for test cleanup only."""
with self._lock:
self._ids.discard(cid)

def __contains__(self, cid: object) -> bool:
with self._lock:
return cid in self._ids


#: Process-wide singleton shared by the webhook handler and the worker.
replied_comments = RepliedComments()
3 changes: 1 addition & 2 deletions kennel/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ def reply_to_review(
config: Config,
repo_cfg: RepoConfig,
gh: GitHub,
already_replied: set[int] | None = None,
*,
agent: ProviderAgent | None = None,
prompts: Prompts | None = None,
Expand All @@ -725,7 +724,7 @@ def reply_to_review(
\"Submit review\") still arrives only through this event and is not
yet handled. Tracked separately — out of scope for the dedup fix.
"""
_ = (action, config, repo_cfg, gh, already_replied, agent, prompts)
_ = (action, config, repo_cfg, gh, agent, prompts)
log.debug(
"reply_to_review: skipping inline comments — handled per-comment (closes #518)"
)
Expand Down
40 changes: 22 additions & 18 deletions kennel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from xml.etree.ElementTree import Element, SubElement, register_namespace, tostring

from kennel import provider, reply_promises
from kennel.claimed import replied_comments as _replied_comments
from kennel.claude import kill_active_children
from kennel.config import Config, RepoConfig, RepoMembership
from kennel.events import (
Expand Down Expand Up @@ -52,9 +53,6 @@

log = logging.getLogger(__name__)


_replied_comments: set[int] = set()

# Exponential backoff for git pull during self-restart: 10s, 30s, 60s
# with a 10-minute total budget. Retries stop once the cumulative delay
# exceeds _PULL_BUDGET_SECONDS, even if a retry window remains.
Expand Down Expand Up @@ -628,7 +626,7 @@ def _process_action_inner(
if action.reply_to:
promise = self._reply_promise(action)
cid = action.reply_to.get("comment_id")
if cid and cid in _replied_comments:
if cid is not None and not _replied_comments.claim(cid):
log.info("already replied to comment %s — skipping", cid)
handled = True
category, titles = None, []
Expand All @@ -640,17 +638,21 @@ def _process_action_inner(
promise[0],
promise[1],
)
category, titles = type(self)._fn_reply_to_comment(
action, self.config, repo_cfg, gh
)
try:
category, titles = type(self)._fn_reply_to_comment(
action, self.config, repo_cfg, gh
)
except Exception:
# Release the claim so a GitHub redelivery can retry.
if cid is not None:
_replied_comments.release(cid)
raise
if promise is not None:
reply_promises.remove_reply_promise(
repo_cfg.work_dir / ".git" / "fido",
promise[0],
promise[1],
)
if cid:
_replied_comments.add(cid)
handled = True
# Create task based on triage result.
# DEFER files a GitHub issue (handled in reply_to_comment) — no tasks.json entry.
Expand All @@ -673,16 +675,14 @@ def _process_action_inner(

if action.review_comments:
activity.set_description("replying to review thread")
type(self)._fn_reply_to_review(
action, self.config, repo_cfg, gh, already_replied=_replied_comments
)
type(self)._fn_reply_to_review(action, self.config, repo_cfg, gh)
handled = True # inline comments handled individually

# Top-level PR comments (issue_comment) — no reply_to, but has comment_body
if not handled and action.comment_body:
promise = self._reply_promise(action)
cid = action.thread.get("comment_id") if action.thread else None
if cid and cid in _replied_comments:
if cid is not None and not _replied_comments.claim(cid):
log.info("already replied to comment %s — skipping", cid)
category, titles = None, []
else:
Expand All @@ -693,17 +693,21 @@ def _process_action_inner(
promise[0],
promise[1],
)
category, titles = type(self)._fn_reply_to_issue_comment(
action, self.config, repo_cfg, gh
)
try:
category, titles = type(self)._fn_reply_to_issue_comment(
action, self.config, repo_cfg, gh
)
except Exception:
# Release the claim so a GitHub redelivery can retry.
if cid is not None:
_replied_comments.release(cid)
raise
if promise is not None:
reply_promises.remove_reply_promise(
repo_cfg.work_dir / ".git" / "fido",
promise[0],
promise[1],
)
if cid:
_replied_comments.add(cid)
handled = True
# DEFER files a GitHub issue — no tasks.json entry.
if category not in ("DUMP", "ANSWER", "ASK", "DEFER"):
Expand Down
16 changes: 13 additions & 3 deletions kennel/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import IO, Any, Protocol

from kennel import hooks, tasks
from kennel.claimed import replied_comments as _webhook_claimed
from kennel.claude import ClaudeCode
from kennel.config import Config, RepoConfig, RepoMembership
from kennel.github import GitHub
Expand Down Expand Up @@ -1487,8 +1488,14 @@ def _filter_threads(
A thread is included when:
- it is not resolved,
- it has at least one comment,
- the last commenter is not *gh_user* (awaiting a response), and
- the last commenter is either in *collaborators* or ends with ``[bot]``.
- the last commenter is not *gh_user* (awaiting a response),
- the last commenter is either in *collaborators* or ends with ``[bot]``, and
- the first comment's ID has not been claimed by the webhook handler.

The last rule prevents the comments sub-agent from posting a duplicate
reply to a thread that the webhook handler already claimed — even if
the reply is still in-flight and not yet visible in the GitHub API
(which ``last_author == gh_user`` alone cannot catch).
"""
result = []
for node in nodes:
Expand All @@ -1504,13 +1511,16 @@ def _filter_threads(
continue
if last_author not in collaborators and not last_author.endswith("[bot]"):
continue
first_db_id = first_comment.get("databaseId")
if first_db_id is not None and first_db_id in _webhook_claimed:
continue
first_login = (first_comment.get("author") or {}).get("login", "")
result.append(
{
"id": node["id"],
"is_bot": first_login.endswith("[bot]"),
"first_author": first_login,
"first_db_id": first_comment.get("databaseId"),
"first_db_id": first_db_id,
"first_body": first_comment["body"],
"last_author": last_author,
"last_body": last_comment["body"],
Expand Down
64 changes: 64 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pytest

from kennel import provider
from kennel.claimed import RepliedComments
from kennel.config import Config
from kennel.config import RepoConfig as _RepoConfig
from kennel.events import Action, recover_reply_promises
Expand Down Expand Up @@ -833,6 +834,69 @@ def test_raises_for_non_integer_comment_id(self) -> None:
handler._reply_promise(action)


class TestRepliedComments:
"""Unit tests for the RepliedComments atomic-claim helper."""

def test_claim_returns_true_first_time(self) -> None:
rc = RepliedComments()
assert rc.claim(1) is True

def test_claim_returns_false_when_already_claimed(self) -> None:
rc = RepliedComments()
rc.claim(1)
assert rc.claim(1) is False

def test_claim_is_atomic_across_threads(self) -> None:
"""Exactly one thread should win the claim under concurrent pressure."""
rc = RepliedComments()
results: list[bool] = []
lock = threading.Lock()

def try_claim() -> None:
result = rc.claim(42)
with lock:
results.append(result)

threads = [threading.Thread(target=try_claim) for _ in range(20)]
for t in threads:
t.start()
for t in threads:
t.join()
assert results.count(True) == 1
assert results.count(False) == 19

def test_add_prevents_subsequent_claim(self) -> None:
rc = RepliedComments()
rc.add(7)
assert rc.claim(7) is False

def test_discard_allows_reclaim(self) -> None:
rc = RepliedComments()
assert rc.claim(3) is True
rc.discard(3)
assert rc.claim(3) is True

def test_contains(self) -> None:
rc = RepliedComments()
assert 5 not in rc
rc.add(5)
assert 5 in rc

def test_independent_ids_dont_interfere(self) -> None:
rc = RepliedComments()
assert rc.claim(10) is True
assert rc.claim(11) is True
assert rc.claim(10) is False
assert rc.claim(11) is False

def test_release_allows_reclaim(self) -> None:
"""release() clears a claimed ID so a redelivery can retry after failure."""
rc = RepliedComments()
assert rc.claim(99) is True
rc.release(99)
assert rc.claim(99) is True


class TestProcessAction:
"""Tests for _process_action — the background thread that dispatches actions."""

Expand Down
29 changes: 29 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5441,6 +5441,35 @@ def test_total_counts_all_comments(self, tmp_path: Path) -> None:
)
assert result[0]["total"] == 3

def test_excludes_webhook_claimed_thread(self, tmp_path: Path) -> None:
"""Threads whose first comment ID was claimed by the webhook handler are skipped.

This prevents the comments sub-agent from posting a duplicate reply even
when the webhook's reply is still in flight and not yet visible via the
GitHub API (which the last_author == gh_user guard cannot catch).
"""
import kennel.claimed as kc

w = self._make_worker(tmp_path)
node = self._make_node(first_db_id=700)
kc.replied_comments.add(700)
try:
result = w._filter_threads([node], "fido-bot", frozenset({"owner"}))
assert result == []
finally:
kc.replied_comments.discard(700)

def test_includes_unclaimed_thread(self, tmp_path: Path) -> None:
"""Threads whose first comment ID is not in the webhook-claimed set are included."""
import kennel.claimed as kc

w = self._make_worker(tmp_path)
kc.replied_comments.discard(800) # ensure clean state
result = w._filter_threads(
[self._make_node(first_db_id=800)], "fido-bot", frozenset({"owner"})
)
assert len(result) == 1


class TestResolveAddressedThreads:
"""Tests for Worker.resolve_addressed_threads."""
Expand Down
Loading