From 2781723fd5728907396adf302a605d30108aba92 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 11:45:46 +0000 Subject: [PATCH] feat(validation): oneOf near-miss hints on VALIDATION_ERROR issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an optional `hint` field to `ValidationIssue` and the wire `VALIDATION_ERROR` envelope. When a `oneOf`/discriminated-union check fails, a heuristic picks the closest variant (highest required-fields- present score) and emits a hint naming the variant and discriminator. Hint text is built exclusively from schema-declared strings — variant titles, discriminator field names, and `const` values — so it never echoes user-supplied keys or values into logs or LLM contexts. Refs #460 https://claude.ai/code/session_0147Kbe9nML1irSFSxMBxtWi --- src/adcp/validation/schema_errors.py | 21 +++-- src/adcp/validation/schema_validator.py | 94 +++++++++++++++++-- tests/test_schema_validation.py | 114 ++++++++++++++++++++++++ 3 files changed, 211 insertions(+), 18 deletions(-) diff --git a/src/adcp/validation/schema_errors.py b/src/adcp/validation/schema_errors.py index 2ad200b8c..b08425d86 100644 --- a/src/adcp/validation/schema_errors.py +++ b/src/adcp/validation/schema_errors.py @@ -66,21 +66,24 @@ def build_adcp_validation_error_payload( else: message = f"{tool} {side} failed schema validation" + def _issue_dict(i: ValidationIssue) -> dict[str, Any]: + d: dict[str, Any] = { + "pointer": i.pointer, + "message": i.message, + "keyword": i.keyword, + "schema_path": i.schema_path, + } + if i.hint is not None: + d["hint"] = i.hint + return d + payload: dict[str, Any] = { "code": "VALIDATION_ERROR", "message": message, "details": { "tool": tool, "side": side, - "issues": [ - { - "pointer": i.pointer, - "message": i.message, - "keyword": i.keyword, - "schema_path": i.schema_path, - } - for i in issues - ], + "issues": [_issue_dict(i) for i in issues], }, } if first is not None and first.pointer: diff --git a/src/adcp/validation/schema_validator.py b/src/adcp/validation/schema_validator.py index 5bec81970..62563a3cd 100644 --- a/src/adcp/validation/schema_validator.py +++ b/src/adcp/validation/schema_validator.py @@ -48,12 +48,17 @@ class ValidationIssue: keyword: jsonschema keyword that rejected the payload (``required``, ``type``, ``enum``, etc.). schema_path: Path inside the schema that rejected the payload. + hint: Optional near-miss suggestion for oneOf/discriminated-union + failures. Contains only schema-declared strings (variant titles, + discriminator field names, const values) — never user-supplied + keys or values. Safe to return over the wire. """ pointer: str message: str keyword: str schema_path: str + hint: str | None = None @dataclass(frozen=True) @@ -98,18 +103,21 @@ def __init__( self.side = side self.issues = issues self.code = "VALIDATION_ERROR" + def _issue_dict(i: ValidationIssue) -> dict[str, Any]: + d: dict[str, Any] = { + "pointer": i.pointer, + "message": i.message, + "keyword": i.keyword, + "schema_path": i.schema_path, + } + if i.hint is not None: + d["hint"] = i.hint + return d + self.details = { "tool": tool, "side": side, - "issues": [ - { - "pointer": i.pointer, - "message": i.message, - "keyword": i.keyword, - "schema_path": i.schema_path, - } - for i in issues - ], + "issues": [_issue_dict(i) for i in issues], } if message is None: first = issues[0] if issues else None @@ -208,6 +216,71 @@ def _missing_required_key(err: Any) -> str | None: return None +_MAX_ONEOF_BRANCHES = 20 + + +def _infer_oneof_hint(err: Any) -> str | None: + """Find the closest oneOf variant and return a schema-declared hint. + + Scoring uses field presence in the instance (internal only) to pick the + branch with the most required fields already satisfied. Hint text is + built exclusively from schema-declared strings — never user-supplied keys + or values — so it is safe to surface on the wire and in LLM contexts. + """ + branches = (err.schema or {}).get("oneOf") + if not branches: + return None + + instance = err.instance if isinstance(err.instance, dict) else {} + + best_branch: dict[str, Any] | None = None + best_score = -1 + + for branch in branches[:_MAX_ONEOF_BRANCHES]: + if not isinstance(branch, dict): + continue + required = branch.get("required", []) + if not isinstance(required, list): + continue + score = sum(1 for f in required if f in instance) + if score > best_score: + best_score = score + best_branch = branch + + if best_branch is None: + return None + + # Locate the discriminator: a *required* property whose schema has a 'const'. + # Restrict to required fields so a non-required property with a 'const' + # default annotation doesn't shadow the real discriminator. + # These are schema-declared strings only — safe for wire emission. + props = best_branch.get("properties", {}) + required_set = set(best_branch.get("required", [])) + discriminator_field: str | None = None + discriminator_value: str | None = None + for field_name, field_schema in props.items(): + if field_name not in required_set: + continue + if not isinstance(field_schema, dict): + continue + const_val = field_schema.get("const") + if const_val is not None and isinstance(const_val, str): + discriminator_field = field_name + discriminator_value = const_val + break + + title = best_branch.get("title") or discriminator_value or "a known variant" + if not isinstance(title, str): + title = str(title) + + if discriminator_field is not None and discriminator_value is not None: + return ( + f"Closest match: '{title}' variant. " + f"Field '{discriminator_field}' is required with value '{discriminator_value}'." + ) + return f"Closest match: '{title}' variant." + + def _format_error(err: Any) -> ValidationIssue: """Turn a ``jsonschema.exceptions.ValidationError`` into a ``ValidationIssue``.""" pointer = _path_to_pointer(list(err.absolute_path)) @@ -220,11 +293,14 @@ def _format_error(err: Any) -> ValidationIssue: schema_path = "#/" + "/".join(str(seg) for seg in err.absolute_schema_path) + hint = _infer_oneof_hint(err) if keyword == "oneOf" else None + return ValidationIssue( pointer=pointer, message=_safe_message(err, keyword), keyword=keyword, schema_path=schema_path, + hint=hint, ) diff --git a/tests/test_schema_validation.py b/tests/test_schema_validation.py index 43e9b31a4..c96e80aa0 100644 --- a/tests/test_schema_validation.py +++ b/tests/test_schema_validation.py @@ -241,3 +241,117 @@ def test_explicit_config_overrides_defaults(self) -> None: ValidationHookConfig(requests="strict", responses="off") ) assert (req, resp) == ("strict", "off") + + +class TestOneOfNearMissHints: + """pricing_options regression class: 'type' submitted instead of 'pricing_model'.""" + + def _pricing_options_payload_with_wrong_key(self) -> dict: + return { + "products": [ + { + "product_id": "prod_1", + "name": "Banner", + "buying_mode": "programmatic", + "pricing_options": [ + { + "pricing_option_id": "po_1", + "type": "cpm", # wrong discriminator key + "currency": "USD", + } + ], + } + ] + } + + def test_hint_present_on_oneof_failure(self) -> None: + outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key()) + oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"] + assert oneof_issues, "expected at least one oneOf issue" + first = oneof_issues[0] + assert first.hint is not None, "expected a hint on oneOf failure" + + def test_hint_names_closest_variant(self) -> None: + outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key()) + oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"] + hint = oneof_issues[0].hint + assert hint is not None + # CPM variant is the closest match (pricing_option_id + currency present) + assert "cpm" in hint.lower() or "CPM" in hint + + def test_hint_names_discriminator_field(self) -> None: + outcome = validate_response("get_products", self._pricing_options_payload_with_wrong_key()) + oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"] + hint = oneof_issues[0].hint + assert hint is not None + assert "pricing_model" in hint + + def test_hint_does_not_echo_user_supplied_value(self) -> None: + """The hint must never echo user-controlled content (values or key names).""" + secret = "Bearer sk-should-never-appear" + outcome = validate_response( + "get_products", + { + "products": [ + { + "product_id": "prod_1", + "name": "Banner", + "buying_mode": "programmatic", + "pricing_options": [ + { + "pricing_option_id": "po_1", + secret: "cpm", # hostile key name + "currency": "USD", + } + ], + } + ] + }, + ) + for issue in outcome.issues: + if issue.hint: + assert secret not in issue.hint + + def test_no_hint_on_non_oneof_failure(self) -> None: + outcome = validate_response("get_products", {"products": "not-an-array"}) + for issue in outcome.issues: + assert issue.hint is None, f"unexpected hint on {issue.keyword!r} issue" + + def test_hint_serialized_in_schema_validation_error_details(self) -> None: + from adcp.validation.schema_validator import validate_response as vr + from adcp.validation.schema_errors import build_adcp_validation_error_payload + + outcome = vr("get_products", self._pricing_options_payload_with_wrong_key()) + oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"] + assert oneof_issues + payload = build_adcp_validation_error_payload("get_products", "response", oneof_issues) + wire_issue = payload["details"]["issues"][0] + assert "hint" in wire_issue, "hint must appear in wire payload" + assert wire_issue["hint"] is not None + + def test_hint_serialized_in_schema_validation_error_exception(self) -> None: + from adcp.validation.schema_errors import build_validation_error + from adcp.validation.schema_validator import validate_response as vr + + outcome = vr("get_products", self._pricing_options_payload_with_wrong_key()) + oneof_issues = [i for i in outcome.issues if i.keyword == "oneOf"] + assert oneof_issues + exc = build_validation_error("get_products", "response", oneof_issues) + wire_issue = exc.details["issues"][0] + assert "hint" in wire_issue + + def test_hint_absent_from_wire_when_none(self) -> None: + """Issues without hints must not emit a 'hint': null key on the wire.""" + from adcp.validation.schema_errors import build_adcp_validation_error_payload + + issues = [ + ValidationIssue( + pointer="/foo", + message="required", + keyword="required", + schema_path="#/required", + ) + ] + payload = build_adcp_validation_error_payload("get_products", "request", issues) + wire_issue = payload["details"]["issues"][0] + assert "hint" not in wire_issue