diff --git a/.gitignore b/.gitignore index 360134b51c..ce4ccbca23 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ # query compilation caches .cache +# MISRA help generator docling cache +scripts/generate_rules/misra_help/cache/ + # qltest projects and artifacts **/test/**/*.testproj **/test/**/*.actual diff --git a/scripts/generate_rules/misra_help/README.md b/scripts/generate_rules/misra_help/README.md new file mode 100644 index 0000000000..7ac0c4678c --- /dev/null +++ b/scripts/generate_rules/misra_help/README.md @@ -0,0 +1,64 @@ +# MISRA help-file populator + +Generates per-query Markdown help files in +`codeql-coding-standards-help/{c,cpp}/misra/src/rules//.md` +from the licensed MISRA PDFs. + +## Prerequisites + +1. **Python venv with docling** (~3 GB, not in `scripts/requirements.txt`): + + ```bash + python3 -m venv .venv && .venv/bin/pip install docling + ``` + +2. **MISRA PDFs** — licensed material, excluded from version control. + Place them in your `codeql-coding-standards-help` checkout: + + ```bash + cp ~/Downloads/MISRA-C-2023-*.pdf ../codeql-coding-standards-help/ + cp ~/Downloads/MISRA-CPP-2023-*.pdf ../codeql-coding-standards-help/ + ``` + + The tool resolves PDFs via: `--pdf` flag > `$MISRA_C_PDF` / + `$MISRA_CPP_PDF` env vars > glob in `--help-repo`. + +## Usage + +```bash +# Deterministic render (Stage 1 only): +.venv/bin/python populate_help.py --standard MISRA-C++-2023 +.venv/bin/python populate_help.py --standard MISRA-C-2012 + +# Single rule: +.venv/bin/python populate_help.py --standard MISRA-C++-2023 --rule RULE-8-1 + +# Fill in missing help only (don't overwrite existing): +.venv/bin/python populate_help.py --standard MISRA-C++-2023 --no-overwrite + +# Preview without writing: +.venv/bin/python populate_help.py --standard MISRA-C++-2023 --dry-run +``` + +### Two-pass mode (deterministic + LLM lint) + +```bash +# 1. Build the JSON sidecar: +.venv/bin/python dump_rules_json.py --standard MISRA-C-2012 + +# 2. 
Re-render + LLM proofread: +.venv/bin/python refresh_help.py --standard MISRA-C-2012 +``` + +## Files + +| File | Purpose | +| --------------------- | ------------------------------------------------------- | +| `extract_rules.py` | docling PDF → `Rule` dataclasses (deterministic core) | +| `populate_help.py` | Walk `.ql` queries, render and write `.md` help files | +| `dump_rules_json.py` | Emit JSON sidecar for the LLM rewrite pass | +| `rewrite_help.py` | Headless Copilot driver for LLM lint/proofread | +| `refresh_help.py` | Combined Stage 1 + cache patch + Stage 2 runner | +| `harness.py` | Determinism harness (per-section hashing across N runs) | +| `cache.py` | Shared helpers for cache path resolution and I/O | + diff --git a/scripts/generate_rules/misra_help/__init__.py b/scripts/generate_rules/misra_help/__init__.py new file mode 100644 index 0000000000..30be70b0d9 --- /dev/null +++ b/scripts/generate_rules/misra_help/__init__.py @@ -0,0 +1,4 @@ +"""MISRA help-file populator. + +See `populate_help.py` for the entry point. 
+""" diff --git a/scripts/generate_rules/misra_help/cache.py b/scripts/generate_rules/misra_help/cache.py new file mode 100644 index 0000000000..e4af10209e --- /dev/null +++ b/scripts/generate_rules/misra_help/cache.py @@ -0,0 +1,34 @@ +"""Shared helpers for locating and reading the MISRA rule cache.""" +from __future__ import annotations +import json +from pathlib import Path +from typing import Any + +SCRIPT_DIR = Path(__file__).resolve().parent +DEFAULT_CACHE_DIR = SCRIPT_DIR / "cache" +DEFAULT_HELP_REPO = SCRIPT_DIR.parents[2].parent / "codeql-coding-standards-help" + + +def cache_path_for(help_repo: Path, standard: str) -> Path: + """Return the path to the JSON cache file for a standard.""" + return help_repo / ".misra-rule-cache" / f"{standard}.json" + + +def load_cache(help_repo: Path, standard: str) -> dict[str, Any]: + """Load and return the JSON cache for a standard.""" + path = cache_path_for(help_repo, standard) + if not path.exists(): + raise FileNotFoundError( + f"Cache not found: {path}. Run dump_rules_json.py first." + ) + return json.loads(path.read_text(encoding="utf-8")) + + +def save_cache(help_repo: Path, standard: str, data: dict[str, Any]) -> Path: + """Write the JSON cache for a standard and return the path.""" + path = cache_path_for(help_repo, standard) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8" + ) + return path diff --git a/scripts/generate_rules/misra_help/dump_rules_json.py b/scripts/generate_rules/misra_help/dump_rules_json.py new file mode 100644 index 0000000000..5a44cb787c --- /dev/null +++ b/scripts/generate_rules/misra_help/dump_rules_json.py @@ -0,0 +1,202 @@ +"""Emit a per-standard JSON sidecar containing every extracted MISRA +rule plus, for each `.ql` query that targets the rule, the query's +`@name` title, target `.md` path, and the existing `.md` content (if +any). 
This file is the input to the agent extension's LLM-driven +"rewrite help docs" pass: docling extracts the structured rule data +deterministically, then the LLM uses both the structured data AND the +.ql title to produce a polished, idiomatic help file. + +Output layout: + + /.misra-rule-cache/.json + +Schema (top-level): + + { + "standard": "MISRA-C-2012", + "lang": "c", + "lang_src": "c/misra/src/rules", + "generated_at": "2026-04-20T10:11:12Z", + "rules": { + "RULE-9-2": { + "rule_id": "RULE-9-2", + "raw_id": "Rule 9.2", + "standard": "MISRA-C-2012", + "title": "...", + "category": "Required", + "analysis": "Decidable, Single Translation Unit", + "applies_to": "C90, C99, C11", + "amplification": "...", + "rationale": "...", + "exceptions": ["...", "..."], + "example_layout": [ + {"kind": "code", "text": "..."}, + {"kind": "text", "text": "..."} + ], + "see_also": [...] + }, + ... + }, + "queries": { + "RULE-9-2": [ + { + "ql_path": "c/misra/src/rules/RULE-9-2/Init...braces.ql", + "ql_name_title": "The initializer for an aggregate ...", + "md_path": "c/misra/src/rules/RULE-9-2/Init...braces.md", + "existing_md": "..." // null if the .md does not exist + }, + ... + ], + ... + } + } + +The `existing_md` content is included so the LLM pass can preserve +human-authored details (alert message wording, special examples) that +docling did not capture. 
+""" +from __future__ import annotations +import argparse +import datetime as _dt +import json +import sys +from dataclasses import asdict +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from extract_rules import extract_rules, Rule # noqa: E402 +from cache import cache_path_for, save_cache # noqa: E402 +from populate_help import ( # noqa: E402 + STANDARD_INFO, + SUPPORTED_STANDARDS, + DEFAULT_HELP_REPO, + DEFAULT_QUERY_REPO, + collect_queries, + resolve_pdf, + _read_ql_name, +) + + +def _load_impl_scope_lookup( + query_repo: Path, standard: str, +) -> dict[tuple[str, str], dict]: + """Build a (rule_id, short_name) -> implementation_scope lookup + from the rule_packages JSON files.""" + lang, _ = STANDARD_INFO[standard] + pkg_dir = query_repo / "rule_packages" / lang + if not pkg_dir.is_dir(): + return {} + lookup: dict[tuple[str, str], dict] = {} + for pkg_file in sorted(pkg_dir.glob("*.json")): + try: + data = json.loads(pkg_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + continue + # Top-level key is the standard name (e.g. "MISRA-C-2012"). 
+ for std_key, rules in data.items(): + if not isinstance(rules, dict): + continue + for rule_id, rule_data in rules.items(): + if not isinstance(rule_data, dict): + continue + for q in rule_data.get("queries", []): + sn = q.get("short_name") + impl = q.get("implementation_scope") + if sn and impl: + lookup[(rule_id, sn)] = impl + return lookup + + +def _rule_to_jsonable(rule: Rule) -> dict: + """Serialize a Rule to JSON, including the example layout.""" + d = asdict(rule) + layout = getattr(rule, "_example_layout", None) + if layout: + d["example_layout"] = [{"kind": k, "text": s} for (k, s) in layout] + else: + d["example_layout"] = [] + return d + + +def _query_entries(rule_id: str, ql_paths: list[Path], + query_repo: Path, help_repo: Path, + lang_src: Path, + impl_lookup: dict[tuple[str, str], dict] | None = None, + ) -> list[dict]: + out: list[dict] = [] + for ql in sorted(ql_paths): + rel_dir = ql.parent.relative_to(query_repo / lang_src) + md = help_repo / lang_src / rel_dir / (ql.stem + ".md") + try: + existing = md.read_text(encoding="utf-8") + except FileNotFoundError: + existing = None + entry: dict = { + "ql_path": str(ql.relative_to(query_repo)), + "ql_name_title": _read_ql_name(ql) or "", + "md_path": str(md.relative_to(help_repo)), + "existing_md": existing, + } + if impl_lookup: + impl = impl_lookup.get((rule_id, ql.stem)) + if impl: + entry["implementation_scope"] = impl + out.append(entry) + return out + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--standard", required=True, choices=SUPPORTED_STANDARDS) + ap.add_argument("--query-repo", type=Path, default=DEFAULT_QUERY_REPO) + ap.add_argument("--help-repo", type=Path, default=DEFAULT_HELP_REPO) + ap.add_argument("--pdf", type=Path, default=None) + ap.add_argument("--cache-dir", type=Path, + default=Path(__file__).resolve().parent / "cache", + help="docling JSON cache dir") + 
ap.add_argument("--output", type=Path, default=None, + help="output path (default: " + "/.misra-rule-cache/.json)") + args = ap.parse_args() + + pdf = resolve_pdf(args.standard, args.pdf, args.help_repo) + args.cache_dir.mkdir(parents=True, exist_ok=True) + rules = extract_rules(pdf, args.standard, args.cache_dir) + + lang, lang_src = STANDARD_INFO[args.standard] + queries = collect_queries(args.query_repo, args.standard) + + impl_lookup = _load_impl_scope_lookup(args.query_repo, args.standard) + + rules_json: dict[str, dict] = {} + for r in rules: + rules_json[r.rule_id] = _rule_to_jsonable(r) + + queries_json: dict[str, list[dict]] = {} + for rule_id, ql_paths in queries.items(): + queries_json[rule_id] = _query_entries( + rule_id, ql_paths, args.query_repo, args.help_repo, lang_src, + impl_lookup) + + payload = { + "standard": args.standard, + "lang": lang, + "lang_src": str(lang_src), + "generated_at": _dt.datetime.now(_dt.timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%SZ"), + "rules": rules_json, + "queries": queries_json, + } + + out_path = args.output or cache_path_for(args.help_repo, args.standard) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), + encoding="utf-8") + print(f"wrote {out_path} ({len(rules_json)} rules, " + f"{sum(len(v) for v in queries_json.values())} queries)") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/generate_rules/misra_help/extract_rules.py b/scripts/generate_rules/misra_help/extract_rules.py new file mode 100644 index 0000000000..115a57fb0b --- /dev/null +++ b/scripts/generate_rules/misra_help/extract_rules.py @@ -0,0 +1,688 @@ +"""MISRA PDF → structured rule data extractor (docling-based). + +Pipeline: + 1. Convert each PDF with docling, getting structured JSON whose `texts[]` + items carry labels (section_header / text / list_item / code / table). + 2. 
Walk the texts in document order, slicing into per-rule chunks at any + item whose text starts with "Rule N.N[.N]" or "Dir N.N[.N]" and which + has a `Category` line within the next ~25 items. + 3. Repair the C++ PDF's broken font CMap (`fi`/`fl`/`ff` glyphs encoded as + `9`/`2`/`C`). Repair is deterministic and wordlist-based: at each + suspect glyph between two letters, try fi/fl/ff/ffi/ffl substitutions + and accept the unique substitution that yields a real word; if zero or + multiple substitutions produce real words, leave the glyph untouched. + 4. Render each rule via a help-file template that mirrors the on-disk + convention used in `codeql-coding-standards-help/c/misra/src/rules/`. +""" +from __future__ import annotations +import json +import re +from dataclasses import dataclass, field, asdict +from pathlib import Path + +# ---------------------------------------------------------------------------- +# Wordlist-based ligature repair (deterministic) +# ---------------------------------------------------------------------------- +_WORDLIST_PATHS = ["/usr/share/dict/words", "/usr/dict/words"] +_EXTRA_WORDS = { + "dataflow", "workflow", "reflow", "overflow", "overflows", "overflowed", + "overflowing", "underflow", "underflows", "outflow", "flow", "flows", + "flowing", "flag", "flags", "flagged", "flagging", "float", "floats", + "floating", "conflict", "conflicts", "conflicting", "conflicted", + "reflect", "reflects", "reflected", "reflecting", "superfluous", + "inflow", "offsetof", "sufficient", "efficient", "difficult", "difficulty", + "config", "configure", "configured", "configuration", "configurations", + "buffer", "buffers", "buffered", "buffering", + "differ", "different", "differently", "difference", "differences", + "differing", "differs", + "effect", "effects", "effective", "effectively", "effort", "efforts", + "affect", "affects", "affected", "affecting", + "specifier", "specifiers", "specification", "specifications", + "definition", "definitions", 
"define", "defined", "defines", "defining", + "amplification", "classification", "identifier", "identifiers", + "identified", "identifies", "identify", "identifying", + "modifier", "modifiers", "modifies", "modify", "modified", "modification", + "qualifier", "qualifiers", "qualified", "qualify", + "predefined", "undefined", "unspecified", "specified", "specify", + "prefix", "prefixed", "prefixes", + "fixed", "fix", "fixes", "field", "fields", "file", "files", + "first", "firstly", + "benefit", "benefits", "benefited", + "clarified", "confined", "filename", "filenames", "filesystem", + "lifetime", "compile", "compiled", "compiles", "compiler", "compilers", + "compilation", "redefine", "redefined", + "bitfield", "bitfields", "welldefined", "illdefined", +} + +_WORDS_CACHE: set[str] | None = None + + +def _load_words() -> set[str]: + global _WORDS_CACHE + if _WORDS_CACHE is not None: + return _WORDS_CACHE + words: set[str] = set(_EXTRA_WORDS) + found_system = False + for p in _WORDLIST_PATHS: + path = Path(p) + if path.exists(): + with path.open() as f: + words |= {w.strip().lower() for w in f if w.strip()} + found_system = True + break + if not found_system: + import warnings + warnings.warn( + "No system wordlist found; ligature repair will rely on " + "the built-in word list only. Install a words file at " + f"{_WORDLIST_PATHS[0]} for full coverage.", + stacklevel=2, + ) + _WORDS_CACHE = words + return words + + +_LIGS = ("fi", "fl", "ff", "ffi", "ffl") +# Suspect glyphs observed in the MISRA C++ PDF's font CMap: +# digits 0-9, capital `C`, caret `^`, percent `%`, and capital `A` +# all appear where a genuine ligature (fi/fl/ff/ffi/ffl) was +# originally rendered. The wordlist check in `repair_ligatures` +# prevents mis-substitution on legitimate CamelCase identifiers +# containing `A` or `C`. 
+_SUSPECT_GLYPHS = set("0123456789CA^%") +_SUSPECT_TOKEN_RE = re.compile(r"[A-Za-z0-9\^%]+") + + +def repair_ligatures(text: str) -> str: + """Fix MISRA C++ PDF's font-CMap-induced ligature corruption. + + For each token containing a suspect glyph, try each ligature + substitution at each suspect position; if exactly one substitution + yields a dictionary word, apply it. Otherwise leave the token alone + (preserves real numeric literals and identifiers like `int32_t` and + code variables like `Class`). + """ + words = _load_words() + + def fix(tok: str) -> str: + # Only attempt repairs on tokens that already contain letters; + # pure-digit tokens like "4" or "10" must be left alone even + # though they start or end with a suspect glyph. + if not any(c.isalpha() for c in tok): + return tok + # Skip tokens that contain no suspect glyphs at all. + if not any(c in _SUSPECT_GLYPHS for c in tok): + return tok + low = tok.lower() + if low.isalpha() and low in words: + return tok + out = tok + for i, ch in enumerate(out): + if ch not in _SUSPECT_GLYPHS: + continue + left_ok = (i == 0) or out[i - 1].isalpha() + right_ok = (i == len(out) - 1) or out[i + 1].isalpha() + if not (left_ok and right_ok): + continue + hits = [] + for lig in _LIGS: + cand = (out[:i] + lig + out[i + 1 :]).lower() + if cand in words: + hits.append(lig) + if len(hits) == 1: + out = out[:i] + hits[0] + out[i + 1 :] + break # indices shifted; one repair per token suffices + return out + + return _SUSPECT_TOKEN_RE.sub(lambda m: fix(m.group(0)), text) + + +# ---------------------------------------------------------------------------- +# Docling load (cached) +# ---------------------------------------------------------------------------- + +def load_docling_json(pdf_path: Path, cache_dir: Path) -> dict: + cache_dir.mkdir(parents=True, exist_ok=True) + out = cache_dir / f"{pdf_path.stem}.docling.json" + if not out.exists(): + # Lazy import — docling is heavy and only needed on cache miss. 
+ from docling.document_converter import DocumentConverter + conv = DocumentConverter() + result = conv.convert(str(pdf_path)) + out.write_text( + json.dumps(result.document.export_to_dict(), indent=2), + encoding="utf-8", + ) + return json.loads(out.read_text(encoding="utf-8")) + + +# ---------------------------------------------------------------------------- +# Rule extraction over docling's text stream +# ---------------------------------------------------------------------------- + +RULE_ANCHOR_RE = re.compile( + r"^(?PRule|Dir)\s+(?P\d+(?:\.\d+){1,2})\b\s*(?P.*)$" +) +HEADER_KEYS = ("Category", "Analysis", "Applies to") +SUB_LABELS = ("Amplification", "Rationale", "Exception", "Example", "See also") + +# `page_header` items (running heads like "Section 4: Guidelines" or +# "Rule 15.0.2") must be retained for rule-anchor detection (a small number of +# real rule headers in the C PDF land in `page_header`-labelled items), but +# they MUST NOT be allowed to leak into the body of a rule's sections. We +# therefore keep them in `_items()` but filter them when accumulating section +# content in `_build_rule()`. +_BODY_SKIP_LABELS = {"page_header"} + + +@dataclass +class TextItem: + label: str + text: str + page: int + + +@dataclass +class Rule: + rule_id: str + raw_id: str + title: str + standard: str + category: str = "" + analysis: str = "" + applies_to: str = "" + amplification: str = "" + rationale: str = "" + exceptions: list[str] = field(default_factory=list) + example: str = "" + see_also: list[str] = field(default_factory=list) + + +def _items(doc: dict) -> list[TextItem]: + items: list[TextItem] = [] + for t in doc["texts"]: + if t["label"] == "page_footer": + continue + page = t.get("prov", [{}])[0].get("page_no", 0) if t.get("prov") else 0 + # Normalize NBSP (U+00A0) — MISRA rule headers use it between + # "Rule" and the number, which would otherwise break our anchor. 
+ raw = t.get("text", "").replace("\xa0", " ") + text = repair_ligatures(raw) + items.append(TextItem(label=t["label"], text=text, page=page)) + return items + + +def _anchor(it: TextItem) -> tuple[str, str, str] | None: + m = RULE_ANCHOR_RE.match(it.text.strip()) + if not m: + return None + return m.group("kind"), m.group("num"), m.group("rest").strip() + + +def _find_rule_starts(items: list[TextItem]) -> list[int]: + starts: list[int] = [] + seen: set[str] = set() + for i, it in enumerate(items): + a = _anchor(it) + if not a: + continue + kind, num, rest = a + # page_header items are running heads — ignore them when they're + # bare ids without title text (those reference a rule defined + # elsewhere); but accept them when they include the title (real + # rule headers in this PDF appear as page_header for some rules). + rid = f"{kind.upper()}-{num.replace('.', '-')}" + if rid in seen: + continue + # Require a `Category` line within the next 25 items to confirm + # this is a real rule definition (not a cross-reference). 
+ for j in range(i + 1, min(i + 30, len(items))): + if items[j].text.strip().startswith("Category"): + starts.append(i) + seen.add(rid) + break + return starts + + +def _split_label_and_value(text: str, label: str) -> tuple[bool, str]: + s = text.strip() + if s == label: + return True, "" + if s.startswith(label + " "): + return True, s[len(label) + 1 :].strip() + if s.startswith(label + "\n"): + return True, s[len(label) + 1 :].strip() + return False, "" + + +def _classify_section(text: str) -> str | None: + s = text.strip() + for lab in SUB_LABELS: + if s == lab or s.startswith(lab + " ") or s.startswith(lab + "\n"): + return lab + # "Exception 1", "Exception 2" -> Exception + if lab == "Exception" and re.match(r"^Exception(\s+\d+)?\b", s): + return "Exception" + return None + + +def _build_rule(items: list[TextItem], start: int, end: int, standard: str) -> Rule: + head = items[start] + kind, num, rest = _anchor(head) # type: ignore + rule_id = f"{kind.upper()}-{num.replace('.', '-')}" + raw_id = f"{kind} {num}" + + # Title may continue across the next 1-2 plain text items before Category. + title_parts: list[str] = [] + if rest: + title_parts.append(rest) + body_start = start + 1 + while body_start < end: + it = items[body_start] + s = it.text.strip() + if not s: + body_start += 1 + continue + if s.startswith("Category") or _classify_section(s): + break + if it.label in ("text", "section_header"): + title_parts.append(s) + body_start += 1 + else: + break + title = " ".join(p for p in title_parts if p).strip() + + rule = Rule(rule_id=rule_id, raw_id=raw_id, title=title, standard=standard) + + cur: str | None = None + # `mixed_buf` preserves prose-and-code interleaving (so the Example + # section can present prose paragraphs between code blocks just as the + # PDF does). Each entry is ("text", str) or ("code", str). 
+ mixed_buf: list[tuple[str, str]] = [] + + def flush(): + nonlocal mixed_buf + items_buf = mixed_buf + mixed_buf = [] + prose_only = "\n\n".join(s for kind, s in items_buf if kind == "text").strip() + if cur == "Amplification": + rule.amplification = prose_only + elif cur == "Rationale": + rule.rationale = prose_only + elif cur == "Exception": + if prose_only: + rule.exceptions.append(prose_only) + elif cur == "Example": + parts: list[str] = [] + run_text: list[str] = [] + run_code: list[str] = [] + + def flush_text(): + if run_text: + parts.append("\n\n".join(run_text)) + run_text.clear() + + def flush_code(): + if run_code: + parts.append("\n\n".join(run_code)) + run_code.clear() + + for kind, s in items_buf: + if kind == "code": + flush_text() + run_code.append(s) + else: + flush_code() + run_text.append(s) + flush_text() + flush_code() + rule.example = "\n\n".join(parts).strip() + rule._example_layout = items_buf # type: ignore[attr-defined] + elif cur == "See also": + rule.see_also = [s.strip() for s in re.split(r"[,\n]", prose_only) if s.strip()] + + skip_next = 0 + for k in range(body_start, end): + if skip_next: + skip_next -= 1 + continue + it = items[k] + s = it.text.strip() + if not s: + continue + # Header k/v: may be on one item ("Category Required") or split + # across two items ("Category" then "Required"). + matched_header = False + for hkey in HEADER_KEYS: + ok, val = _split_label_and_value(s, hkey) + if ok: + if not val and k + 1 < end: + # Look ahead: next item is the value. 
+ nxt = items[k + 1].text.strip() + if nxt and not _classify_section(nxt) and not any( + nxt.startswith(h) for h in HEADER_KEYS + ): + val = nxt + skip_next = 1 + if hkey == "Category": + if not rule.category: + rule.category = val + elif hkey == "Analysis": + if not rule.analysis: + rule.analysis = val + elif hkey == "Applies to": + if not rule.applies_to: + rule.applies_to = val + matched_header = True + break + if matched_header: + continue + # Drop running-head text from the body of any section. + if it.label in _BODY_SKIP_LABELS: + continue + sec = _classify_section(s) + if sec: + flush() + cur = sec + ok, after = _split_label_and_value(s, sec if sec != "Exception" else s.split()[0]) + if after: + kind = "code" if it.label == "code" else "text" + mixed_buf.append((kind, after)) + continue + if cur is None: + continue + if it.label == "code": + mixed_buf.append(("code", s)) + elif it.label == "list_item": + mixed_buf.append(("text", f"- {s}")) + else: + mixed_buf.append(("text", s)) + flush() + return rule + + +# ---------------------------------------------------------------------------- +# Hand-curated repairs for rules whose docling output is too entangled with +# adjacent code/text items for the generic anchor logic to find. These PDFs +# are static (MISRA C 2023, MISRA C++ 2023), so we splice synthetic anchor +# items at content-anchored positions; we then let the normal `_build_rule` +# pipeline harvest section content from the items that follow. +# +# Each entry: (locator -> int|None, synthetic_items: list[TextItem]). +# The locator returns the index in `items` BEFORE which to insert. 
+# ---------------------------------------------------------------------------- +def _ti(label: str, text: str, page: int = 0) -> "TextItem": + return TextItem(label=label, text=text, page=page) + + +def _find_after(items: list["TextItem"], pred, start: int = 0) -> int | None: + for i in range(start, len(items)): + if pred(items[i]): + return i + return None + + +def _missing_anchors_misra_cpp_2023(items: list["TextItem"]) -> list[tuple[int, list["TextItem"]]]: + """Return [(insert_before_index, synthetic_items)] for the 4 rules whose + headers are absent or merged with adjacent items in the docling output.""" + out: list[tuple[int, list[TextItem]]] = [] + + # Rule 0.0.1 — heading entirely missing in docling output. Body begins + # at the "Ampli2cation" section_header that immediately follows the + # "[misra]" text item that follows the "4.0.0 Path feasibility" header. + i_path = _find_after(items, lambda it: it.label == "section_header" + and it.text.strip() == "4.0.0 Path feasibility") + if i_path is not None: + i_misra = _find_after(items, lambda it: it.text.strip() == "[misra]", i_path + 1) + if i_misra is not None: + out.append((i_misra + 1, [ + _ti("section_header", + "Rule 0.0.1 A function shall not contain unreachable statements"), + _ti("text", "Category Required"), + _ti("text", "Analysis Decidable, Single Translation Unit"), + ])) + + # Rule 5.13.6 — heading and Category/Analysis are concatenated inside a + # single `code` item. Insert synthetic anchor immediately before that + # code item (located by a unique substring of the rule title). 
+ i_5136 = _find_after(items, lambda it: it.label == "code" + and "Rule 5.13.6" in it.text and "long long" in it.text) + if i_5136 is not None: + out.append((i_5136, [ + _ti("section_header", + "Rule 5.13.6 An integer-literal of type long long shall not " + "use a single L or l in any suffix"), + _ti("text", "Category Required"), + _ti("text", "Analysis Decidable, Single Translation Unit"), + _ti("section_header", "Example"), + ])) + + # Rule 6.9.1 — heading concatenated into a `text` item ("...4.6.9 Types + # [basic.types] Rule 6.9.1 ..."). Insert synthetic anchor immediately + # before that item. + i_691 = _find_after(items, lambda it: it.label == "text" + and "Rule 6.9.1" in it.text + and "type aliases" in it.text) + if i_691 is not None: + out.append((i_691, [ + _ti("section_header", + "Rule 6.9.1 The same type aliases shall be used in all " + "declarations of the same entity"), + _ti("text", "Category Required"), + _ti("text", "Analysis Decidable, Single Translation Unit"), + _ti("section_header", "Amplification"), + ])) + + # Rule 15.0.2 — heading inside a `code` item ("struct NonEmptyDestructor + # ... Rule 15.0.2 User-provided copy and move ..."). Insert anchor + # immediately before it. 
+ i_1502 = _find_after(items, lambda it: it.label == "code" + and "Rule 15.0.2" in it.text + and "User-provided copy and move" in it.text) + if i_1502 is not None: + out.append((i_1502, [ + _ti("section_header", + "Rule 15.0.2 User-provided copy and move member functions of " + "a class should have appropriate signatures"), + _ti("text", "Category Advisory"), + _ti("text", "Analysis Decidable, Single Translation Unit"), + ])) + + return out + + +_MISSING_ANCHOR_RESOLVERS = { + "MISRA-C++-2023": _missing_anchors_misra_cpp_2023, +} + + +def _splice_missing_anchors(items: list["TextItem"], standard: str) -> list["TextItem"]: + resolver = _MISSING_ANCHOR_RESOLVERS.get(standard) + if resolver is None: + return items + insertions = resolver(items) + if not insertions: + return items + # Apply from highest index to lowest so earlier indices stay valid. + insertions.sort(key=lambda x: x[0], reverse=True) + out = list(items) + for idx, syn in insertions: + out[idx:idx] = syn + return out + + +def extract_rules(pdf_path: Path, standard: str, cache_dir: Path) -> list[Rule]: + doc = load_docling_json(pdf_path, cache_dir) + items = _items(doc) + items = _splice_missing_anchors(items, standard) + starts = _find_rule_starts(items) + starts.append(len(items)) + rules: list[Rule] = [] + for a, b in zip(starts, starts[1:]): + rules.append(_build_rule(items, a, b, standard)) + return rules + + +# ---------------------------------------------------------------------------- +# Code-block line-break recovery +# ---------------------------------------------------------------------------- +# +# docling emits each PDF code block as a single joined string: the PDF's +# line breaks are collapsed to spaces, so examples would render as one +# long line. We cannot losslessly recover the original line breaks without +# re-reading layout boxes, but for C/C++ examples we can insert +# statement-level breaks at the obvious boundaries: `;`, `{`, `}`, and +# before `//` line comments. 
This is a deterministic, purely textual +# transform — no parsing or formatting — and keeps the output readable. + +_CODE_FORMAT_STEPS = [ + # Newline after `;` (but not inside `for( ; ; )` — the next rule catches + # runs of `;` we should leave alone). + (re.compile(r";\s+(?=\S)"), ";\n"), + # Newline after `{` (common block open) except for `${`-style literals. + (re.compile(r"\{\s+(?=\S)"), "{\n"), + # Newline before a `}` that is preceded by content on the same line. + (re.compile(r"(?<=\S)\s+\}"), "\n}"), +] + + +def _indent_by_braces(text: str) -> str: + """Add 2-space indentation based on brace nesting depth.""" + lines = text.splitlines() + out: list[str] = [] + depth = 0 + for line in lines: + stripped = line.strip() + if not stripped: + out.append("") + continue + # Dedent for lines that start with `}` + if stripped.startswith("}"): + depth = max(0, depth - 1) + out.append(" " * depth + stripped) + # Indent after lines that end with `{` + if stripped.endswith("{"): + depth += 1 + return "\n".join(out) + + +def _format_code_lines(text: str) -> str: + """Heuristically insert line breaks into a C/C++ code example that + docling concatenated onto a single line. Deterministic. + + Preserves existing multi-space alignment and inline ``//`` comments. + Only inserts line breaks at ``;``, ``{``, ``}`` boundaries and adds + brace-depth indentation. + """ + # Collapse runs of 3+ spaces (likely docling kerning artefacts) to + # a single space, but preserve 2-space runs which may be intentional + # alignment in column-style comments. + s = re.sub(r"[ \t]{3,}", " ", text).strip() + for pat, repl in _CODE_FORMAT_STEPS: + s = pat.sub(repl, s) + # Trim trailing whitespace on each line. + s = "\n".join(line.rstrip() for line in s.splitlines()).strip() + # Add indentation based on brace depth. 
+ return _indent_by_braces(s) + + +# ---------------------------------------------------------------------------- +# Help-file rendering +# ---------------------------------------------------------------------------- + +STD_DISPLAY = { + "MISRA-C-2023": "MISRA C 2012", + "MISRA-C-2012": "MISRA C 2012", + "MISRA-C++-2023": "MISRA C++ 2023", +} + + +def render_help(rule: Rule, lang: str = "c") -> str: + rows = [f"Category{rule.category or 'Unknown'}"] + if rule.analysis: + rows.append(f"Analysis{rule.analysis}") + if rule.applies_to: + rows.append(f"Applies to{rule.applies_to}") + + parts: list[str] = [ + f"# {rule.raw_id}: {rule.title}", + "", + f"This query implements the {STD_DISPLAY.get(rule.standard, rule.standard)} {rule.raw_id}:", + "", + f"> {rule.title}", + "", + "## Classification", + "", + "", + *rows, + "
", + "", + ] + if rule.amplification: + parts += ["### Amplification", "", rule.amplification, ""] + if rule.rationale: + parts += ["### Rationale", "", rule.rationale, ""] + if rule.exceptions: + parts += ["### Exception", ""] + for e in rule.exceptions: + parts += [e, ""] + layout = getattr(rule, "_example_layout", None) + if layout: + parts += ["## Example", ""] + for kind, s in layout: + if kind == "code": + parts += [f"```{lang}", _format_code_lines(s), "```", ""] + else: + parts += [s, ""] + elif rule.example: + parts += ["## Example", "", f"```{lang}", + _format_code_lines(rule.example), "```", ""] + if rule.see_also: + parts += ["## See also", "", ", ".join(rule.see_also), ""] + parts += [ + "## Implementation notes", + "", + "None", + "", + "## References", + "", + f"* {STD_DISPLAY.get(rule.standard, rule.standard)}: {rule.raw_id}: {rule.title}", + "", + ] + return "\n".join(parts) + + +def to_dict(rule: Rule) -> dict: + return asdict(rule) + + +# ---------------------------------------------------------------------------- +# CLI +# ---------------------------------------------------------------------------- + +_REPO_ROOT = Path(__file__).resolve().parents[3] + +if __name__ == "__main__": + import argparse + ap = argparse.ArgumentParser() + ap.add_argument("pdf") + ap.add_argument("--standard", required=True, choices=list(STD_DISPLAY)) + ap.add_argument("--cache-dir", + default=str(_REPO_ROOT / "scripts" / "generate_rules" + / "misra_help" / "cache")) + ap.add_argument("--rule", action="append", help="only emit these rule IDs") + ap.add_argument("--json", default=None, + help="write extracted rules to this JSON file") + args = ap.parse_args() + rules = extract_rules(Path(args.pdf), args.standard, Path(args.cache_dir)) + selected = [r for r in rules if not args.rule or r.rule_id in args.rule] + print(f"Extracted {len(rules)} rules from {args.pdf}" + f" ({len(selected)} selected)") + if args.json: + out = Path(args.json) + out.parent.mkdir(parents=True, 
exist_ok=True) + out.write_text( + json.dumps([to_dict(r) for r in selected], indent=2), + encoding="utf-8", + ) + print(f"Wrote {out}") diff --git a/scripts/generate_rules/misra_help/harness.py b/scripts/generate_rules/misra_help/harness.py new file mode 100644 index 0000000000..d65ae9678d --- /dev/null +++ b/scripts/generate_rules/misra_help/harness.py @@ -0,0 +1,169 @@ +"""Determinism harness for the MISRA help generator. + +Runs the docling → extract → render pipeline `N` times and reports per-rule, +per-section variance. Intended workflow: + + python harness.py --pdf --standard -n 5 + +For each iteration: + - clears the docling JSON cache (so docling re-runs end-to-end) + - extracts every rule + - hashes every section field per rule + - hashes the full rendered .md per rule + - records all hashes + +After N iterations, emits a JSON report and a brief summary: + - per-section: count of rules where ALL N runs agreed + - per-rule: list of sections that diverged + - hash table sizes per rule (1 == deterministic, >1 == flaky) + +This intentionally focuses on *output variance*, not on backend variance: +the goal is "given this codebase, are the rendered help files reproducible?" 
+""" +from __future__ import annotations +import argparse +import hashlib +import json +import os +import sys +import time +from collections import Counter, defaultdict +from dataclasses import asdict +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from extract_rules import extract_rules, render_help, to_dict, STD_DISPLAY # noqa: E402 + +SECTIONS = ( + "category", "analysis", "applies_to", + "amplification", "rationale", "exceptions", + "example", "see_also", + "_rendered", # the full .md output +) + + +def _hash(value) -> str: + if isinstance(value, list): + s = "\n\u241e\n".join(value) + else: + s = str(value) + return hashlib.sha256(s.encode("utf-8")).hexdigest()[:16] + + +def run_once(pdf: Path, standard: str, cache_dir: Path, lang: str) -> dict[str, dict[str, str]]: + """Return rule_id -> {section: hash}.""" + rules = extract_rules(pdf, standard, cache_dir) + out: dict[str, dict[str, str]] = {} + for r in rules: + d = to_dict(r) + rendered = render_help(r, lang) + hashes = {} + for sec in SECTIONS: + if sec == "_rendered": + hashes[sec] = _hash(rendered) + else: + hashes[sec] = _hash(d.get(sec, "")) + out[r.rule_id] = hashes + return out + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--pdf", required=True) + ap.add_argument("--standard", required=True, choices=list(STD_DISPLAY)) + ap.add_argument("-n", "--iterations", type=int, default=3) + ap.add_argument("--cache-dir", + default=str(Path(__file__).resolve().parent / "cache")) + ap.add_argument("--keep-cache", action="store_true", + help="do NOT clear docling cache between runs (tests just the post-docling stages)") + ap.add_argument("--report", default="/tmp/misra-pdf-probe/determinism-report.json") + args = ap.parse_args() + + cache = Path(args.cache_dir) + cache.mkdir(parents=True, exist_ok=True) + + all_runs: list[dict[str, dict[str, str]]] = [] + timings: list[float] = [] + for i in range(args.iterations): + if not args.keep_cache: + for f in 
def main() -> int:
    """Run the pipeline N times and report per-rule/per-section variance.

    Returns a process exit code (always 0; the JSON report carries the
    actual results).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--pdf", required=True)
    ap.add_argument("--standard", required=True, choices=list(STD_DISPLAY))
    ap.add_argument("-n", "--iterations", type=int, default=3)
    ap.add_argument("--cache-dir",
                    default=str(Path(__file__).resolve().parent / "cache"))
    ap.add_argument("--keep-cache", action="store_true",
                    help="do NOT clear docling cache between runs (tests just the post-docling stages)")
    ap.add_argument("--report", default="/tmp/misra-pdf-probe/determinism-report.json")
    args = ap.parse_args()

    cache = Path(args.cache_dir)
    cache.mkdir(parents=True, exist_ok=True)

    all_runs: list[dict[str, dict[str, str]]] = []
    timings: list[float] = []
    for i in range(args.iterations):
        # Clearing the docling JSON cache forces a full re-run of the PDF
        # conversion; --keep-cache restricts the probe to post-docling stages.
        if not args.keep_cache:
            for f in cache.glob("*.docling.json"):
                f.unlink()
        t0 = time.time()
        run = run_once(Path(args.pdf), args.standard, cache,
                       "cpp" if "C++" in args.standard else "c")
        timings.append(time.time() - t0)
        print(f" iter {i+1}/{args.iterations}: {len(run)} rules, {timings[-1]:.1f}s")
        all_runs.append(run)

    # Aggregate. Only rules seen in EVERY run are compared section-by-section;
    # rules missing from some runs are reported separately.
    rule_ids = sorted({rid for run in all_runs for rid in run.keys()})
    rules_in_all_runs = [r for r in rule_ids if all(r in run for run in all_runs)]
    rules_missing_in_some = [r for r in rule_ids if r not in rules_in_all_runs]

    section_pass: Counter[str] = Counter()
    section_total: Counter[str] = Counter()
    rule_diverged: dict[str, list[str]] = defaultdict(list)
    rule_hashes: dict[str, dict[str, list[str]]] = {}

    for rid in rules_in_all_runs:
        per_sec: dict[str, list[str]] = {}
        for sec in SECTIONS:
            hs = [run[rid][sec] for run in all_runs]
            per_sec[sec] = hs
            section_total[sec] += 1
            # A single distinct hash across all N runs means deterministic.
            if len(set(hs)) == 1:
                section_pass[sec] += 1
            else:
                rule_diverged[rid].append(sec)
        rule_hashes[rid] = per_sec

    summary = {
        "iterations": args.iterations,
        "pdf": args.pdf,
        "standard": args.standard,
        "rule_count_per_iter": [len(run) for run in all_runs],
        "rules_in_all_runs": len(rules_in_all_runs),
        "rules_missing_in_some_runs": rules_missing_in_some,
        "rule_count_stable": len(set(len(run) for run in all_runs)) == 1,
        "section_determinism": {
            sec: {
                "stable": section_pass[sec],
                "total": section_total[sec],
                "pct": (100.0 * section_pass[sec] / section_total[sec]) if section_total[sec] else 0.0,
            }
            for sec in SECTIONS
        },
        "rules_with_divergence": [
            {"rule_id": rid, "diverging_sections": secs} for rid, secs in sorted(rule_diverged.items())
        ],
        "iteration_seconds": timings,
    }

    report_path = Path(args.report)
    # Fix: the default report path lives under /tmp/misra-pdf-probe/, which
    # may not exist yet — create the parent directory before writing instead
    # of failing with FileNotFoundError at the very end of a long run.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(
        {"summary": summary, "rule_hashes": rule_hashes},
        indent=2,
    ), encoding="utf-8")

    print("\n=== Determinism summary ===")
    print(f" iterations: {args.iterations}")
    print(f" pdf: {args.pdf}")
    print(f" rule count/iter: {summary['rule_count_per_iter']}")
    print(f" rules in all runs: {summary['rules_in_all_runs']}")
    if rules_missing_in_some:
        print(f" rules missing in some: {rules_missing_in_some[:10]} ...")
    print(f" per-section stability:")
    for sec, s in summary["section_determinism"].items():
        # 20-char bar, one '#' per 5 percentage points.
        bar = "#" * int(s["pct"] / 5)
        print(f" {sec:14s} {s['stable']:>4d}/{s['total']:<4d} {s['pct']:6.2f}% {bar}")
    print(f" rules with any divergence: {len(rule_diverged)}")
    if rule_diverged:
        sample = list(rule_diverged.items())[:5]
        for rid, secs in sample:
            print(f" {rid}: {secs}")
    print(f" per-iteration time: {[f'{t:.1f}s' for t in timings]}")
    print(f" full report: {args.report}")
    return 0
# Sibling help repo is expected to sit next to this queries repo checkout.
DEFAULT_HELP_REPO = Path(__file__).resolve().parents[3].parent / "codeql-coding-standards-help"
DEFAULT_QUERY_REPO = Path(__file__).resolve().parents[3]
DEFAULT_CACHE_DIR = Path(__file__).resolve().parent / "cache"

# standard → (lang, relative source dir under the queries repo).
# A MISRA standard implies its language; users do not pass --lang.
STANDARD_INFO: dict[str, tuple[str, Path]] = {
    "MISRA-C-2023": ("c", Path("c/misra/src/rules")),
    "MISRA-C-2012": ("c", Path("c/misra/src/rules")),
    "MISRA-C++-2023": ("cpp", Path("cpp/misra/src/rules")),
}

SUPPORTED_STANDARDS = sorted(STANDARD_INFO)

# Each MISRA standard ships as a single licensed PDF whose filename includes a
# per-licensee suffix (e.g. "MISRA-C-2023-XXXXXX.pdf"). We do not hard-code the
# filename. The PDF location is resolved in this order:
#
#   1. --pdf CLI flag
#   2. environment variable named in PDF_ENV_VARS for the standard
#   3. a glob of PDF_FILE_GLOBS within --help-repo
#
# If none of those resolve to exactly one file, we abort with a clear message.
PDF_ENV_VARS = {
    "MISRA-C-2023": "MISRA_C_PDF",
    "MISRA-C-2012": "MISRA_C_PDF",
    "MISRA-C++-2023": "MISRA_CPP_PDF",
}
PDF_FILE_GLOBS = {
    "MISRA-C-2023": ["MISRA-C-2023*.pdf", "MISRA-C-2012*.pdf"],
    "MISRA-C-2012": ["MISRA-C-2023*.pdf", "MISRA-C-2012*.pdf"],
    "MISRA-C++-2023": ["MISRA-CPP-2023*.pdf", "MISRA-C++-2023*.pdf"],
}

# Query directory names look like RULE-8-1 or DIR-4-5 (2 or 3 number parts).
RULE_DIR_RE = re.compile(r"^(?:RULE|DIR)-\d+(?:-\d+){1,2}$")
# Extracts the human-readable title from a `.ql` `@name` metadata line.
QL_NAME_RE = re.compile(r"@name\s+(?:RULE|DIR)-\d+(?:-\d+){1,2}:\s+(?P<title>.+?)\s*$")
+ # Handles: `[ns.anchor]`, `[ns.anchor] / 2`, `[ns.anchor] Undefined 5`, + # `[Koenig] 78-81`, `[C11] / 7.22.1; Undefined 1`, chains of these. + trailing = re.compile( + r"\s*\[[^\]]*\]" # a [...] group + r"(?:\s*/?\s*[\w.,;\s()*+-]*?)?" # optional tail + r"\s*$" + ) + impl = re.compile( + r"\s*(?:Implementation|Undefined|Unspecified)" + r"\s+[\w.,;\s()*+-]+$", + re.IGNORECASE, + ) + for _ in range(5): + before = s + s = trailing.sub("", s).strip() + s = impl.sub("", s).strip() + if s == before: + break + s = s.lower() + # Strip single/double quotes entirely — MISRA quotes individual + # tokens like "'commented out'" inconsistently between the PDF and + # the .ql `@name`. + s = re.sub(r"[\"']", "", s) + return s.rstrip(" .,;:") + + +def _titles_match(ql_title: str, pdf_title: str) -> bool: + """Return True if the `.ql` `@name` title and the PDF-extracted rule + title describe the same rule. + + We accept: + * exact normalized equality; + * the `.ql` title being a prefix of the PDF title (the `.ql` + `@name` line is sometimes truncated before the help generator + wraps onto the `@description` line); + * the `.ql` title being contained in the PDF title, when it is + sufficiently long that an accidental substring match is + implausible (≥ 40 normalized chars). Multiple queries per rule + often carry query-specific titles that appear verbatim inside + the rule's full statement. 
+ """ + a = _normalize_title(ql_title) + b = _normalize_title(pdf_title) + if not a or not b: + return False + if a == b: + return True + if b.startswith(a) or a.startswith(b): + return True + if len(a) >= 40 and a in b: + return True + return False + + +def _read_ql_name(ql_path: Path) -> str | None: + """Return the human-readable rule title from a `.ql` file's `@name` + metadata, or None if not found.""" + try: + with ql_path.open(encoding="utf-8") as f: + for line in f: + m = QL_NAME_RE.search(line) + if m: + return m.group("title") + if line.strip().startswith("import "): + break + except OSError: + return None + return None + + +def resolve_pdf(standard: str, cli_pdf: Path | None, help_repo: Path) -> Path: + """Locate the licensed PDF for a standard. Raises with a helpful message.""" + if cli_pdf is not None: + if not cli_pdf.is_file(): + raise SystemExit(f"error: --pdf {cli_pdf} does not exist") + return cli_pdf + env_var = PDF_ENV_VARS[standard] + env_val = os.environ.get(env_var) + if env_val: + p = Path(env_val).expanduser() + if not p.is_file(): + raise SystemExit( + f"error: ${env_var} is set to {p} which does not exist") + return p + matches: list[Path] = [] + for pattern in PDF_FILE_GLOBS[standard]: + matches.extend(sorted(help_repo.glob(pattern))) + if len(matches) == 1: + return matches[0] + if not matches: + raise SystemExit( + f"error: cannot locate the {standard} PDF.\n" + f" Provide it via --pdf <path>, or set ${env_var}, or place a\n" + f" file matching one of {PDF_FILE_GLOBS[standard]} in {help_repo}.") + raise SystemExit( + f"error: multiple candidate PDFs for {standard} found in {help_repo}:\n" + + "\n".join(f" {m}" for m in matches) + + f"\n Disambiguate with --pdf <path> or ${env_var}.") + + +def collect_queries(query_repo: Path, standard: str) -> dict[str, list[Path]]: + """rule_id -> list of query file paths.""" + _, src_rel = STANDARD_INFO[standard] + src_dir = query_repo / src_rel + out: dict[str, list[Path]] = {} + if not 
src_dir.is_dir(): + return out + for ql in src_dir.rglob("*.ql"): + rule_dir = ql.parent.name + if not RULE_DIR_RE.match(rule_dir): + continue + out.setdefault(rule_dir, []).append(ql) + return out + + +def write_help(rule: Rule, ql_path: Path, lang: str, help_repo: Path, + query_repo: Path, lang_src: Path, + no_overwrite: bool, dry_run: bool, + rule_trusted: bool) -> str: + """Write one help .md and return a short status string.""" + rel_dir = ql_path.parent.relative_to(query_repo / lang_src) + target_dir = help_repo / lang_src / rel_dir + target = target_dir / (ql_path.stem + ".md") + rel = target.relative_to(help_repo) + + if not rule_trusted: + ql_title = _read_ql_name(ql_path) or "" + return (f"title-mismatch {rel} " + f"(ql={ql_title!r} pdf={rule.title!r})") + + body = render_help(rule, lang) + if target.exists(): + if no_overwrite: + return f"skip-existing {rel}" + if target.read_text(encoding="utf-8") == body: + return f"unchanged {rel}" + action = "wrote-changed" + else: + action = "wrote-new" + if dry_run: + return f"would-{action} {rel} ({len(body)} bytes)" + target_dir.mkdir(parents=True, exist_ok=True) + target.write_text(body, encoding="utf-8") + return f"{action} {rel} ({len(body)} bytes)" + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--standard", required=True, choices=SUPPORTED_STANDARDS, + help="MISRA standard to populate (the source language is " + "derived from this)") + ap.add_argument("--query-repo", type=Path, default=DEFAULT_QUERY_REPO, + help="path to codeql-coding-standards repo (default: this repo)") + ap.add_argument("--help-repo", type=Path, default=DEFAULT_HELP_REPO, + help="path to codeql-coding-standards-help repo") + ap.add_argument("--pdf", type=Path, default=None, + help="path to the licensed MISRA PDF (overrides env var " + "and help-repo glob)") + ap.add_argument("--cache-dir", type=Path, + default=DEFAULT_CACHE_DIR, + 
help="docling JSON cache dir (deterministic across runs)") + ap.add_argument("--rule", action="append", default=[], + help="restrict to specific RULE-X-Y[-Z] (repeatable)") + ap.add_argument("--no-overwrite", action="store_true", + help="leave existing .md files untouched (default: " + "regenerate every help file from the rule " + "description so help content is reproducible)") + ap.add_argument("--ignore-title-mismatch", action="store_true", + help="regenerate even when the .ql @name title differs " + "from the PDF-extracted title (by default we skip " + "such files to avoid overwriting correct content " + "with content from a renumbered rule or a broken " + "PDF anchor)") + ap.add_argument("--dry-run", action="store_true", + help="report what would be written without writing") + args = ap.parse_args() + + pdf = resolve_pdf(args.standard, args.pdf, args.help_repo) + args.cache_dir.mkdir(parents=True, exist_ok=True) + rules = extract_rules(pdf, args.standard, args.cache_dir) + by_id = {r.rule_id: r for r in rules} + + lang, lang_src = STANDARD_INFO[args.standard] + queries = collect_queries(args.query_repo, args.standard) + rule_filter = set(s.upper() for s in args.rule) + counts: dict[str, int] = {} + for rule_id in sorted(queries): + if rule_filter and rule_id not in rule_filter: + continue + rule = by_id.get(rule_id) + if rule is None: + print(f"missing-rule {rule_id} (no PDF entry)") + counts["missing-rule"] = counts.get("missing-rule", 0) + 1 + continue + # Verify the rule's identity via the `.ql` `@name` titles. The + # rule is "trusted" for this directory if any one query's title + # matches the PDF title; that way narrow per-query titles do + # not block regeneration when the rule as a whole is correctly + # identified. 
+ if args.ignore_title_mismatch: + rule_trusted = True + else: + rule_trusted = False + for ql in queries[rule_id]: + ql_title = _read_ql_name(ql) or "" + if _titles_match(ql_title, rule.title): + rule_trusted = True + break + for ql in sorted(queries[rule_id]): + status = write_help(rule, ql, lang, args.help_repo, + args.query_repo, lang_src, + args.no_overwrite, args.dry_run, + rule_trusted) + print(status) + kind = status.split()[0] + counts[kind] = counts.get(kind, 0) + 1 + + print("\nSummary:") + for k in sorted(counts): + print(f" {k}: {counts[k]}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/generate_rules/misra_help/refresh_help.py b/scripts/generate_rules/misra_help/refresh_help.py new file mode 100644 index 0000000000..3d953d1b0a --- /dev/null +++ b/scripts/generate_rules/misra_help/refresh_help.py @@ -0,0 +1,198 @@ +"""Re-generate query help files in two stages without needing docling. + +This script reuses the existing .misra-rule-cache/<standard>.json +(produced by a prior dump_rules_json.py run) to: + + Stage 1: Deterministically re-render every .md from the cached rule + data via render_help(). + Patch: Update the cache JSON with current existing_md content and + implementation_scope from rule_packages/*.json. + Stage 2: Run rewrite_help.py (LLM lint/proofread) over the patched + cache. 
+ +Usage: + python refresh_help.py --standard MISRA-C-2012 + python refresh_help.py --standard MISRA-C++-2023 + python refresh_help.py --standard MISRA-C-2012 --stage1-only +""" +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +sys.path.insert(0, str(Path(__file__).parent)) +from extract_rules import Rule, render_help, _format_code_lines # noqa: E402 +from cache import load_cache as _load_cache, save_cache # noqa: E402 + +SCRIPT_DIR = Path(__file__).resolve().parent +QUERY_REPO = SCRIPT_DIR.parents[2] +DEFAULT_HELP_REPO = QUERY_REPO.parent / "codeql-coding-standards-help" + +STANDARD_INFO = { + "MISRA-C-2012": ("c", "c/misra/src/rules"), + "MISRA-C-2023": ("c", "c/misra/src/rules"), + "MISRA-C++-2023": ("cpp", "cpp/misra/src/rules"), +} + + +def _rule_from_json(d: dict[str, Any]) -> Rule: + """Reconstruct a Rule from the cache JSON dict.""" + r = Rule( + rule_id=d["rule_id"], + raw_id=d["raw_id"], + standard=d["standard"], + title=d["title"], + category=d.get("category", ""), + analysis=d.get("analysis", ""), + applies_to=d.get("applies_to", ""), + amplification=d.get("amplification", ""), + rationale=d.get("rationale", ""), + exceptions=d.get("exceptions", []), + example=d.get("example", ""), + see_also=d.get("see_also", []), + ) + # Restore example_layout if present. 
+ layout = d.get("example_layout", []) + if layout: + r._example_layout = [(item["kind"], item["text"]) for item in layout] + return r + + +def _load_impl_scope_lookup( + query_repo: Path, standard: str, +) -> dict[tuple[str, str], dict]: + """Build (rule_id, short_name) -> implementation_scope from rule_packages.""" + lang, _ = STANDARD_INFO[standard] + pkg_dir = query_repo / "rule_packages" / lang + if not pkg_dir.is_dir(): + return {} + lookup: dict[tuple[str, str], dict] = {} + for pkg_file in sorted(pkg_dir.glob("*.json")): + try: + data = json.loads(pkg_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + continue + for _std_key, rules in data.items(): + if not isinstance(rules, dict): + continue + for rule_id, rule_data in rules.items(): + if not isinstance(rule_data, dict): + continue + for q in rule_data.get("queries", []): + sn = q.get("short_name") + impl = q.get("implementation_scope") + if sn and impl: + lookup[(rule_id, sn)] = impl + return lookup + + +def stage1_render(cache: dict, help_repo: Path) -> tuple[int, int]: + """Re-render all .md files from cached rule data. 
def stage1_render(cache: dict, help_repo: Path) -> tuple[int, int]:
    """Deterministically re-render every .md from cached rule data.

    Returns (wrote, skipped): files written vs. queries skipped because
    their rule has no cached data.
    """
    lang = cache["lang"]
    rules_json = cache["rules"]
    queries_json = cache["queries"]

    wrote = skipped = 0
    for rule_id, query_list in sorted(queries_json.items()):
        rule_data = rules_json.get(rule_id)
        if not rule_data:
            # No cached rule: none of this rule's queries can be rendered.
            skipped += len(query_list)
            continue
        body = render_help(_rule_from_json(rule_data), lang)
        for q in query_list:
            md_path = help_repo / q["md_path"]
            md_path.parent.mkdir(parents=True, exist_ok=True)
            md_path.write_text(body, encoding="utf-8")
            wrote += 1

    return wrote, skipped


def patch_cache(
    cache: dict, help_repo: Path, query_repo: Path, standard: str,
) -> dict:
    """Refresh each query's existing_md snapshot and attach/remove its
    implementation_scope from rule_packages.  Mutates and returns *cache*."""
    impl_lookup = _load_impl_scope_lookup(query_repo, standard)

    for rule_id, query_list in cache["queries"].items():
        for q in query_list:
            # Snapshot the current on-disk help content (None if missing).
            md_path = help_repo / q["md_path"]
            try:
                q["existing_md"] = md_path.read_text(encoding="utf-8")
            except FileNotFoundError:
                q["existing_md"] = None

            # Attach implementation_scope from rule_packages, keyed by the
            # query's .ql stem; drop a stale one when no longer present.
            scope = impl_lookup.get((rule_id, Path(q["ql_path"]).stem))
            if scope:
                q["implementation_scope"] = scope
            else:
                q.pop("implementation_scope", None)

    return cache
+ print("\n=== Stage 2: LLM lint/proofread ===") + cmd = [ + sys.executable, + str(SCRIPT_DIR / "rewrite_help.py"), + "--standard", args.standard, + "--help-repo", str(help_repo), + ] + if args.model: + cmd += ["--model", args.model] + print(f"Running: {' '.join(cmd)}") + return subprocess.call(cmd) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/generate_rules/misra_help/rewrite_help.py b/scripts/generate_rules/misra_help/rewrite_help.py new file mode 100644 index 0000000000..e68a251593 --- /dev/null +++ b/scripts/generate_rules/misra_help/rewrite_help.py @@ -0,0 +1,523 @@ +"""Rewrite MISRA help (.md) files using GitHub Copilot as a second pass. + +The deterministic Python pipeline (`extract_rules.py` + `populate_help.py`) +extracts each rule from the licensed MISRA PDFs into Markdown plus a +structured JSON sidecar (via `dump_rules_json.py`). This script reads +that JSON and asks GitHub Copilot to render an idiomatic, well-formatted +help file for every query that targets the rule. + +This is a true headless driver: it talks directly to the Copilot chat +completions endpoint (`https://api.githubcopilot.com/chat/completions`) +using the OAuth token that the official Copilot extensions store on +disk. No VS Code, no extension required. + +Token discovery order: +1. Environment variable `GH_COPILOT_OAUTH_TOKEN`. +2. `~/.config/github-copilot/apps.json` (current Copilot). +3. `~/.config/github-copilot/hosts.json` (legacy Copilot). + +The OAuth token is exchanged for a short-lived Copilot API token via +`https://api.github.com/copilot_internal/v2/token` and refreshed +automatically before expiry. 
+ +Usage: + python rewrite_help.py --standard MISRA-C-2012 + python rewrite_help.py --standard MISRA-C++-2023 --rule RULE-6-7-1 + python rewrite_help.py --standard MISRA-C-2012 --limit 5 --dry-run +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable + +import requests + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + + +SUPPORTED_STANDARDS = ("MISRA-C-2012", "MISRA-C-2023", "MISRA-C++-2023") +STD_DISPLAY = { + "MISRA-C-2012": "MISRA C 2012", + "MISRA-C-2023": "MISRA C 2012", + "MISRA-C++-2023": "MISRA C++ 2023", +} + +DEFAULT_HELP_REPO = ( + Path(__file__).resolve().parents[3].parent / "codeql-coding-standards-help" +) + +COPILOT_TOKEN_URL = "https://api.github.com/copilot_internal/v2/token" +COPILOT_CHAT_URL = "https://api.githubcopilot.com/chat/completions" + +# Headers required by the Copilot backend. The editor identification +# strings mirror what a real editor sends; the Copilot service rejects +# requests without them. +EDITOR_VERSION = "vscode/1.99.0" +EDITOR_PLUGIN = "copilot-chat/0.20.0" +COPILOT_INTEGRATION_ID = "vscode-chat" +USER_AGENT = "GitHubCopilotChat/0.20.0" + +DEFAULT_MODEL = "claude-sonnet-4" +MODEL_FALLBACKS = ("claude-sonnet-4", "claude-3.7-sonnet", "gpt-4o", "gpt-4") + + +# --------------------------------------------------------------------------- +# Token handling +# --------------------------------------------------------------------------- + + +def _read_oauth_token_from_apps(path: Path) -> str | None: + """Read OAuth token from the current `apps.json` Copilot store.""" + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + # apps.json maps "github.com:<client_id>" -> {"oauth_token": "..."}. 
+ for entry in data.values(): + token = entry.get("oauth_token") if isinstance(entry, dict) else None + if token: + return token + return None + + +def _read_oauth_token_from_hosts(path: Path) -> str | None: + """Read OAuth token from the legacy `hosts.json` Copilot store.""" + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + entry = data.get("github.com") + if isinstance(entry, dict): + token = entry.get("oauth_token") + if token: + return token + return None + + +def discover_oauth_token() -> str: + """Find a Copilot OAuth token on this machine.""" + env = os.environ.get("GH_COPILOT_OAUTH_TOKEN") + if env: + return env.strip() + base = Path.home() / ".config" / "github-copilot" + candidates = [ + ("apps.json", _read_oauth_token_from_apps), + ("hosts.json", _read_oauth_token_from_hosts), + ] + for name, reader in candidates: + token = reader(base / name) + if token: + return token + raise RuntimeError( + "No Copilot OAuth token found. Either set GH_COPILOT_OAUTH_TOKEN, " + "or sign in to GitHub Copilot in VS Code / the gh CLI so that " + f"{base}/apps.json or hosts.json exists." 
+ ) + + +@dataclass +class CopilotToken: + token: str + expires_at: int # unix seconds + + def near_expiry(self, slack_seconds: int = 300) -> bool: + return time.time() + slack_seconds >= self.expires_at + + +def fetch_copilot_token(oauth_token: str) -> CopilotToken: + """Exchange a GitHub OAuth token for a short-lived Copilot API token.""" + resp = requests.get( + COPILOT_TOKEN_URL, + headers={ + "Authorization": f"token {oauth_token}", + "Editor-Version": EDITOR_VERSION, + "Editor-Plugin-Version": EDITOR_PLUGIN, + "User-Agent": USER_AGENT, + "Accept": "application/json", + }, + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError( + f"Copilot token exchange failed: HTTP {resp.status_code} {resp.text[:200]}" + ) + body = resp.json() + return CopilotToken(token=body["token"], expires_at=int(body["expires_at"])) + + +class CopilotSession: + """Holds the OAuth token and the current short-lived API token.""" + + def __init__(self, oauth_token: str) -> None: + self._oauth = oauth_token + self._tok: CopilotToken | None = None + + def token(self) -> str: + if self._tok is None or self._tok.near_expiry(): + self._tok = fetch_copilot_token(self._oauth) + return self._tok.token + + def chat( + self, + messages: list[dict[str, str]], + model: str, + temperature: float = 0.0, + max_tokens: int = 4096, + ) -> str: + """Call chat completions and return the assistant message text.""" + last_err: Exception | None = None + for attempt in range(3): + headers = { + "Authorization": f"Bearer {self.token()}", + "Editor-Version": EDITOR_VERSION, + "Editor-Plugin-Version": EDITOR_PLUGIN, + "Copilot-Integration-Id": COPILOT_INTEGRATION_ID, + "User-Agent": USER_AGENT, + "Content-Type": "application/json", + "Accept": "application/json", + } + payload = { + "model": model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + "stream": False, + "n": 1, + } + try: + resp = requests.post( + COPILOT_CHAT_URL, + headers=headers, + json=payload, + 
timeout=180, + ) + except requests.RequestException as exc: + last_err = exc + time.sleep(2 ** attempt) + continue + if resp.status_code == 401: + # Token may have expired between the near-expiry check + # and the request. Force a refresh and retry once. + self._tok = None + last_err = RuntimeError(f"401: {resp.text[:200]}") + continue + if resp.status_code == 429 or 500 <= resp.status_code < 600: + last_err = RuntimeError( + f"HTTP {resp.status_code}: {resp.text[:200]}" + ) + time.sleep(2 ** attempt) + continue + if resp.status_code != 200: + raise RuntimeError( + f"Copilot chat failed: HTTP {resp.status_code} {resp.text[:500]}" + ) + data = resp.json() + return data["choices"][0]["message"]["content"] + raise RuntimeError(f"Copilot chat failed after retries: {last_err}") + + +# --------------------------------------------------------------------------- +# Prompt construction (mirrors codeql-coding-standards-agent/src/rewriteHelp.ts) +# --------------------------------------------------------------------------- + + +def system_prompt() -> str: + return "\n".join([ + "You are a documentation linter, formatter, and proofreader for" + " MISRA query help files (Markdown).", + "", + "You are NOT an author. Your job is to take an existing query" + " help file and apply ONLY the transformations listed below." + " The input document was generated deterministically from the" + " licensed MISRA rule text and should be preserved as-is except" + " for the specific fixes you are instructed to make.", + "", + "ALLOWED changes (apply all that are applicable):", + "", + "1. American English: convert British spellings throughout all" + " prose (NOT code, identifiers, or text inside `code spans`)." 
+ " Common conversions: behaviour->behavior," + " initialise->initialize, initialised->initialized," + " initialisation->initialization, recognise->recognize," + " organisation->organization, optimise->optimize," + " analyse->analyze, modelling->modeling," + " signalling->signaling, programme->program," + " centre->center, colour->color, defence->defense," + " licence (noun)->license, judgement->judgment," + " fulfil->fulfill, whilst->while, amongst->among," + " learnt->learned, spelt->spelled, catalogue->catalog," + " dialogue->dialog, artefact->artifact.", + "", + "2. PDF extraction artifacts:", + " - Strip footnote references: \"C90 [Undefined 12]\"," + " \"C99 [...]\", \"C11 [...]\", \"C17 [...]\".", + " - Strip bracketed cross-reference tags:" + " \"[dcl.enum]\", \"[class.bit]\".", + " - Collapse multi-space kerning runs" + " (\"If any element\" -> \"If any element\").", + " - Fix stray spaces before punctuation" + " (\"virtual , override\" -> \"virtual, override\").", + " - Replace curly quotes with straight quotes.", + "", + "3. Markdown formatting (fix only if broken):", + " - Code blocks must use the correct language tag" + " (```c or ```cpp).", + " - Numbered exceptions must use \"1.\", \"2.\", \"3.\"" + " format, never bullets.", + "", + "4. Heading title: the \"# <Rule|Dir> X.Y[.Z]: <title>\"" + " heading must use the title from the .ql @name metadata" + " (provided in the input as ql_name_title), which is the" + " authoritative short title.", + "", + "5. Implementation notes: if IMPLEMENTATION_SCOPE text is" + " provided in the input, use it verbatim in the" + " \"## Implementation notes\" section. Otherwise, leave" + " the section as \"None\". Never invent implementation" + " notes.", + "", + "6. Structure: verify the document follows this section" + " order. 
Fix ordering if wrong, but do NOT add sections" + " that have no content in the input:", + " - # <Rule|Dir> X.Y[.Z]: <title>", + " - \"This query implements ...\" + blockquote", + " - ## Classification (HTML table)", + " - ### Amplification (if content exists)", + " - ### Rationale (if content exists)", + " - ### Exception (if content exists)", + " - ## Example (if content exists)", + " - ## See also (if content exists)", + " - ## Implementation notes", + " - ## References", + "", + "FORBIDDEN (do NOT do any of these):", + "- Do NOT paraphrase, summarize, or rewrite the rule text" + " in your own words.", + "- Do NOT add explanatory text, examples, or content not" + " present in the input.", + "- Do NOT remove content that is present in the input" + " (unless it is a PDF artifact listed above).", + "- Do NOT change technical meaning, even subtly.", + "- Do NOT modify code inside fenced code blocks." + " Preserve indentation, brace placement, comment" + " positions, and alignment exactly as given.", + "- Do NOT change brace placement style (e.g. Allman to" + " K&R or vice versa).", + "- Do NOT merge separate fenced code blocks into one or" + " convert prose paragraphs between code blocks into" + " code comments.", + "- Do NOT wrap the entire output in a fenced code block.", + "", + "Output ONLY the corrected Markdown file content." + " No commentary before or after." 
+ " End with exactly one trailing newline.", + ]) + + +def user_prompt(rule: dict[str, Any], query: dict[str, Any], standard: str) -> str: + existing = query.get("existing_md") + impl_scope = query.get("implementation_scope") + + parts: list[str] = [] + + if existing: + parts += [ + "Lint, format, and proofread the following query help file.", + "Apply ONLY the allowed transformations from your instructions.", + "Do NOT rewrite or paraphrase -- preserve the original text.", + "", + "DOCUMENT TO PROOFREAD:", + "```markdown", + existing.rstrip("\n"), + "```", + "", + ] + else: + parts += [ + "Format the following rule data into a query help file.", + "Use the literal MISRA rule text below -- do NOT paraphrase.", + "Follow the section structure from your instructions exactly.", + "", + ] + + # Provide rule JSON as reference (for fact-checking or initial + # formatting when there is no existing_md). + payload = { + "standard": standard, + "standard_display": STD_DISPLAY[standard], + "rule": rule, + "query": {k: v for k, v in query.items() if k != "existing_md"}, + } + parts += [ + "REFERENCE DATA (for fact-checking and metadata):", + "```json", + json.dumps(payload, indent=2), + "```", + "", + ] + + if impl_scope: + desc = impl_scope.get("description", "") + items = impl_scope.get("items", []) + parts.append("IMPLEMENTATION_SCOPE (use verbatim in" + " '## Implementation notes'):") + parts.append(desc) + for item in items: + parts.append(f"* {item}") + parts.append("") + + parts += [ + f"The heading MUST be \"# {rule['raw_id']}: <title>\" where" + f" <title> comes from ql_name_title" + f" (\"{query.get('ql_name_title', '')}\")," + f" NOT from the PDF rule title.", + "", + "Now emit the proofread .md content.", + ] + + return "\n".join(parts) + + +def unwrap_fence(text: str) -> str: + """Strip ```markdown ... 
``` if the model wrapped the whole file.""" + s = text.strip() + for tag in ("markdown", "md", ""): + prefix = f"```{tag}\n" if tag else "```\n" + if s.startswith(prefix) and s.endswith("\n```"): + return s[len(prefix):-4] + if s.startswith(prefix.rstrip("\n")) and s.endswith("```"): + inner = s[len(prefix.rstrip("\n")):-3].lstrip("\n").rstrip() + return inner + return text + + +# --------------------------------------------------------------------------- +# Main rewrite loop +# --------------------------------------------------------------------------- + +from cache import load_cache # noqa: E402 + + +def iter_work( + cache: dict[str, Any], + rule_filter: set[str] | None, +) -> Iterable[tuple[dict[str, Any], dict[str, Any]]]: + rules = cache["rules"] + queries = cache["queries"] + for rule_id in sorted(queries.keys()): + if rule_filter and rule_id not in rule_filter: + continue + rule = rules.get(rule_id) + if rule is None: + print(f" skip {rule_id}: no PDF rule entry", file=sys.stderr) + continue + for q in queries[rule_id]: + yield rule, q + + +def rewrite_one( + session: CopilotSession, + rule: dict[str, Any], + query: dict[str, Any], + standard: str, + model: str, +) -> str: + messages = [ + {"role": "system", "content": system_prompt()}, + {"role": "user", "content": user_prompt(rule, query, standard)}, + ] + body = session.chat(messages, model=model) + body = unwrap_fence(body).strip() + if not body.endswith("\n"): + body += "\n" + return body + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__.split("\n\n", 1)[0]) + p.add_argument("--standard", required=True, choices=SUPPORTED_STANDARDS) + p.add_argument("--help-repo", type=Path, default=DEFAULT_HELP_REPO, + help=f"Path to codeql-coding-standards-help (default: {DEFAULT_HELP_REPO}).") + p.add_argument("--rule", action="append", default=[], + help="Restrict to specific rule IDs (e.g. RULE-6-7-1). Repeatable.") + p.add_argument("--model", default=DEFAULT_MODEL, + help=f"Copilot model id. 
Default: {DEFAULT_MODEL}. " + f"Known good: {', '.join(MODEL_FALLBACKS)}.") + p.add_argument("--no-overwrite", action="store_true", + help="Skip queries that already have a .md file.") + p.add_argument("--dry-run", action="store_true", + help="Plan and call the model but do not write files.") + p.add_argument("--limit", type=int, default=None, + help="Process at most N (rule, query) pairs.") + args = p.parse_args() + + help_repo: Path = args.help_repo.resolve() + if not help_repo.is_dir(): + print(f"help repo not found: {help_repo}", file=sys.stderr) + return 2 + + cache = load_cache(help_repo, args.standard) + rule_filter = {r.upper() for r in args.rule} if args.rule else None + + work = list(iter_work(cache, rule_filter)) + if args.limit is not None: + work = work[: args.limit] + print(f"Planned: {len(work)} (rule, query) pairs for {args.standard}") + + oauth = discover_oauth_token() + session = CopilotSession(oauth) + # Force an early token fetch so auth failures surface before we + # start iterating. + _ = session.token() + print(f"Copilot session ready. 
Model: {args.model}") + + wrote = unchanged = skipped = failed = 0 + for i, (rule, query) in enumerate(work, 1): + rel = query["md_path"] + target = help_repo / rel + existing = query.get("existing_md") + + if existing is not None and args.no_overwrite: + print(f"[{i}/{len(work)}] skip-existing {rel}") + skipped += 1 + continue + + try: + body = rewrite_one(session, rule, query, args.standard, args.model) + except Exception as exc: # noqa: BLE001 - surface and keep going + print(f"[{i}/{len(work)}] FAILED {rel}: {exc}", file=sys.stderr) + failed += 1 + continue + + if existing == body: + print(f"[{i}/{len(work)}] unchanged {rel}") + unchanged += 1 + continue + + if args.dry_run: + print(f"[{i}/{len(work)}] would-write {rel} ({len(body)} bytes)") + wrote += 1 + continue + + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(body, encoding="utf-8") + verb = "wrote-new" if existing is None else "wrote-changed" + print(f"[{i}/{len(work)}] {verb} {rel} ({len(body)} bytes)") + wrote += 1 + + print( + f"\nDone. wrote={wrote} unchanged={unchanged} " + f"skipped={skipped} failed={failed}" + ) + return 0 if failed == 0 else 1 + + +if __name__ == "__main__": + sys.exit(main())