#!/usr/bin/env python3
"""Duplicate the last CodeQL query history entry, pointing it at a given evaluator log.

Behavior:
1. Locate the most relevant ``workspace-query-history.json`` (supports local & remote VS Code).
2. Duplicate the final object in ``queries``.
3. Generate a fresh random ID and a new timestamp.
4. Set ``jsonEvalLogSummaryLocation`` to the provided summary file path.
5. Set ``initialInfo.userSpecifiedLabel`` to ``Evaluator log at <dir>/<filename>`` (last 2 path parts).
6. Write back atomically.

Usage: python3 misc/scripts/patch_query_history.py /path/to/evaluator-log.summary.jsonl
"""
from __future__ import annotations

import argparse
import copy
import json
import os
import random
import string
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List


# Extension folder segment for CodeQL extension query history
EXT_SEGMENT = "GitHub.vscode-codeql"
HISTORY_FILENAME = "workspace-query-history.json"
WORKSPACE_JSON = "workspace.json"


def candidate_user_data_dirs() -> List[Path]:
    """Return plausible VS Code user data dirs (ordered, deduped)."""
    home = Path.home()
    env = os.environ
    override = env.get("VSCODE_USER_DATA_DIR")
    bases: List[Path] = []
    if override:
        bases.append(Path(override).expanduser())
    if os.name == "nt":
        appdata = env.get("APPDATA")
        if appdata:
            bases.append(Path(appdata) / "Code" / "User")
    elif sys.platform == "darwin":  # macOS
        bases.append(home / "Library" / "Application Support" / "Code" / "User")
    else:
        bases.append(home / ".config" / "Code" / "User")
    # Remote / server variants
    bases.extend([
        home / ".vscode-remote" / "data" / "User",
        home / ".vscode-server" / "data" / "User",
        home / ".vscode" / "data" / "User",
    ])
    seen: set[Path] = set()
    ordered: List[Path] = []
    for b in bases:
        if b not in seen:
            seen.add(b)
            ordered.append(b)
    return ordered


def find_history_files() -> List[Path]:
    """Return all candidate history files sorted by descending modification time."""
    candidates: List[Path] = []
    for base in candidate_user_data_dirs():
        storage_root = base / "workspaceStorage"
        if not storage_root.is_dir():
            continue
        for ws_entry in storage_root.iterdir():
            if not ws_entry.is_dir():
                continue
            history_file = ws_entry / EXT_SEGMENT / HISTORY_FILENAME
            if history_file.is_file():
                candidates.append(history_file)
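    # Newest first; main() takes the first entry, i.e. the most recently modified history file.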
    candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return candidates


def _generate_new_id() -> str:
    """Return a new random id (23 chars from allowed set, prefixed with 'evaluator-log-' for stability)."""
    alphabet = string.ascii_letters + string.digits + "_-"
    return "evaluator-log-" + "".join(random.choice(alphabet) for _ in range(23))


def atomic_write_json(target: Path, obj) -> None:
    fd, tmp = tempfile.mkstemp(dir=str(target.parent), prefix="history.", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as out:
            json.dump(obj, out, ensure_ascii=False, indent=2)
            out.write("\n")
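        # The temp file lives in the target's directory, so os.replace swaps it in atomically.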
        os.replace(tmp, target)
    finally:
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass


def _duplicate_last_entry(path: Path, summary_path: Path) -> dict:
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise SystemExit(f"History file JSON is corrupt: {e}")
    if not isinstance(data, dict) or not isinstance(data.get("queries"), list):
        raise SystemExit("Unexpected history file structure: missing 'queries' list")
    queries = data["queries"]
    if not queries:
        raise SystemExit("History file contains no queries to duplicate. Please run a query in VS Code and try again.")
    last = queries[-1]
    if not isinstance(last, dict):
        raise SystemExit("Last query entry is malformed")
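    # Deep-copy the last entry so the duplicate shares no nested state with the original.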
    payload = copy.deepcopy(last)
    initial = payload.setdefault("initialInfo", {})
    if not isinstance(initial, dict):
        initial = {}
        payload["initialInfo"] = initial
    new_id = _generate_new_id()
    initial["id"] = new_id
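    # ISO 8601 UTC timestamp with millisecond precision and a trailing 'Z' instead of '+00:00'.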
    initial["start"] = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
    payload["jsonEvalLogSummaryLocation"] = str(summary_path)
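    # Keep the label short: only the last two path components of the summary file (step 5 in the module docstring).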
    parts = list(summary_path.parts)
    last_two = "/".join(parts[-2:]) if len(parts) >= 2 else parts[-1]
    new_label = f"Evaluator log at {last_two}"
    initial["userSpecifiedLabel"] = new_label
    queries.append(payload)
    atomic_write_json(path, data)
    return {"new_id": new_id, "new_label": new_label, "count": len(queries)}


def main() -> int:
    parser = argparse.ArgumentParser(description="Duplicate last CodeQL query history entry, patching summary location and label.")
    parser.add_argument("summary_path", type=Path, help="Path to evaluator-log.summary.jsonl file (required).")
    args = parser.parse_args()

    summary_path: Path = args.summary_path
    if not summary_path.is_file():
        raise SystemExit(f"Summary file does not exist: {summary_path}")

    candidates = find_history_files()
    if not candidates:
        raise SystemExit("No workspace-query-history.json files found.")
    best = candidates[0]

    result = _duplicate_last_entry(best, summary_path)

    print(f"Patched history: {best}")
    print(f"Evaluator log summary: {summary_path}")
    print(f"New ID: {result['new_id']}")
    print(f"Label: {result['new_label']}")
    print(f"Total entries: {result['count']}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())