Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 247 additions & 4 deletions src/command_system/input_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,206 @@ def _extract_urls(text: str) -> list[str]:
# @-mention pipeline and the Read tool agree on what counts as an image.
_AT_MENTION_IMAGE_EXTENSIONS = frozenset({"png", "jpg", "jpeg", "gif", "webp"})

# Extensions known to be binary. Opening one in text mode with
# ``errors="replace"`` would emit a wall of utf-8 replacement chars that
# the model latches onto and hallucinates from (the exact failure mode the
# image branch was added to prevent). We skip the read and emit a small
# system-reminder pointing the model at the Read tool instead. PDFs are
# the user-visible case (Read tool supports them via ``pages``); the rest
# are defense in depth so a stray @archive.zip doesn't pollute the prompt
# either.
_AT_MENTION_BINARY_EXTENSIONS = frozenset({
    # documents: surfaced through the Read tool (``pages`` param) instead
    "pdf",
    # archives / compressed containers
    "zip", "tar", "gz", "tgz", "bz2", "xz", "7z", "rar",
    # native executables and linker artifacts
    "exe", "dll", "so", "dylib", "a", "o",
    # raw blobs and disk images
    "bin", "iso", "dmg",
    # office formats (OOXML zip containers and legacy OLE compound files)
    "docx", "xlsx", "pptx", "doc", "xls", "ppt",
    # audio
    "mp3", "wav", "flac", "ogg", "m4a",
    # video
    "mp4", "mov", "avi", "mkv", "webm",
    # fonts
    "ttf", "otf", "woff", "woff2",
    # JVM bytecode and archives
    "class", "jar", "war",
    # CPython bytecode
    "pyc", "pyo",
    # database files
    "sqlite", "db",
})

# Number of leading bytes sampled by ``_looks_like_binary``: 8 KiB is
# enough to spot a NUL byte in plausible binary headers without paying
# the cost of a full read on huge files.
_BINARY_SNIFF_BYTES = 8192


# BOM signatures we know how to decode as text. Order matters: UTF-32
# BOMs must be checked BEFORE the matching UTF-16 BOMs (UTF-32-LE BOM
# `\xff\xfe\x00\x00` starts with the UTF-16-LE BOM `\xff\xfe`).
#
# Codec choice: use the BOM-aware codecs (``utf-16``, ``utf-32``) rather
# than the explicit-byte-order forms (``utf-16-le``, ``utf-16-be``). The
# latter PRESERVE the BOM as a literal ```` codepoint at the start
# of the string; the BOM-aware codecs consume it. We picked the encoding
# by reading the BOM, so feeding the file to a codec that re-reads it is
# both correct and strips the otherwise-leaking zero-width-no-break-space.
_BOM_ENCODINGS: tuple[tuple[bytes, str], ...] = (
(b"\xff\xfe\x00\x00", "utf-32"),
(b"\x00\x00\xfe\xff", "utf-32"),
(b"\xff\xfe", "utf-16"),
(b"\xfe\xff", "utf-16"),
(b"\xef\xbb\xbf", "utf-8-sig"),
)


def _detect_bom_encoding(head: bytes) -> str | None:
"""Return the Python codec name implied by a BOM prefix, or None."""
for sig, enc in _BOM_ENCODINGS:
if head.startswith(sig):
return enc
return None


def _looks_like_binary(path: str) -> bool:
    """Heuristically classify ``path`` as binary by sniffing its head.

    Reads at most ``_BINARY_SNIFF_BYTES`` from the start of the file and
    reports True only when that window contains a NUL byte AND the file
    does not begin with a recognised text BOM. NUL bytes never appear in
    well-formed UTF-8 text, which makes them a high-precision binary
    signal for files whose extension is not listed in
    ``_AT_MENTION_BINARY_EXTENSIONS`` (e.g. a tarball misnamed ``.txt``).

    The BOM exemption exists because UTF-16 / UTF-32 text encodes every
    ASCII character with NUL bytes and would otherwise trip the sniffer;
    a BOM prefix identifies real text that merely needs a different
    decoder, so we return False and let ``_read_text_with_encoding``
    pick the right codec. Without that paired decoder, returning False
    here would let UTF-16 mojibake flow into a system-reminder — the
    exact failure this branch was added to prevent.

    Read failures are treated conservatively as "not binary" so the
    existing text-read path can surface the OSError its callers already
    handle.
    """
    try:
        with open(path, "rb") as handle:
            window = handle.read(_BINARY_SNIFF_BYTES)
    except OSError:
        # Unreadable: defer to the text branch, which reports I/O errors
        # through its existing channel.
        return False
    if _detect_bom_encoding(window) is not None:
        # BOM-prefixed text legitimately contains NULs; route it to the
        # text decoder rather than the binary reminder.
        return False
    return window.find(b"\x00") != -1


