Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/adcp/validation/schema_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,24 @@ def build_adcp_validation_error_payload(
else:
message = f"{tool} {side} failed schema validation"

def _issue_dict(i: ValidationIssue) -> dict[str, Any]:
d: dict[str, Any] = {
"pointer": i.pointer,
"message": i.message,
"keyword": i.keyword,
"schema_path": i.schema_path,
}
if i.hint is not None:
d["hint"] = i.hint
return d

payload: dict[str, Any] = {
"code": "VALIDATION_ERROR",
"message": message,
"details": {
"tool": tool,
"side": side,
"issues": [
{
"pointer": i.pointer,
"message": i.message,
"keyword": i.keyword,
"schema_path": i.schema_path,
}
for i in issues
],
"issues": [_issue_dict(i) for i in issues],
},
}
if first is not None and first.pointer:
Expand Down
94 changes: 85 additions & 9 deletions src/adcp/validation/schema_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ class ValidationIssue:
keyword: jsonschema keyword that rejected the payload
(``required``, ``type``, ``enum``, etc.).
schema_path: Path inside the schema that rejected the payload.
hint: Optional near-miss suggestion for oneOf/discriminated-union
failures. Contains only schema-declared strings (variant titles,
discriminator field names, const values) — never user-supplied
keys or values. Safe to return over the wire.
"""

pointer: str
message: str
keyword: str
schema_path: str
hint: str | None = None


@dataclass(frozen=True)
Expand Down Expand Up @@ -98,18 +103,21 @@ def __init__(
self.side = side
self.issues = issues
self.code = "VALIDATION_ERROR"
def _issue_dict(i: ValidationIssue) -> dict[str, Any]:
d: dict[str, Any] = {
"pointer": i.pointer,
"message": i.message,
"keyword": i.keyword,
"schema_path": i.schema_path,
}
if i.hint is not None:
d["hint"] = i.hint
return d

self.details = {
"tool": tool,
"side": side,
"issues": [
{
"pointer": i.pointer,
"message": i.message,
"keyword": i.keyword,
"schema_path": i.schema_path,
}
for i in issues
],
"issues": [_issue_dict(i) for i in issues],
}
if message is None:
first = issues[0] if issues else None
Expand Down Expand Up @@ -208,6 +216,71 @@ def _missing_required_key(err: Any) -> str | None:
return None


_MAX_ONEOF_BRANCHES = 20


def _infer_oneof_hint(err: Any) -> str | None:
"""Find the closest oneOf variant and return a schema-declared hint.

Scoring uses field presence in the instance (internal only) to pick the
branch with the most required fields already satisfied. Hint text is
built exclusively from schema-declared strings — never user-supplied keys
or values — so it is safe to surface on the wire and in LLM contexts.
"""
branches = (err.schema or {}).get("oneOf")
if not branches:
return None

instance = err.instance if isinstance(err.instance, dict) else {}

best_branch: dict[str, Any] | None = None
best_score = -1

for branch in branches[:_MAX_ONEOF_BRANCHES]:
if not isinstance(branch, dict):
continue
required = branch.get("required", [])
if not isinstance(required, list):
continue
score = sum(1 for f in required if f in instance)
if score > best_score:
best_score = score
best_branch = branch

if best_branch is None:
return None

# Locate the discriminator: a *required* property whose schema has a 'const'.
# Restrict to required fields so a non-required property with a 'const'
# default annotation doesn't shadow the real discriminator.
# These are schema-declared strings only — safe for wire emission.
props = best_branch.get("properties", {})
required_set = set(best_branch.get("required", []))
discriminator_field: str | None = None
discriminator_value: str | None = None
for field_name, field_schema in props.items():
if field_name not in required_set:
continue
if not isinstance(field_schema, dict):
continue
const_val = field_schema.get("const")
if const_val is not None and isinstance(const_val, str):
discriminator_field = field_name
discriminator_value = const_val
break

title = best_branch.get("title") or discriminator_value or "a known variant"
if not isinstance(title, str):
title = str(title)

if discriminator_field is not None and discriminator_value is not None:
return (
f"Closest match: '{title}' variant. "
f"Field '{discriminator_field}' is required with value '{discriminator_value}'."
)
return f"Closest match: '{title}' variant."


def _format_error(err: Any) -> ValidationIssue:
"""Turn a ``jsonschema.exceptions.ValidationError`` into a ``ValidationIssue``."""
pointer = _path_to_pointer(list(err.absolute_path))
Expand All @@ -220,11 +293,14 @@ def _format_error(err: Any) -> ValidationIssue:

schema_path = "#/" + "/".join(str(seg) for seg in err.absolute_schema_path)

hint = _infer_oneof_hint(err) if keyword == "oneOf" else None

return ValidationIssue(
pointer=pointer,
message=_safe_message(err, keyword),
keyword=keyword,
schema_path=schema_path,
hint=hint,
)


Expand Down
114 changes: 114 additions & 0 deletions tests/test_schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,117 @@ def test_explicit_config_overrides_defaults(self) -> None:
ValidationHookConfig(requests="strict", responses="off")
)
assert (req, resp) == ("strict", "off")


class TestOneOfNearMissHints:
"""pricing_options regression class: 'type' submitted instead of 'pricing_model'."""

def _pricing_options_payload_with_wrong_key(self) -> dict:
return {
"products": [
{
"product_id": "prod_1",
"name": "Banner",
"buying_mode": "programmatic",
"pricing_options": [
{
"pricing_option_id": "po_1",
"type": "cpm", # wrong discriminator key
"currency": "USD",
}
],
}
]
}

def test_hint_present_on_oneof_failure(self) -> None:
outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key())
oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"]
assert oneof_issues, "expected at least one oneOf issue"
first = oneof_issues[0]
assert first.hint is not None, "expected a hint on oneOf failure"

def test_hint_names_closest_variant(self) -> None:
outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key())
oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"]
hint = oneof_issues[0].hint
assert hint is not None
# CPM variant is the closest match (pricing_option_id + currency present)
assert "cpm" in hint.lower() or "CPM" in hint

def test_hint_names_discriminator_field(self) -> None:
outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key())
oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"]
hint = oneof_issues[0].hint
assert hint is not None
assert "pricing_model" in hint

def test_hint_does_not_echo_user_supplied_value(self) -> None:
"""The hint must never echo user-controlled content (values or key names)."""
secret = "Bearer sk-should-never-appear"
outcome = validate_response(
"get_products",
{
"products": [
{
"product_id": "prod_1",
"name": "Banner",
"buying_mode": "programmatic",
"pricing_options": [
{
"pricing_option_id": "po_1",
secret: "cpm", # hostile key name
"currency": "USD",
}
],
}
]
},
)
for issue in outcome.issues:
if issue.hint:
assert secret not in issue.hint

def test_no_hint_on_non_oneof_failure(self) -> None:
outcome = validate_response("get_products", {"products": "not-an-array"})
for issue in outcome.issues:
assert issue.hint is None, f"unexpected hint on {issue.keyword!r} issue"

def test_hint_serialized_in_schema_validation_error_details(self) -> None:
from adcp.validation.schema_validator import validate_response as vr
from adcp.validation.schema_errors import build_adcp_validation_error_payload

outcome = vr("get_products", self._pricing_options_payload_with_wrong_key())
oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"]
assert oneof_issues
payload = build_adcp_validation_error_payload("get_products", "response", oneof_issues)
wire_issue = payload["details"]["issues"][0]
assert "hint" in wire_issue, "hint must appear in wire payload"
assert wire_issue["hint"] is not None

def test_hint_serialized_in_schema_validation_error_exception(self) -> None:
from adcp.validation.schema_errors import build_validation_error
from adcp.validation.schema_validator import validate_response as vr

outcome = vr("get_products", self._pricing_options_payload_with_wrong_key())
oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"]
assert oneof_issues
exc = build_validation_error("get_products", "response", oneof_issues)
wire_issue = exc.details["issues"][0]
assert "hint" in wire_issue

def test_hint_absent_from_wire_when_none(self) -> None:
"""Issues without hints must not emit a 'hint': null key on the wire."""
from adcp.validation.schema_errors import build_adcp_validation_error_payload

issues = [
ValidationIssue(
pointer="/foo",
message="required",
keyword="required",
schema_path="#/required",
)
]
payload = build_adcp_validation_error_payload("get_products", "request", issues)
wire_issue = payload["details"]["issues"][0]
assert "hint" not in wire_issue
Loading