Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ GitHub Releases page; `0.8.0` is the new starting line.

## Unreleased

- **StrReplaceFile now refuses ambiguous single replacements.** A non-`replace_all` edit now
errors when `old` matches more than once instead of silently editing the first match. Add
surrounding context to make the old string unique, or pass `replace_all=true` when every
occurrence should change.

## 0.26.0 (2026-05-30)

### What changed in this release
Expand Down
16 changes: 10 additions & 6 deletions src/pythinker_code/background/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,19 @@ def create_bash_task(
self._store.write_runtime(task_id, runtime)
raise

runtime = self._store.read_runtime(task_id)
if runtime.finished_at is None and (
runtime.status == "created"
or (runtime.status == "starting" and runtime.worker_pid is None)
):
def mark_worker_started(runtime: TaskRuntime) -> bool:
if runtime.finished_at is not None:
return False
if runtime.status != "created" and not (
runtime.status == "starting" and runtime.worker_pid is None
):
return False
runtime.status = "starting"
runtime.worker_pid = worker_pid
runtime.updated_at = time.time()
self._store.write_runtime(task_id, runtime)
return True

self._store.update_runtime(task_id, mark_worker_started)
view = self._store.merged_view(task_id)
self._journal_task_milestone("background task started", view)
return view
Expand Down
13 changes: 13 additions & 0 deletions src/pythinker_code/background/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import shutil
import time
from collections.abc import Callable
from contextlib import contextmanager
from pathlib import Path

Expand Down Expand Up @@ -139,6 +140,18 @@ def write_runtime(self, task_id: str, runtime: TaskRuntime) -> None:
with self._runtime_lock(task_id):
self._write_runtime_unlocked(task_id, runtime)

def update_runtime(self, task_id: str, update: Callable[[TaskRuntime], bool]) -> TaskRuntime:
    """Run a read-modify-write cycle on a task's runtime while holding its lock.

    The runtime for ``task_id`` is read inside the per-task runtime lock and
    handed to ``update``, which may mutate it in place. When ``update``
    returns a truthy value the (mutated) runtime is written back before the
    lock is released; otherwise nothing is written.

    Args:
        task_id: Identifier of the task whose runtime is being updated.
        update: Callback receiving the current runtime; returns whether the
            runtime should be persisted.

    Returns:
        The runtime object that was read (and possibly mutated), whether or
        not it was written back.
    """
    with self._runtime_lock(task_id):
        current = self.read_runtime(task_id)
        should_persist = update(current)
        if should_persist:
            # The lock is already held here, so use the unlocked writer.
            self._write_runtime_unlocked(task_id, current)
        return current

def _write_runtime_unlocked(self, task_id: str, runtime: TaskRuntime) -> None:
"""Write runtime without acquiring the per-task lock (caller holds it)."""
path = self.runtime_path(task_id)
Expand Down
70 changes: 32 additions & 38 deletions src/pythinker_code/background/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pythinker_code.utils.logging import logger
from pythinker_code.utils.subprocess_env import get_clean_env

from .models import TaskControl
from .models import TaskControl, TaskRuntime
from .store import BackgroundTaskStore


Expand Down Expand Up @@ -112,9 +112,9 @@ async def _terminate_process(force: bool = False) -> None:
async def _check_output_limit() -> None:
"""Terminate the task if its output.log grew past ``max_output_bytes``.

Writes a single marker line and records a failure the first time the
limit is hit; the ``output_limit_exceeded`` guard keeps it from
re-marking on subsequent polls or once the process is already exiting.
Writes a single marker line and asks the process to terminate the first
time the limit is hit; the final runtime write records the failure once
the process has exited.
"""
nonlocal output_limit_exceeded, output_limit_reason
if max_output_bytes <= 0 or output_limit_exceeded:
Expand All @@ -133,14 +133,6 @@ async def _check_output_limit() -> None:
marker = f"\n... output limit exceeded ({size} bytes); task terminated ...\n"
with contextlib.suppress(OSError), output_path.open("ab") as marker_file:
marker_file.write(marker.encode("utf-8"))
with store._runtime_lock(task_id): # pyright: ignore[reportPrivateUsage]
current = store.read_runtime(task_id)
if not current.finished_at:
current.status = "failed"
current.interrupted = True
current.failure_reason = output_limit_reason
current.updated_at = time.time()
store._write_runtime_unlocked(task_id, current) # pyright: ignore[reportPrivateUsage]
await _terminate_process(force=False)

async def _control_loop() -> None:
Expand Down Expand Up @@ -211,7 +203,6 @@ async def _input_loop() -> None:
runtime.updated_at = time.time()
runtime.heartbeat_at = runtime.updated_at
store.write_runtime(task_id, runtime)
last_known_runtime = runtime

heartbeat_task = asyncio.create_task(_heartbeat_loop())
control_task = asyncio.create_task(_control_loop())
Expand Down Expand Up @@ -254,29 +245,32 @@ async def _input_loop() -> None:
with contextlib.suppress(asyncio.CancelledError):
await task

runtime = last_known_runtime.model_copy()
control = store.read_control(task_id)
runtime.finished_at = time.time()
runtime.updated_at = runtime.finished_at
runtime.exit_code = returncode
runtime.heartbeat_at = runtime.finished_at
if output_limit_exceeded:
runtime.status = "failed"
runtime.interrupted = True
runtime.failure_reason = output_limit_reason
elif timed_out:
runtime.status = "failed"
runtime.interrupted = True
runtime.timed_out = True
runtime.failure_reason = timeout_reason
elif control.kill_requested_at is not None:
runtime.status = "killed"
runtime.interrupted = True
runtime.failure_reason = control.kill_reason or "Killed"
elif returncode == 0:
runtime.status = "completed"
runtime.failure_reason = None
else:
runtime.status = "failed"
runtime.failure_reason = f"Command failed with exit code {returncode}"
store.write_runtime(task_id, runtime)

def finish_runtime(runtime: TaskRuntime) -> bool:
runtime.finished_at = time.time()
runtime.updated_at = runtime.finished_at
runtime.exit_code = returncode
runtime.heartbeat_at = runtime.finished_at
if output_limit_exceeded:
runtime.status = "failed"
runtime.interrupted = True
runtime.failure_reason = output_limit_reason
elif timed_out:
runtime.status = "failed"
runtime.interrupted = True
runtime.timed_out = True
runtime.failure_reason = timeout_reason
elif control.kill_requested_at is not None:
runtime.status = "killed"
runtime.interrupted = True
runtime.failure_reason = control.kill_reason or "Killed"
elif returncode == 0:
runtime.status = "completed"
runtime.failure_reason = None
else:
runtime.status = "failed"
runtime.failure_reason = f"Command failed with exit code {returncode}"
return True

store.update_runtime(task_id, finish_runtime)
23 changes: 17 additions & 6 deletions src/pythinker_code/memory/consolidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def _safe_id(value: str) -> str:
return re.sub(r"[^a-z0-9_-]", "-", value.lower())[:32].strip("-") or "candidate"


def _memory_entry_hash(content: str) -> str:
    """Return the ``memory``-tier content hash for a raw entry string.

    The first 60 characters of ``content`` are passed as the title and the
    full string as the body, so every caller derives the digest the same way.
    """
    title = content[:60]
    return content_hash(tier="memory", title=title, body=content)


async def inbox_dir(store: ProjectMemoryStore) -> Path:
root = await store._ensure_dir() # pyright: ignore[reportPrivateUsage]
path = root / "memory" / "inbox"
Expand All @@ -53,16 +57,16 @@ async def generate_inbox_candidates(
) -> list[InboxCandidate]:
"""Stage scratch/journal candidates for approval-gated durable memory consolidation."""
existing_entries = [*await store.read_entries("memory"), *await store.read_entries("user")]
existing_hashes = {
content_hash(tier="memory", title=entry[:60], body=entry) for entry in existing_entries
}
staged = {candidate.content_hash for candidate in await list_inbox_candidates(store)}
existing_hashes = {_memory_entry_hash(entry) for entry in existing_entries}
inbox_candidates = await list_inbox_candidates(store)
staged = {candidate.content_hash for candidate in inbox_candidates}
staged.update(_memory_entry_hash(candidate.content) for candidate in inbox_candidates)
directory = await inbox_dir(store)
candidates: list[InboxCandidate] = []
for block in await gather_candidates(store, work_dir):
if block.tier in {"memory", "user"}:
continue
digest = content_hash(tier="memory", title=block.title, body=block.content)
digest = _memory_entry_hash(block.content)
if digest in existing_hashes or digest in staged:
continue
candidate = InboxCandidate(
Expand All @@ -74,7 +78,14 @@ async def generate_inbox_candidates(
content_hash=digest,
)
path = directory / f"{candidate.id}.json"
fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
try:
fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
except FileExistsError:
# A file with this id already exists on disk but was absent from
# ``staged`` (e.g. it is corrupt and was skipped during listing).
# Treat it as already staged instead of crashing the whole harvest.
staged.add(digest)
continue
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(asdict(candidate), fh, ensure_ascii=False, indent=2)
candidates.append(candidate)
Expand Down
12 changes: 6 additions & 6 deletions src/pythinker_code/memory/retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import math
import re
import time
import unicodedata
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace

_TOKEN_RE = re.compile(r"[a-z0-9]{2,}")
# Unicode word runs, with underscores treated as separators for snake_case.
_TOKEN_RE = re.compile(r"[^\W_]{2,}", re.UNICODE)
_RECENCY_HALF_LIFE_DAYS = 14.0
_BM25_K1 = 1.5
_BM25_B = 0.75
_LABEL_BOOST = 0.5
_PATH_BOOST = 0.5


Expand All @@ -19,7 +20,8 @@ def estimate_tokens(text: str) -> int:


def _tokenize(text: str) -> list[str]:
    """Split ``text`` into normalized tokens for matching.

    The text is NFKC-normalized and case-folded before every match of
    ``_TOKEN_RE`` is collected, so compatibility forms (e.g. full-width
    letters) and case variants of the same word produce the same tokens.

    Args:
        text: Arbitrary input text; may be empty.

    Returns:
        The tokens in order of appearance (duplicates preserved); an empty
        list when nothing matches.
    """
    # Normalize first, then casefold: casefold() is the aggressive,
    # locale-independent form of lower() intended for caseless matching.
    normalized = unicodedata.normalize("NFKC", text).casefold()
    return _TOKEN_RE.findall(normalized)


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -67,7 +69,7 @@ async def retrieve(self, query: RecallQuery, budget_tokens: int) -> list[RankedB
for term in set(doc):
df[term] = df.get(term, 0) + 1

q_terms = _tokenize(query.text)
q_terms = _tokenize(" ".join((query.text, *query.labels)))
scored: list[RankedBlock] = []
for cand, doc in zip(self._candidates, docs, strict=True):
dl = len(doc) or 1
Expand All @@ -91,8 +93,6 @@ async def retrieve(self, query: RecallQuery, budget_tokens: int) -> list[RankedB
boost = 0.0
if any(path in cand.files for path in query.paths):
boost += _PATH_BOOST
if set(query.labels) & set(cand.labels):
boost += _LABEL_BOOST
if not q_terms and not query.paths and not query.labels:
boost += 0.01
scored.append(replace(cand, score=bm25 * decay + boost))
Expand Down
Loading
Loading