# If a decoded file has more than this fraction of U+FFFD replacement
# chars, treat it as garbage rather than text. A real UTF-16 / UTF-8
# text file produces well under 1% replacements even with `errors=replace`;
# a binary blob with a fake BOM that doesn't pair to valid UTF-16
# codepoints produces 10-50%+. Threshold picked to be generously high
# so a UTF-16 file with a few corrupt bytes still ships its text, but
# an adversarial binary blob with a spoofed BOM doesn't leak mojibake
# into the prompt.
_MAX_REPLACEMENT_CHAR_FRACTION = 0.03


def _decoded_text_looks_garbled(decoded: str) -> bool:
"""Return True if the decoded string looks like binary garbage rather
than legitimate text. Two heuristics:

1. **NUL char presence (zero-tolerance).** A real text file has zero
U+0000 characters. If we decoded any, the source bytes contained
NULs that the codec mapped to U+0000 — i.e. binary garbage that
happened to align with a valid UTF-16 NUL pair. Letting these
through would re-introduce Bug A's NUL-in-system-reminder
failure: when the str round-trips through utf-8 encoding for the
API, those U+0000 chars become literal NUL bytes in the prompt.

2. **U+FFFD fraction (high-confidence garbage signal).** Above
``_MAX_REPLACEMENT_CHAR_FRACTION`` the file isn't the encoding
the BOM claimed.

Residual cases the heuristic does NOT catch: a binary blob with a
spoofed BOM whose every 2-byte pair decodes to a valid non-NUL
Unicode codepoint (e.g. all bytes in a printable plane). These
appear as walls of CJK or symbol characters — not Bug A's failure
family (no NULs, no replacement chars), and the model lacks
recognizable ASCII fragments to hallucinate from. Accepting this
as residual risk; a more aggressive detector would false-positive
on legitimate non-Latin-script text files.
"""
if not decoded:
return False
if "\x00" in decoded:
return True
replacement_count = decoded.count("�")
if replacement_count == 0:
return False
return (replacement_count / len(decoded)) > _MAX_REPLACEMENT_CHAR_FRACTION


def _read_text_with_encoding(path: str) -> str | None:
    """Read ``path`` as text, choosing a decoder from any leading BOM.

    With no BOM the file is decoded as utf-8 with ``errors="replace"``
    (the common case). With a BOM, the codec it implies is used — still
    with ``errors="replace"`` so a single malformed trailing byte does
    not abort the whole read.

    After decoding, the text is run through the garbage heuristic: if it
    is mostly U+FFFD replacement characters (or contains NULs), ``None``
    is returned so the caller drops the attachment instead of inlining
    mojibake. This closes the adversarial case of a binary blob carrying
    a fake ``\\xff\\xfe`` BOM under an unenumerated extension, which
    would otherwise sail through the BOM short-circuit into the text
    branch.

    Returns ``None`` on OSError or on a garbled decode, matching the
    existing "drop this attachment silently" behaviour for unreadable
    files. Callers should still emit a binary-style reminder via the
    @-mention ``binary`` attachment kind when this returns None for a
    non-OSError reason — see ``expand_at_mentions``.
    """

    def _slurp(codec: str) -> str:
        # Single decode pass; ``errors="replace"`` keeps partial reads
        # alive, and the garbage check below catches the fallout.
        with open(path, "r", encoding=codec, errors="replace") as handle:
            return handle.read()

    try:
        with open(path, "rb") as handle:
            prefix = handle.read(4)  # longest BOM (UTF-32) is 4 bytes
    except OSError:
        return None
    codec = _detect_bom_encoding(prefix) or "utf-8"
    try:
        text = _slurp(codec)
    except OSError:
        return None
    except (LookupError, UnicodeError):
        # Pathological codec name from a future BOM entry, or a rare
        # UTF-32 failure despite errors=replace. Retry as raw utf-8 so
        # the path still yields *some* text instead of silently dropping.
        try:
            text = _slurp("utf-8")
        except OSError:
            return None
    return None if _decoded_text_looks_garbled(text) else text


def _binary_hint_for_ext(ext: str) -> str:
"""Return a per-extension hint the model can act on, or a generic
fallback. PDF specifically points at the Read tool's ``pages``
parameter, mirroring TS's behaviour where PDFs are surfaced through
Read rather than auto-inlined via @-mention."""
if ext == "pdf":
return (
"PDFs cannot be inlined as @-mention text. Use the Read tool "
"with the ``pages`` parameter (e.g. ``pages=\"1-5\"``) to view "
"specific page ranges."
)
return (
"This file is binary and cannot be inlined as text. Use the Read "
"tool to view it if it is a supported format."
)


def _try_build_image_attachment(
expanded: str,
Expand Down Expand Up @@ -359,10 +559,43 @@ def expand_at_mentions(
# contains the path, so the model can call ``Read``
# explicitly to see a real error.
continue
try:
with open(expanded, "r", encoding="utf-8", errors="replace") as fh:
data = fh.read()
except OSError:
# Binary branch: known binary extensions (PDF, archives, ...)
# OR a content sniff that finds a NUL byte. Same Bug A
# pattern as the image branch -- opening a binary file in
# text mode produces mojibake the model hallucinates from.
# We emit a small reminder pointing at the Read tool
# instead. PDF gets a more specific hint because Read
# supports it via the ``pages`` parameter; everything else
# gets a generic message.
if ext in _AT_MENTION_BINARY_EXTENSIONS or _looks_like_binary(expanded):
attachments.append(
{
"kind": "binary",
"path": expanded,
"display_path": display_path,
"ext": ext,
"hint": _binary_hint_for_ext(ext),
}
)
continue
data = _read_text_with_encoding(expanded)
if data is None:
# ``_read_text_with_encoding`` returns None on either
# OSError OR when the decoded text was mostly U+FFFD
# replacement chars (a binary blob with a spoofed
# BOM, or a file in an encoding we can't auto-detect).
# Emit a binary attachment instead of dropping
# silently so the user sees what happened and the
# model gets a Read-tool nudge.
attachments.append(
{
"kind": "binary",
"path": expanded,
"display_path": display_path,
"ext": ext,
"hint": _binary_hint_for_ext(ext),
}
)
continue
attachments.append(
{
Expand Down Expand Up @@ -412,6 +645,16 @@ def format_at_mention_attachments(attachments: list[dict[str, Any]]) -> str:
elif kind == "image":
# See docstring -- handled separately as a content block.
continue
elif kind == "binary":
# PDFs / archives / docx / ... — emit a short reminder pointing
# the model at the Read tool so the @-mention is acknowledged
# without flooding the prompt with mojibake.
blocks.append(
f"<system-reminder>\n"
f"@-mentioned file {att['display_path']} is a binary file "
f"and was not inlined. {att.get('hint', '')}\n"
f"</system-reminder>"
)
elif kind == "agent_mention":
# Mirrors ``typescript/src/utils/messages.ts`` ``agent_mention``
# case: the reminder nudges the model to delegate to the named
Expand Down
24 changes: 22 additions & 2 deletions src/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,25 @@ def _get_model(self, **kwargs) -> str:
return strip_1m_context_suffix(raw) if raw else raw

def _prepare_messages(self, messages: list[MessageInput]) -> list[dict[str, Any]]:
    """Convert provider messages to API dictionary format.

    Also runs ``validate_images_for_api`` so every provider — not just
    the Anthropic-direct ``services/api/claude.py:call_model`` path —
    rejects oversize base64 images before the network round trip.
    Anthropic's 5 MB hard limit applies to its own provider; for
    OpenAI-compatible providers the check runs on the still-Anthropic
    shape (subclasses call ``super()._prepare_messages`` before
    translating to ``image_url``), so the same client-side guard
    applies. Providers without image support are unaffected: the
    walker only inspects ``type=image`` blocks. Raises
    ``ImageSizeError``; callers (``query._call_model_sync``,
    ``tool_system.agent_loop._call_provider_for_turn``) translate it
    into a media-size error message rather than letting it surface as
    an opaque API failure.
    """
    # NOTE: the diff scrape had left the superseded one-line body (old
    # docstring + early ``return``) ahead of this implementation, which
    # made the validation below unreachable dead code; only the merged
    # implementation is kept here.
    prepared = [msg if isinstance(msg, dict) else msg.to_dict() for msg in messages]
    # Local import to avoid a top-level dependency from base.py into
    # the utils package, matching the style of ``_get_model``.
    from src.utils.image_validation import validate_images_for_api
    validate_images_for_api(prepared)
    return prepared
Loading