From bdb3058178c165d8549b0d7314e9addc920876c5 Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:32:02 +0800 Subject: [PATCH 01/50] Add unified WebDriver BiDi event bridge for Selenium and Playwright --- je_web_runner/utils/bidi_backend/__init__.py | 9 + je_web_runner/utils/bidi_backend/bridge.py | 191 +++++++++++++++++++ test/unit_test/test_bidi_backend.py | 121 ++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 je_web_runner/utils/bidi_backend/__init__.py create mode 100644 je_web_runner/utils/bidi_backend/bridge.py create mode 100644 test/unit_test/test_bidi_backend.py diff --git a/je_web_runner/utils/bidi_backend/__init__.py b/je_web_runner/utils/bidi_backend/__init__.py new file mode 100644 index 0000000..41121b3 --- /dev/null +++ b/je_web_runner/utils/bidi_backend/__init__.py @@ -0,0 +1,9 @@ +"""Unified WebDriver BiDi event/command bridge across Selenium + Playwright.""" +from je_web_runner.utils.bidi_backend.bridge import ( + BidiBackendError, + BidiBridge, + BidiEvent, + BidiSubscription, +) + +__all__ = ["BidiBackendError", "BidiBridge", "BidiEvent", "BidiSubscription"] diff --git a/je_web_runner/utils/bidi_backend/bridge.py b/je_web_runner/utils/bidi_backend/bridge.py new file mode 100644 index 0000000..18c76f3 --- /dev/null +++ b/je_web_runner/utils/bidi_backend/bridge.py @@ -0,0 +1,191 @@ +""" +WebDriver BiDi 統一橋:Selenium / Playwright 兩個後端共用的 event 訂閱介面。 +Unified BiDi-style event bridge over Selenium 4's BiDi or Playwright's +context/page event API. Callers ``subscribe`` to a logical event name and +get a :class:`BidiSubscription` they can ``unsubscribe()`` later. + +The abstraction hides: + +- Selenium 4's ``driver.script.add_console_message_handler`` / ``driver.bidi_connection``. +- Playwright's ``page.on("console", fn)`` / ``page.on("response", fn)`` / context-level events. + +Logical event names supported by default: ``console``, ``response``, +``request``, ``page_load``. 
Additional names can be registered via +:meth:`BidiBridge.register_translator`. +""" +from __future__ import annotations + +import itertools +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class BidiBackendError(WebRunnerException): + """Raised when subscription / unsubscription fails or backend is unsupported.""" + + +@dataclass +class BidiEvent: + """Backend-agnostic event payload.""" + + name: str + payload: Dict[str, Any] + + +@dataclass +class BidiSubscription: + """Handle returned by :meth:`BidiBridge.subscribe`.""" + + subscription_id: int + event: str + backend: str + detach: Callable[[], None] + + def unsubscribe(self) -> None: + try: + self.detach() + except Exception as error: # pylint: disable=broad-except + web_runner_logger.warning( + f"bidi unsubscribe {self.event!r} failed: {error!r}" + ) + + +# Translator signature: (target, callback) -> detach_fn +Translator = Callable[[Any, Callable[[BidiEvent], None]], Callable[[], None]] + + +def _selenium_console_translator(target: Any, + callback: Callable[[BidiEvent], None]) -> Callable[[], None]: + if not hasattr(target, "script") or not hasattr(target.script, "add_console_message_handler"): + raise BidiBackendError("driver.script.add_console_message_handler missing") + + def adapter(message: Any) -> None: + callback(BidiEvent(name="console", payload={ + "type": getattr(message, "type", None), + "text": getattr(message, "text", None), + })) + + handle = target.script.add_console_message_handler(adapter) + + def detach() -> None: + if hasattr(target.script, "remove_console_message_handler"): + target.script.remove_console_message_handler(handle) + + return detach + + +def _playwright_event_translator(event_name: str) -> Translator: + + def translator(target: Any, callback: Callable[[BidiEvent], None]) -> 
Callable[[], None]: + if not hasattr(target, "on") or not hasattr(target, "remove_listener"): + raise BidiBackendError("page does not expose on/remove_listener") + + def adapter(payload: Any) -> None: + callback(BidiEvent( + name=event_name, + payload=_extract_playwright_payload(event_name, payload), + )) + + target.on(event_name, adapter) + + def detach() -> None: + try: + target.remove_listener(event_name, adapter) + except Exception as error: # pylint: disable=broad-except + web_runner_logger.debug( + f"playwright remove_listener {event_name!r} failed: {error!r}" + ) + + return detach + + return translator + + +def _extract_playwright_payload(event_name: str, payload: Any) -> Dict[str, Any]: + if event_name == "console": + return { + "type": getattr(payload, "type", None), + "text": getattr(payload, "text", None), + } + if event_name == "response": + return { + "url": getattr(payload, "url", None), + "status": getattr(payload, "status", None), + } + if event_name == "request": + return { + "url": getattr(payload, "url", None), + "method": getattr(payload, "method", None), + } + if event_name == "page_load": + return {"url": getattr(payload, "url", None)} + return {"raw": str(payload)[:200]} + + +class BidiBridge: + """Backend-detecting bridge for BiDi-style event subscription.""" + + def __init__(self) -> None: + self._subscriptions: Dict[int, BidiSubscription] = {} + self._counter = itertools.count(1) + self._translators: Dict[str, Dict[str, Translator]] = { + "selenium": {"console": _selenium_console_translator}, + "playwright": { + "console": _playwright_event_translator("console"), + "response": _playwright_event_translator("response"), + "request": _playwright_event_translator("request"), + "page_load": _playwright_event_translator("load"), + }, + } + + def detect_backend(self, target: Any) -> str: + if hasattr(target, "script") and hasattr(target, "current_url"): + return "selenium" + if hasattr(target, "on") and hasattr(target, "remove_listener"): + 
return "playwright" + raise BidiBackendError( + f"cannot detect backend for {type(target).__name__}" + ) + + def register_translator(self, backend: str, event: str, translator: Translator) -> None: + self._translators.setdefault(backend, {})[event] = translator + + def subscribe( + self, + target: Any, + event: str, + callback: Callable[[BidiEvent], None], + backend: Optional[str] = None, + ) -> BidiSubscription: + used_backend = backend or self.detect_backend(target) + translator = self._translators.get(used_backend, {}).get(event) + if translator is None: + raise BidiBackendError( + f"no translator for {used_backend}/{event!r}" + ) + detach = translator(target, callback) + sub = BidiSubscription( + subscription_id=next(self._counter), + event=event, + backend=used_backend, + detach=detach, + ) + self._subscriptions[sub.subscription_id] = sub + web_runner_logger.info( + f"bidi subscribe id={sub.subscription_id} backend={used_backend} event={event!r}" + ) + return sub + + def unsubscribe(self, subscription: BidiSubscription) -> None: + subscription.unsubscribe() + self._subscriptions.pop(subscription.subscription_id, None) + + def unsubscribe_all(self) -> None: + for sub in list(self._subscriptions.values()): + self.unsubscribe(sub) + + def active_subscriptions(self) -> List[BidiSubscription]: + return list(self._subscriptions.values()) diff --git a/test/unit_test/test_bidi_backend.py b/test/unit_test/test_bidi_backend.py new file mode 100644 index 0000000..f0bfa76 --- /dev/null +++ b/test/unit_test/test_bidi_backend.py @@ -0,0 +1,121 @@ +import unittest +from unittest.mock import MagicMock + +from je_web_runner.utils.bidi_backend import ( + BidiBackendError, + BidiBridge, + BidiEvent, +) + + +class TestDetect(unittest.TestCase): + + def test_detects_selenium(self): + target = MagicMock() + target.current_url = "https://x.com" + # MagicMock auto-creates attributes; ensure both exist + _ = target.script + bridge = BidiBridge() + 
self.assertEqual(bridge.detect_backend(target), "selenium") + + def test_detects_playwright(self): + target = MagicMock(spec=["on", "remove_listener"]) + bridge = BidiBridge() + self.assertEqual(bridge.detect_backend(target), "playwright") + + def test_unknown_target_raises(self): + with self.assertRaises(BidiBackendError): + BidiBridge().detect_backend(object()) + + +class TestPlaywrightSubscribe(unittest.TestCase): + + def test_console_subscription_routes_event(self): + page = MagicMock() + bridge = BidiBridge() + captured = [] + sub = bridge.subscribe( + page, "console", captured.append, backend="playwright", + ) + adapter = page.on.call_args.args[1] + message = MagicMock() + message.type = "log" + message.text = "hello" + adapter(message) + self.assertEqual(len(captured), 1) + evt = captured[0] + self.assertIsInstance(evt, BidiEvent) + self.assertEqual(evt.name, "console") + self.assertEqual(evt.payload["text"], "hello") + bridge.unsubscribe(sub) + page.remove_listener.assert_called_once() + + def test_response_subscription_extracts_url_and_status(self): + page = MagicMock() + bridge = BidiBridge() + captured = [] + bridge.subscribe(page, "response", captured.append, backend="playwright") + adapter = page.on.call_args.args[1] + response = MagicMock(url="/api/x", status=200) + adapter(response) + self.assertEqual(captured[0].payload["status"], 200) + + +class TestSeleniumSubscribe(unittest.TestCase): + + def test_console_translator_fails_when_method_missing(self): + target = MagicMock(spec=["script", "current_url"]) + target.script = MagicMock(spec=[]) + bridge = BidiBridge() + with self.assertRaises(BidiBackendError): + bridge.subscribe(target, "console", lambda _e: None, backend="selenium") + + def test_console_translator_routes_handle(self): + target = MagicMock() + # Provide both methods + target.script.add_console_message_handler.return_value = "handle-1" + target.current_url = "https://x.com" + bridge = BidiBridge() + captured = [] + sub = 
bridge.subscribe(target, "console", captured.append, backend="selenium") + adapter = target.script.add_console_message_handler.call_args.args[0] + msg = MagicMock(type="error", text="boom") + adapter(msg) + self.assertEqual(captured[0].payload["text"], "boom") + bridge.unsubscribe(sub) + target.script.remove_console_message_handler.assert_called_once_with("handle-1") + + +class TestUnknownEvent(unittest.TestCase): + + def test_unsupported_event_raises(self): + page = MagicMock() + bridge = BidiBridge() + with self.assertRaises(BidiBackendError): + bridge.subscribe(page, "weird-event", lambda _e: None, backend="playwright") + + def test_register_translator_extends(self): + page = MagicMock() + bridge = BidiBridge() + + def custom(target, callback): + return lambda: None + + bridge.register_translator("playwright", "weird-event", custom) + sub = bridge.subscribe(page, "weird-event", lambda _e: None, backend="playwright") + self.assertIn(sub, bridge.active_subscriptions()) + + +class TestUnsubscribeAll(unittest.TestCase): + + def test_clears_subscriptions(self): + page = MagicMock() + bridge = BidiBridge() + bridge.subscribe(page, "console", lambda _e: None, backend="playwright") + bridge.subscribe(page, "response", lambda _e: None, backend="playwright") + bridge.unsubscribe_all() + self.assertEqual(bridge.active_subscriptions(), []) + + +if __name__ == "__main__": + unittest.main() From 96a3bcb52470e1b5f003f75a3b5946df56bb684e Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:32:57 +0800 Subject: [PATCH 02/50] Add browser pool with warm sessions and recycle policy --- je_web_runner/utils/browser_pool/__init__.py | 8 + je_web_runner/utils/browser_pool/pool.py | 172 +++++++++++++++++++ test/unit_test/test_browser_pool.py | 111 ++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 je_web_runner/utils/browser_pool/__init__.py create mode 100644 je_web_runner/utils/browser_pool/pool.py create mode 100644 test/unit_test/test_browser_pool.py 
diff --git a/je_web_runner/utils/browser_pool/__init__.py b/je_web_runner/utils/browser_pool/__init__.py new file mode 100644 index 0000000..3483a50 --- /dev/null +++ b/je_web_runner/utils/browser_pool/__init__.py @@ -0,0 +1,8 @@ +"""Pre-warmed browser pool with checkout / checkin semantics.""" +from je_web_runner.utils.browser_pool.pool import ( + BrowserPool, + BrowserPoolError, + PooledSession, +) + +__all__ = ["BrowserPool", "BrowserPoolError", "PooledSession"] diff --git a/je_web_runner/utils/browser_pool/pool.py b/je_web_runner/utils/browser_pool/pool.py new file mode 100644 index 0000000..07f6976 --- /dev/null +++ b/je_web_runner/utils/browser_pool/pool.py @@ -0,0 +1,172 @@ +""" +預先暖機的 browser instance pool;checkout / checkin 重複使用以省冷啟動。 +Browser pool with warm sessions. Pre-launches up to ``size`` browser +instances (Selenium driver or Playwright page) and hands them out via +``checkout`` / context-manager. Checked-in sessions are health-checked +and recycled if the predicate fails or ``max_uses`` is exceeded. + +The factory and health-check are caller-provided so the pool stays +backend-agnostic. 
+""" +from __future__ import annotations + +import contextlib +import threading +import time +from dataclasses import dataclass, field +from queue import Empty, Queue +from typing import Any, Callable, Iterator, List, Optional + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class BrowserPoolError(WebRunnerException): + """Raised when checkout times out or factory fails.""" + + +@dataclass +class PooledSession: + """Single browser session managed by the pool.""" + + session_id: int + instance: Any + uses: int = 0 + created_at: float = field(default_factory=time.monotonic) + + +SessionFactory = Callable[[], Any] +SessionDestructor = Callable[[Any], None] +HealthCheck = Callable[[Any], bool] + + +class BrowserPool: + """Thread-safe browser instance pool.""" + + def __init__( + self, + factory: SessionFactory, + destructor: Optional[SessionDestructor] = None, + health_check: Optional[HealthCheck] = None, + size: int = 2, + max_uses: int = 50, + ) -> None: + if size <= 0: + raise BrowserPoolError("size must be > 0") + if max_uses <= 0: + raise BrowserPoolError("max_uses must be > 0") + self._factory = factory + self._destructor = destructor or (lambda _instance: None) + self._health_check = health_check or (lambda _instance: True) + self._size = size + self._max_uses = max_uses + self._available: "Queue[PooledSession]" = Queue() + self._lock = threading.Lock() + self._next_id = 1 + self._closed = False + self._tracked: List[PooledSession] = [] + + def warm(self) -> None: + """Pre-launch ``size`` instances eagerly.""" + for _ in range(self._size): + session = self._spawn() + self._available.put(session) + + def _spawn(self) -> PooledSession: + try: + instance = self._factory() + except Exception as error: + raise BrowserPoolError(f"factory failed: {error!r}") from error + with self._lock: + session_id = self._next_id + self._next_id += 1 + session = 
PooledSession(session_id=session_id, instance=instance) + self._tracked.append(session) + web_runner_logger.info(f"browser_pool spawn id={session_id}") + return session + + def checkout(self, timeout: float = 30.0) -> PooledSession: + if self._closed: + raise BrowserPoolError("pool is closed") + deadline = time.monotonic() + timeout + while True: + try: + session = self._available.get_nowait() + except Empty: + if self._can_grow(): + session = self._spawn() + else: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise BrowserPoolError( + f"no session available within {timeout}s" + ) + try: + session = self._available.get(timeout=remaining) + except Empty: + raise BrowserPoolError( + f"no session available within {timeout}s" + ) from None + if not self._is_healthy(session): + self._destroy(session) + continue + return session + + def checkin(self, session: PooledSession) -> None: + if self._closed: + self._destroy(session) + return + session.uses += 1 + if session.uses >= self._max_uses or not self._is_healthy(session): + self._destroy(session) + return + self._available.put(session) + + @contextlib.contextmanager + def session(self, timeout: float = 30.0) -> Iterator[PooledSession]: + ses = self.checkout(timeout=timeout) + try: + yield ses + finally: + self.checkin(ses) + + def close(self) -> None: + with self._lock: + self._closed = True + tracked = list(self._tracked) + self._tracked.clear() + while not self._available.empty(): + try: + self._available.get_nowait() + except Empty: + break + for session in tracked: + self._destroy(session) + + def _is_healthy(self, session: PooledSession) -> bool: + try: + return bool(self._health_check(session.instance)) + except Exception as error: # pylint: disable=broad-except + web_runner_logger.debug( + f"browser_pool health-check raised id={session.session_id}: {error!r}" + ) + return False + + def _destroy(self, session: PooledSession) -> None: + try: + self._destructor(session.instance) + except Exception 
as error: # pylint: disable=broad-except + web_runner_logger.warning( + f"browser_pool destructor raised id={session.session_id}: {error!r}" + ) + with self._lock: + self._tracked = [s for s in self._tracked if s.session_id != session.session_id] + + def _can_grow(self) -> bool: + with self._lock: + return len(self._tracked) < self._size + + @property + def tracked_count(self) -> int: + with self._lock: + return len(self._tracked) diff --git a/test/unit_test/test_browser_pool.py b/test/unit_test/test_browser_pool.py new file mode 100644 index 0000000..bbc0506 --- /dev/null +++ b/test/unit_test/test_browser_pool.py @@ -0,0 +1,111 @@ +import unittest +from unittest.mock import MagicMock + +from je_web_runner.utils.browser_pool import ( + BrowserPool, + BrowserPoolError, +) + + +class TestBrowserPool(unittest.TestCase): + + def test_invalid_size_raises(self): + with self.assertRaises(BrowserPoolError): + BrowserPool(factory=lambda: object(), size=0) + + def test_invalid_max_uses_raises(self): + with self.assertRaises(BrowserPoolError): + BrowserPool(factory=lambda: object(), max_uses=0) + + def test_warm_creates_size_sessions(self): + factory = MagicMock(side_effect=lambda: object()) + pool = BrowserPool(factory=factory, size=3) + pool.warm() + self.assertEqual(pool.tracked_count, 3) + self.assertEqual(factory.call_count, 3) + + def test_checkout_reuses_warm_sessions(self): + factory = MagicMock(side_effect=lambda: object()) + pool = BrowserPool(factory=factory, size=2) + pool.warm() + s1 = pool.checkout(timeout=0.1) + s2 = pool.checkout(timeout=0.1) + # Already warmed; no extra factory calls + self.assertEqual(factory.call_count, 2) + self.assertNotEqual(s1.session_id, s2.session_id) + + def test_checkin_returns_to_pool(self): + pool = BrowserPool(factory=lambda: object(), size=1) + pool.warm() + sess = pool.checkout(timeout=0.1) + pool.checkin(sess) + sess2 = pool.checkout(timeout=0.1) + self.assertEqual(sess2.session_id, sess.session_id) + + def 
test_max_uses_recycles(self): + destructor = MagicMock() + pool = BrowserPool( + factory=lambda: object(), + destructor=destructor, + size=1, + max_uses=1, + ) + pool.warm() + sess = pool.checkout(timeout=0.1) + pool.checkin(sess) + # uses now == 1; pool destroyed it; next checkout spawns fresh + sess2 = pool.checkout(timeout=0.1) + self.assertNotEqual(sess.session_id, sess2.session_id) + destructor.assert_called_once_with(sess.instance) + + def test_unhealthy_session_recycled(self): + destructor = MagicMock() + check_count = {"n": 0} + + def health(_instance): + check_count["n"] += 1 + return check_count["n"] != 2 # second check fails + + pool = BrowserPool( + factory=lambda: object(), + destructor=destructor, + health_check=health, + size=2, + ) + pool.warm() + sess1 = pool.checkout(timeout=0.1) + sess2 = pool.checkout(timeout=0.1) + # The second session fails health check on checkout and is recycled, + # then a fresh one is spawned in its place. + self.assertNotEqual(sess1.session_id, sess2.session_id) + destructor.assert_called() # destroyed at least once + + def test_factory_failure_raises(self): + def failing(): + raise RuntimeError("no driver") + + pool = BrowserPool(factory=failing, size=1) + with self.assertRaises(BrowserPoolError): + pool.checkout(timeout=0.1) + + def test_context_manager_releases(self): + pool = BrowserPool(factory=lambda: object(), size=1) + pool.warm() + with pool.session(timeout=0.1) as sess: + sid = sess.session_id + # checking in puts it back; second checkout returns the same + with pool.session(timeout=0.1) as sess2: + self.assertEqual(sess2.session_id, sid) + + def test_close_destroys_all(self): + destructor = MagicMock() + pool = BrowserPool(factory=lambda: object(), destructor=destructor, size=2) + pool.warm() + pool.close() + self.assertEqual(destructor.call_count, 2) + with self.assertRaises(BrowserPoolError): + pool.checkout(timeout=0.1) + + +if __name__ == "__main__": + unittest.main() From 
99d062cf1dda709a213ea61af0ebdafb40ac4d0d Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:34:06 +0800 Subject: [PATCH 03/50] Add HAR replay server for offline-deterministic e2e tests --- je_web_runner/utils/har_replay/__init__.py | 8 + je_web_runner/utils/har_replay/server.py | 240 +++++++++++++++++++++ test/unit_test/test_har_replay.py | 143 ++++++++++++ 3 files changed, 391 insertions(+) create mode 100644 je_web_runner/utils/har_replay/__init__.py create mode 100644 je_web_runner/utils/har_replay/server.py create mode 100644 test/unit_test/test_har_replay.py diff --git a/je_web_runner/utils/har_replay/__init__.py b/je_web_runner/utils/har_replay/__init__.py new file mode 100644 index 0000000..b941c28 --- /dev/null +++ b/je_web_runner/utils/har_replay/__init__.py @@ -0,0 +1,8 @@ +"""Local HAR replay server: serve recorded responses from a HAR file.""" +from je_web_runner.utils.har_replay.server import ( + HarReplayError, + HarReplayServer, + load_har, +) + +__all__ = ["HarReplayError", "HarReplayServer", "load_har"] diff --git a/je_web_runner/utils/har_replay/server.py b/je_web_runner/utils/har_replay/server.py new file mode 100644 index 0000000..88fd9fd --- /dev/null +++ b/je_web_runner/utils/har_replay/server.py @@ -0,0 +1,240 @@ +""" +HAR replay server:把 har_diff 收到的 HAR 反過來當 mock backend。 +HAR replay server. Loads a HAR file and serves matching responses for +incoming requests so e2e tests can run completely offline. + +Matching is keyed on ``(method, url-path-with-query)``; if the same key +appears multiple times in the HAR, replay rotates through them in order +and stays on the last entry once exhausted. 
+""" +from __future__ import annotations + +import json +import re +import threading +from collections import defaultdict +from dataclasses import dataclass, field +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from urllib.parse import urlparse + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class HarReplayError(WebRunnerException): + """Raised when the HAR file is invalid or the server can't bind.""" + + +@dataclass +class HarEntry: + method: str + path: str + status: int + headers: Dict[str, str] = field(default_factory=dict) + body: str = "" + body_is_base64: bool = False + + +def load_har(source: Union[str, Path]) -> List[HarEntry]: + """Read a HAR file and return its ``entries`` projected to :class:`HarEntry`.""" + path = Path(source) + if not path.is_file(): + raise HarReplayError(f"HAR file not found: {source!r}") + try: + document = json.loads(path.read_text(encoding="utf-8")) + except ValueError as error: + raise HarReplayError(f"HAR is not JSON: {error}") from error + entries = (document.get("log") or {}).get("entries") + if not isinstance(entries, list): + raise HarReplayError("HAR missing log.entries") + parsed: List[HarEntry] = [] + for index, entry in enumerate(entries): + try: + parsed.append(_entry_from_har(entry)) + except (KeyError, TypeError, ValueError) as error: + web_runner_logger.warning(f"har_replay skipping entry {index}: {error}") + return parsed + + +def _entry_from_har(entry: Dict[str, Any]) -> HarEntry: + request = entry["request"] + response = entry["response"] + parsed = urlparse(request["url"]) + request_path = parsed.path or "/" + if parsed.query: + request_path = f"{request_path}?{parsed.query}" + content = response.get("content") or {} + headers = { + h.get("name", ""): h.get("value", "") + for h in 
response.get("headers") or [] + if isinstance(h, dict) + } + if content.get("mimeType"): + headers.setdefault("content-type", content["mimeType"]) + return HarEntry( + method=str(request.get("method", "GET")).upper(), + path=request_path, + status=int(response.get("status", 200)), + headers=headers, + body=str(content.get("text") or ""), + body_is_base64=str(content.get("encoding", "")).lower() == "base64", + ) + + +_PathMatcher = Callable[[str], bool] + + +def _build_matcher(pattern: str) -> _PathMatcher: + if pattern.startswith("re:"): + regex = re.compile(pattern[3:]) + return lambda path: regex.search(path) is not None + if "*" in pattern: + regex = re.compile("^" + re.escape(pattern).replace(r"\*", ".*") + "$") + return lambda path: regex.match(path) is not None + return lambda path: path == pattern + + +@dataclass +class _Bucket: + matcher: _PathMatcher + pattern: str + entries: List[HarEntry] + cursor: int = 0 + + +class HarReplayServer: + """In-process HTTP server that replays HAR responses.""" + + def __init__( + self, + entries: List[HarEntry], + host: str = "127.0.0.1", + port: int = 0, + not_found_status: int = 404, + ) -> None: + if not entries: + raise HarReplayError("entries must be non-empty") + self.entries = entries + self.host = host + self.port = port + self.not_found_status = not_found_status + self._buckets: Dict[str, List[_Bucket]] = defaultdict(list) + self._build_buckets() + self._server: Optional[HTTPServer] = None + self._thread: Optional[threading.Thread] = None + self.calls: List[Tuple[str, str]] = [] + + def _build_buckets(self) -> None: + grouped: Dict[Tuple[str, str], List[HarEntry]] = defaultdict(list) + for entry in self.entries: + grouped[(entry.method, entry.path)].append(entry) + for (method, path), group in grouped.items(): + bucket = _Bucket( + matcher=_build_matcher(path), + pattern=path, + entries=group, + ) + self._buckets[method].append(bucket) + + def find(self, method: str, path: str) -> Optional[HarEntry]: + 
method_upper = method.upper() + self.calls.append((method_upper, path)) + candidates = self._buckets.get(method_upper) or [] + for bucket in candidates: + if bucket.matcher(path): + entry = bucket.entries[bucket.cursor] + if bucket.cursor + 1 < len(bucket.entries): + bucket.cursor += 1 + return entry + return None + + def start(self) -> str: + if self._server is not None: + raise HarReplayError("HAR replay server already started") + handler = _make_handler(self) + srv = HTTPServer((self.host, self.port), handler) + thread = threading.Thread(target=srv.serve_forever, daemon=True) + thread.start() + self._server = srv + self._thread = thread + self.port = srv.server_address[1] + web_runner_logger.info(f"har_replay listening on {self.host}:{self.port}") + return f"http://{self.host}:{self.port}" # NOSONAR — local mock + + def stop(self) -> None: + if self._server is not None: + self._server.shutdown() + self._server.server_close() + self._server = None + self._thread = None + + +def _make_handler(server: HarReplayServer) -> Callable: + + class _ReplayHandler(BaseHTTPRequestHandler): + + def log_message(self, format, *args): # pylint: disable=redefined-builtin + return + + def _serve(self) -> None: + method = self.command + request_path = self.path + entry = server.find(method, request_path) + if entry is None: + payload = json.dumps({ + "error": "no har match", + "method": method, + "path": request_path, + }).encode("utf-8") + self.send_response(server.not_found_status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + return + body_bytes = _entry_body_bytes(entry) + self.send_response(entry.status) + for name, value in entry.headers.items(): + if name.lower() not in {"content-length", "transfer-encoding"}: + self.send_header(name, value) + self.send_header("Content-Length", str(len(body_bytes))) + self.end_headers() + self.wfile.write(body_bytes) + + def 
do_GET(self): # noqa: N802 + self._serve() + + def do_POST(self): # noqa: N802 + length = int(self.headers.get("Content-Length") or 0) + if length: + self.rfile.read(length) # drain body, ignore for matching + self._serve() + + def do_PUT(self): # noqa: N802 + length = int(self.headers.get("Content-Length") or 0) + if length: + self.rfile.read(length) + self._serve() + + def do_DELETE(self): # noqa: N802 + self._serve() + + def do_PATCH(self): # noqa: N802 + length = int(self.headers.get("Content-Length") or 0) + if length: + self.rfile.read(length) + self._serve() + + return _ReplayHandler + + +def _entry_body_bytes(entry: HarEntry) -> bytes: + if entry.body_is_base64: + import base64 + try: + return base64.b64decode(entry.body or "") + except (ValueError, TypeError): + return (entry.body or "").encode("utf-8") + return (entry.body or "").encode("utf-8") diff --git a/test/unit_test/test_har_replay.py b/test/unit_test/test_har_replay.py new file mode 100644 index 0000000..fd2b616 --- /dev/null +++ b/test/unit_test/test_har_replay.py @@ -0,0 +1,143 @@ +import json +import tempfile +import unittest +import urllib.request +from pathlib import Path + +from je_web_runner.utils.har_replay import ( + HarReplayError, + HarReplayServer, + load_har, +) +from je_web_runner.utils.har_replay.server import HarEntry + + +def _write_har(path, entries): + document = {"log": {"entries": entries}} + Path(path).write_text(json.dumps(document), encoding="utf-8") + + +class TestLoadHar(unittest.TestCase): + + def test_loads_entries(self): + with tempfile.TemporaryDirectory() as tmpdir: + har = Path(tmpdir) / "x.har" + _write_har(har, [{ + "request": {"method": "GET", "url": "https://api/foo"}, + "response": { + "status": 200, + "headers": [{"name": "Content-Type", "value": "application/json"}], + "content": {"text": '{"ok": true}', "mimeType": "application/json"}, + }, + }]) + entries = load_har(har) + self.assertEqual(len(entries), 1) + self.assertEqual(entries[0].path, "/foo") + 
self.assertEqual(entries[0].body, '{"ok": true}') + + def test_missing_file_raises(self): + with self.assertRaises(HarReplayError): + load_har("nope.har") + + def test_invalid_json_raises(self): + with tempfile.TemporaryDirectory() as tmpdir: + har = Path(tmpdir) / "x.har" + har.write_text("not json", encoding="utf-8") + with self.assertRaises(HarReplayError): + load_har(har) + + def test_missing_log_entries_raises(self): + with tempfile.TemporaryDirectory() as tmpdir: + har = Path(tmpdir) / "x.har" + har.write_text(json.dumps({"log": {}}), encoding="utf-8") + with self.assertRaises(HarReplayError): + load_har(har) + + def test_url_query_string_kept(self): + with tempfile.TemporaryDirectory() as tmpdir: + har = Path(tmpdir) / "x.har" + _write_har(har, [{ + "request": {"method": "GET", "url": "https://api/foo?id=42"}, + "response": {"status": 200, "content": {"text": ""}}, + }]) + entries = load_har(har) + self.assertEqual(entries[0].path, "/foo?id=42") + + +class TestHarReplayServerMatching(unittest.TestCase): + + def test_exact_match(self): + server = HarReplayServer(entries=[HarEntry( + method="GET", path="/api", status=200, body="ok", + )]) + match = server.find("GET", "/api") + self.assertIsNotNone(match) + + def test_method_filter(self): + server = HarReplayServer(entries=[HarEntry( + method="POST", path="/x", status=200, + )]) + self.assertIsNone(server.find("GET", "/x")) + + def test_glob_match(self): + server = HarReplayServer(entries=[HarEntry( + method="GET", path="/api/users/*", status=200, + )]) + self.assertIsNotNone(server.find("GET", "/api/users/42")) + + def test_regex_match(self): + server = HarReplayServer(entries=[HarEntry( + method="POST", path="re:/api/v\\d+/items", status=201, + )]) + self.assertIsNotNone(server.find("POST", "/api/v3/items")) + + def test_rotation_then_sticky(self): + server = HarReplayServer(entries=[ + HarEntry(method="GET", path="/x", status=200, body="first"), + HarEntry(method="GET", path="/x", status=200, body="second"), 
+ ]) + self.assertEqual(server.find("GET", "/x").body, "first") + self.assertEqual(server.find("GET", "/x").body, "second") + self.assertEqual(server.find("GET", "/x").body, "second") + + def test_calls_recorded(self): + server = HarReplayServer(entries=[HarEntry(method="GET", path="/x", status=200)]) + server.find("GET", "/x") + server.find("POST", "/y") + self.assertEqual(server.calls, [("GET", "/x"), ("POST", "/y")]) + + def test_empty_entries_raises(self): + with self.assertRaises(HarReplayError): + HarReplayServer(entries=[]) + + +class TestHttpServer(unittest.TestCase): + + def test_serves_recorded_response(self): + server = HarReplayServer(entries=[HarEntry( + method="GET", path="/foo", status=200, + headers={"Content-Type": "application/json"}, + body='{"ok": true}', + )]) + url = server.start() + try: + with urllib.request.urlopen(url + "/foo", timeout=2) as response: # nosec B310 + body = response.read().decode("utf-8") + self.assertEqual(response.status, 200) + self.assertEqual(body, '{"ok": true}') + finally: + server.stop() + + def test_unmatched_returns_404(self): + server = HarReplayServer(entries=[HarEntry(method="GET", path="/foo", status=200)]) + url = server.start() + try: + with self.assertRaises(urllib.error.HTTPError) as ctx: + urllib.request.urlopen(url + "/missing", timeout=2) # nosec B310 + self.assertEqual(ctx.exception.code, 404) + finally: + server.stop() + + +if __name__ == "__main__": + unittest.main() From fb68d4bf6ffa9b503208ce896ba0fe341b05a18e Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:35:25 +0800 Subject: [PATCH 04/50] Add local visual diff review web UI with accept-baseline action --- je_web_runner/utils/visual_review/__init__.py | 14 + .../utils/visual_review/review_server.py | 240 ++++++++++++++++++ test/unit_test/test_visual_review.py | 119 +++++++++ 3 files changed, 373 insertions(+) create mode 100644 je_web_runner/utils/visual_review/__init__.py create mode 100644 
je_web_runner/utils/visual_review/review_server.py create mode 100644 test/unit_test/test_visual_review.py diff --git a/je_web_runner/utils/visual_review/__init__.py b/je_web_runner/utils/visual_review/__init__.py new file mode 100644 index 0000000..a2275e5 --- /dev/null +++ b/je_web_runner/utils/visual_review/__init__.py @@ -0,0 +1,14 @@ +"""Local visual-diff review web UI.""" +from je_web_runner.utils.visual_review.review_server import ( + VisualReviewError, + VisualReviewServer, + accept_baseline, + list_diffs, +) + +__all__ = [ + "VisualReviewError", + "VisualReviewServer", + "accept_baseline", + "list_diffs", +] diff --git a/je_web_runner/utils/visual_review/review_server.py b/je_web_runner/utils/visual_review/review_server.py new file mode 100644 index 0000000..1d12cd4 --- /dev/null +++ b/je_web_runner/utils/visual_review/review_server.py @@ -0,0 +1,240 @@ +""" +Visual diff 本機審視 UI:side-by-side baseline / current,一鍵 accept。 +Local visual-diff review server. Walks ``baseline_dir`` / ``current_dir`` +for matching ``*.png`` files and renders an HTML page that places each +pair side-by-side. Clicking *Accept* copies the current PNG over the +baseline. + +Designed to pair with :mod:`visual_regression` outputs. 
+""" +from __future__ import annotations + +import html as _html +import shutil +import threading +from dataclasses import dataclass, field +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple +from urllib.parse import parse_qs, urlparse + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class VisualReviewError(WebRunnerException): + """Raised when accept / list operations fail.""" + + +@dataclass +class _Pair: + name: str + baseline: Optional[Path] + current: Optional[Path] + status: str # "match" | "diff" | "missing-baseline" | "missing-current" + + +def _pairs(baseline_dir: Path, current_dir: Path) -> List[_Pair]: + baseline_files = {p.name: p for p in baseline_dir.glob("*.png")} if baseline_dir.is_dir() else {} + current_files = {p.name: p for p in current_dir.glob("*.png")} if current_dir.is_dir() else {} + names = sorted(set(baseline_files) | set(current_files)) + pairs: List[_Pair] = [] + for name in names: + baseline = baseline_files.get(name) + current = current_files.get(name) + if baseline and current: + same = baseline.read_bytes() == current.read_bytes() + status = "match" if same else "diff" + elif baseline is None: + status = "missing-baseline" + else: + status = "missing-current" + pairs.append(_Pair(name=name, baseline=baseline, current=current, status=status)) + return pairs + + +def list_diffs(baseline_dir: str, current_dir: str) -> List[Dict[str, str]]: + """Return ``[{name, status}]`` for every paired snapshot.""" + pairs = _pairs(Path(baseline_dir), Path(current_dir)) + return [{"name": p.name, "status": p.status} for p in pairs] + + +def accept_baseline(baseline_dir: str, current_dir: str, name: str) -> Path: + """ + Copy ``current_dir/name`` over ``baseline_dir/name`` (creating dir). 
+ """ + if not name or "/" in name or "\\" in name or name.startswith(".."): + raise VisualReviewError(f"unsafe baseline name: {name!r}") + current = Path(current_dir) / name + baseline_target = Path(baseline_dir) / name + if not current.is_file(): + raise VisualReviewError(f"current file missing: {current}") + baseline_target.parent.mkdir(parents=True, exist_ok=True) + shutil.copyfile(current, baseline_target) + return baseline_target + + +_INDEX_HTML = """ + +WebRunner visual review + + +

Visual review

+

baseline: {baseline}
current: {current}

+ + + {rows} +
NameStatusBaselineCurrentAction
+ +""" + + +def _row_html(pair: _Pair) -> str: + baseline_img = ( + f"baseline" + if pair.baseline else "missing" + ) + current_img = ( + f"current" + if pair.current else "missing" + ) + accept_btn = "" + if pair.status in {"diff", "missing-baseline"} and pair.current is not None: + accept_btn = ( + f"
" + f"" + f"
" + ) + return ( + f"" + f"{_html.escape(pair.name)}" + f"{pair.status}" + f"{baseline_img}" + f"{current_img}" + f"{accept_btn}" + f"" + ) + + +def render_index(baseline_dir: str, current_dir: str) -> str: + pairs = _pairs(Path(baseline_dir), Path(current_dir)) + rows = "".join(_row_html(p) for p in pairs) or "No snapshots" + return _INDEX_HTML.format( + baseline=_html.escape(str(Path(baseline_dir).resolve())), + current=_html.escape(str(Path(current_dir).resolve())), + rows=rows, + ) + + +class VisualReviewServer: + """HTTP server that powers the review UI.""" + + def __init__( + self, + baseline_dir: str, + current_dir: str, + host: str = "127.0.0.1", + port: int = 0, + ) -> None: + self.baseline_dir = baseline_dir + self.current_dir = current_dir + self.host = host + self.port = port + self._server: Optional[HTTPServer] = None + self._thread: Optional[threading.Thread] = None + self.accepted: List[str] = [] + + def start(self) -> str: + if self._server is not None: + raise VisualReviewError("review server already started") + handler = _make_handler(self) + srv = HTTPServer((self.host, self.port), handler) + thread = threading.Thread(target=srv.serve_forever, daemon=True) + thread.start() + self._server = srv + self._thread = thread + self.port = srv.server_address[1] + web_runner_logger.info( + f"visual_review listening on {self.host}:{self.port}" + ) + return f"http://{self.host}:{self.port}" # NOSONAR — local UI + + def stop(self) -> None: + if self._server is not None: + self._server.shutdown() + self._server.server_close() + self._server = None + self._thread = None + + +def _make_handler(server: VisualReviewServer) -> Callable: + + class _ReviewHandler(BaseHTTPRequestHandler): + + def log_message(self, format, *args): # pylint: disable=redefined-builtin + return + + def _send(self, status: int, body: bytes, content_type: str) -> None: + self.send_response(status) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", 
str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self): # noqa: N802 + parsed = urlparse(self.path) + if parsed.path == "/" or parsed.path == "/index.html": + self._send( + 200, + render_index(server.baseline_dir, server.current_dir).encode("utf-8"), + "text/html; charset=utf-8", + ) + return + if parsed.path.startswith("/img/baseline/") or parsed.path.startswith("/img/current/"): + bucket, _, name = parsed.path[5:].partition("/") # strip "/img/" + base = server.baseline_dir if bucket == "baseline" else server.current_dir + target = (Path(base) / name).resolve() + base_resolved = Path(base).resolve() + try: + target.relative_to(base_resolved) + except ValueError: + self._send(404, b"", "text/plain") + return + if not target.is_file(): + self._send(404, b"", "text/plain") + return + self._send(200, target.read_bytes(), "image/png") + return + self._send(404, b"not found", "text/plain") + + def do_POST(self): # noqa: N802 + if self.path != "/accept": + self._send(404, b"not found", "text/plain") + return + length = int(self.headers.get("Content-Length") or 0) + body = self.rfile.read(length).decode("utf-8") if length else "" + params = parse_qs(body) + names = params.get("name") or [] + if not names: + self._send(400, b"missing name", "text/plain") + return + try: + accept_baseline(server.baseline_dir, server.current_dir, names[0]) + except VisualReviewError as error: + self._send(400, str(error).encode("utf-8"), "text/plain") + return + server.accepted.append(names[0]) + self.send_response(303) + self.send_header("Location", "/") + self.end_headers() + + return _ReviewHandler diff --git a/test/unit_test/test_visual_review.py b/test/unit_test/test_visual_review.py new file mode 100644 index 0000000..9cd9126 --- /dev/null +++ b/test/unit_test/test_visual_review.py @@ -0,0 +1,119 @@ +import tempfile +import unittest +import urllib.parse +import urllib.request +from pathlib import Path + +from je_web_runner.utils.visual_review import ( + 
VisualReviewError, + VisualReviewServer, + accept_baseline, + list_diffs, +) +from je_web_runner.utils.visual_review.review_server import render_index + + +class TestListDiffs(unittest.TestCase): + + def test_status_for_match_diff_missing(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base" + curr = Path(tmpdir) / "curr" + base.mkdir(); curr.mkdir() + (base / "same.png").write_bytes(b"same") + (curr / "same.png").write_bytes(b"same") + (base / "drift.png").write_bytes(b"a") + (curr / "drift.png").write_bytes(b"b") + (base / "only-baseline.png").write_bytes(b"x") + (curr / "only-current.png").write_bytes(b"y") + statuses = {d["name"]: d["status"] for d in list_diffs(str(base), str(curr))} + self.assertEqual(statuses["same.png"], "match") + self.assertEqual(statuses["drift.png"], "diff") + self.assertEqual(statuses["only-baseline.png"], "missing-current") + self.assertEqual(statuses["only-current.png"], "missing-baseline") + + +class TestAcceptBaseline(unittest.TestCase): + + def test_copies_current_to_baseline(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base" + curr = Path(tmpdir) / "curr" + base.mkdir(); curr.mkdir() + (curr / "x.png").write_bytes(b"new") + target = accept_baseline(str(base), str(curr), "x.png") + self.assertTrue(target.is_file()) + self.assertEqual((base / "x.png").read_bytes(), b"new") + + def test_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base" + curr = Path(tmpdir) / "curr" + base.mkdir(); curr.mkdir() + with self.assertRaises(VisualReviewError): + accept_baseline(str(base), str(curr), "../escape.png") + + def test_missing_current_raises(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base"; base.mkdir() + curr = Path(tmpdir) / "curr"; curr.mkdir() + with self.assertRaises(VisualReviewError): + accept_baseline(str(base), str(curr), "missing.png") + + +class 
TestRenderIndex(unittest.TestCase): + + def test_includes_status_classes(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base"; base.mkdir() + curr = Path(tmpdir) / "curr"; curr.mkdir() + (base / "drift.png").write_bytes(b"a") + (curr / "drift.png").write_bytes(b"b") + html = render_index(str(base), str(curr)) + self.assertIn("Visual review", html) + self.assertIn("drift.png", html) + self.assertIn("class='diff'", html) + + +class TestVisualReviewServer(unittest.TestCase): + + def test_index_then_accept(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base"; base.mkdir() + curr = Path(tmpdir) / "curr"; curr.mkdir() + (base / "drift.png").write_bytes(b"a") + (curr / "drift.png").write_bytes(b"b") + server = VisualReviewServer(str(base), str(curr)) + url = server.start() + try: + with urllib.request.urlopen(url + "/", timeout=2) as response: # nosec B310 + body = response.read().decode("utf-8") + self.assertIn("drift.png", body) + # Accept + payload = urllib.parse.urlencode({"name": "drift.png"}).encode("utf-8") + request = urllib.request.Request(url + "/accept", data=payload, method="POST") + request.add_header("Content-Type", "application/x-www-form-urlencoded") + opener = urllib.request.build_opener(urllib.request.HTTPRedirectHandler()) + with opener.open(request, timeout=2): # nosec B310 + pass + self.assertEqual((base / "drift.png").read_bytes(), b"b") + self.assertEqual(server.accepted, ["drift.png"]) + finally: + server.stop() + + def test_unknown_path_404(self): + with tempfile.TemporaryDirectory() as tmpdir: + base = Path(tmpdir) / "base"; base.mkdir() + curr = Path(tmpdir) / "curr"; curr.mkdir() + server = VisualReviewServer(str(base), str(curr)) + url = server.start() + try: + with self.assertRaises(urllib.error.HTTPError) as ctx: + urllib.request.urlopen(url + "/nope", timeout=2) # nosec B310 + self.assertEqual(ctx.exception.code, 404) + finally: + server.stop() + + +if __name__ == "__main__": 
+ unittest.main() From 491ec99c6f582ec0af5600977d5ce2bf41126a2b Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:36:46 +0800 Subject: [PATCH 05/50] Add PII / privacy scanner with email/phone/card/SSN/Taiwan-ID/IPv4 detectors --- je_web_runner/utils/pii_scanner/__init__.py | 9 ++ je_web_runner/utils/pii_scanner/scanner.py | 165 ++++++++++++++++++++ test/unit_test/test_pii_scanner.py | 97 ++++++++++++ 3 files changed, 271 insertions(+) create mode 100644 je_web_runner/utils/pii_scanner/__init__.py create mode 100644 je_web_runner/utils/pii_scanner/scanner.py create mode 100644 test/unit_test/test_pii_scanner.py diff --git a/je_web_runner/utils/pii_scanner/__init__.py b/je_web_runner/utils/pii_scanner/__init__.py new file mode 100644 index 0000000..9f502f9 --- /dev/null +++ b/je_web_runner/utils/pii_scanner/__init__.py @@ -0,0 +1,9 @@ +"""PII / privacy scanner for screenshots OCR text and HAR / network bodies.""" +from je_web_runner.utils.pii_scanner.scanner import ( + PiiFinding, + PiiScannerError, + assert_no_pii, + scan_text, +) + +__all__ = ["PiiFinding", "PiiScannerError", "assert_no_pii", "scan_text"] diff --git a/je_web_runner/utils/pii_scanner/scanner.py b/je_web_runner/utils/pii_scanner/scanner.py new file mode 100644 index 0000000..c292b2f --- /dev/null +++ b/je_web_runner/utils/pii_scanner/scanner.py @@ -0,0 +1,165 @@ +""" +PII / privacy scanner:偵測 email / phone / 信用卡 / SSN / Taiwan ID 等敏感資料。 +PII scanner. Augments :mod:`secrets_scanner` with personal-info detection +on plain text (HAR bodies, OCR'd screenshots, log files). + +Detected categories: + +- ``email`` — RFC-5322-shaped addresses. +- ``phone_e164`` — international ``+CC...`` numbers, 10-15 digits. +- ``credit_card`` — 13-19 digits passing the Luhn checksum. +- ``ssn_us`` — US SSN ``NNN-NN-NNNN``. +- ``taiwan_id`` — 1 letter + 9 digits, with ROC checksum. +- ``ipv4`` — dotted-quad IPv4 addresses. 
+ +Each match returns its category, span, and a redacted preview so the +caller can log without leaking the value. +""" +from __future__ import annotations + +import re +from collections import Counter +from dataclasses import dataclass +from typing import Iterable, List, Optional, Sequence + +from je_web_runner.utils.exception.exceptions import WebRunnerException + + +class PiiScannerError(WebRunnerException): + """Raised when scanning input is invalid or assertion fails.""" + + +@dataclass +class PiiFinding: + category: str + start: int + end: int + redacted: str + + +_EMAIL_RE = re.compile( + r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,24}\b" +) +_PHONE_E164_RE = re.compile(r"\+\d{8,15}\b") +_CARD_RE = re.compile(r"\b(?:\d[ -]?){13,19}\b") +_SSN_RE = re.compile(r"\b(?!000|666)(?!9\d{2})\d{3}-(?!00)\d{2}-(?!0000)\d{4}\b") +_TAIWAN_ID_RE = re.compile(r"\b[A-Z][12]\d{8}\b") +_IPV4_RE = re.compile( + r"\b(?:25[0-5]|2[0-4]\d|[01]?\d?\d)" + r"(?:\.(?:25[0-5]|2[0-4]\d|[01]?\d?\d)){3}\b" +) + + +def _luhn_check(digits: str) -> bool: + digits_only = [int(c) for c in digits if c.isdigit()] + if len(digits_only) < 13 or len(digits_only) > 19: + return False + total = 0 + parity = (len(digits_only) - 2) % 2 + for index, value in enumerate(digits_only): + if index % 2 == parity: + value *= 2 + if value > 9: + value -= 9 + total += value + return total % 10 == 0 + + +_TAIWAN_LETTER_VALUES = { + "A": 10, "B": 11, "C": 12, "D": 13, "E": 14, "F": 15, "G": 16, "H": 17, + "I": 34, "J": 18, "K": 19, "L": 20, "M": 21, "N": 22, "O": 35, "P": 23, + "Q": 24, "R": 25, "S": 26, "T": 27, "U": 28, "V": 29, "W": 32, "X": 30, + "Y": 31, "Z": 33, +} + + +def _taiwan_id_check(value: str) -> bool: + if len(value) != 10 or value[0] not in _TAIWAN_LETTER_VALUES: + return False + head = _TAIWAN_LETTER_VALUES[value[0]] + digits = [head // 10, head % 10] + [int(c) for c in value[1:]] + weights = [1, 9, 8, 7, 6, 5, 4, 3, 2, 1, 1] + total = sum(d * w for d, w in zip(digits, weights)) + return 
total % 10 == 0 + + +def _redact(value: str) -> str: + if len(value) <= 4: + return "*" * len(value) + return value[:2] + "*" * (len(value) - 4) + value[-2:] + + +def scan_text(text: str, categories: Optional[Sequence[str]] = None) -> List[PiiFinding]: + """ + 對 ``text`` 跑全部或指定的 PII 偵測類別 + Run every (or a filtered subset of) PII detector against ``text``. + """ + if not isinstance(text, str): + raise PiiScannerError("text must be str") + allowed = set(categories) if categories else None + findings: List[PiiFinding] = [] + for category, regex, validator in _DETECTORS: + if allowed is not None and category not in allowed: + continue + for match in regex.finditer(text): + value = match.group(0) + if validator is not None and not validator(value): + continue + findings.append(PiiFinding( + category=category, + start=match.start(), + end=match.end(), + redacted=_redact(value), + )) + findings.sort(key=lambda f: (f.start, f.category)) + return findings + + +_DETECTORS = [ + ("email", _EMAIL_RE, None), + ("phone_e164", _PHONE_E164_RE, None), + ("credit_card", _CARD_RE, _luhn_check), + ("ssn_us", _SSN_RE, None), + ("taiwan_id", _TAIWAN_ID_RE, _taiwan_id_check), + ("ipv4", _IPV4_RE, None), +] + + +def summarise(findings: Iterable[PiiFinding]) -> Counter: + """Count findings by category.""" + return Counter(f.category for f in findings) + + +def assert_no_pii(text: str, categories: Optional[Sequence[str]] = None, + allow_categories: Optional[Sequence[str]] = None) -> None: + """ + 斷言文本中沒有指定類別的 PII;``allow_categories`` 可白名單跳過。 + Raise :class:`PiiScannerError` when any non-allowed category is found. 
+ """ + allow = set(allow_categories or []) + findings = [f for f in scan_text(text, categories=categories) + if f.category not in allow] + if findings: + sample = [ + {"category": f.category, "redacted": f.redacted, "at": f.start} + for f in findings[:5] + ] + raise PiiScannerError(f"{len(findings)} PII finding(s): {sample}") + + +def redact_text(text: str, replacement: str = "[REDACTED]", + categories: Optional[Sequence[str]] = None) -> str: + """Return ``text`` with each PII match replaced by ``replacement``.""" + findings = scan_text(text, categories=categories) + if not findings: + return text + pieces: List[str] = [] + cursor = 0 + for finding in findings: + if finding.start < cursor: + continue # skip overlapping matches + pieces.append(text[cursor:finding.start]) + pieces.append(replacement) + cursor = finding.end + pieces.append(text[cursor:]) + return "".join(pieces) diff --git a/test/unit_test/test_pii_scanner.py b/test/unit_test/test_pii_scanner.py new file mode 100644 index 0000000..c57dc61 --- /dev/null +++ b/test/unit_test/test_pii_scanner.py @@ -0,0 +1,97 @@ +import unittest + +from je_web_runner.utils.pii_scanner import ( + PiiScannerError, + assert_no_pii, + scan_text, +) +from je_web_runner.utils.pii_scanner.scanner import redact_text, summarise + + +class TestScanText(unittest.TestCase): + + def test_email_detected(self): + findings = scan_text("contact alice@example.com today") + self.assertEqual([f.category for f in findings], ["email"]) + + def test_phone_e164(self): + findings = scan_text("call +14155552671 anytime") + self.assertIn("phone_e164", [f.category for f in findings]) + + def test_credit_card_with_luhn(self): + # Visa test number that passes Luhn + findings = scan_text("card 4111 1111 1111 1111 charged") + self.assertIn("credit_card", [f.category for f in findings]) + + def test_credit_card_invalid_luhn_skipped(self): + findings = scan_text("not-a-card 4111 1111 1111 1112") + self.assertNotIn("credit_card", [f.category for f in 
findings]) + + def test_ssn(self): + findings = scan_text("SSN 123-45-6789 on file") + self.assertIn("ssn_us", [f.category for f in findings]) + + def test_taiwan_id_valid_passes_checksum(self): + # Sample valid ROC ID + findings = scan_text("ID: A123456789") + self.assertIn("taiwan_id", [f.category for f in findings]) + + def test_taiwan_id_invalid_filtered(self): + findings = scan_text("ID: A111111111") + self.assertNotIn("taiwan_id", [f.category for f in findings]) + + def test_ipv4(self): + findings = scan_text("origin 192.168.1.1 last week") + self.assertIn("ipv4", [f.category for f in findings]) + + def test_categories_filter(self): + findings = scan_text( + "alice@example.com 192.168.0.1", + categories=["email"], + ) + self.assertEqual([f.category for f in findings], ["email"]) + + def test_redacted_preview(self): + findings = scan_text("alice@example.com") + self.assertNotIn("alice@example.com", findings[0].redacted) + self.assertTrue(findings[0].redacted.startswith("al")) + + def test_non_string_raises(self): + with self.assertRaises(PiiScannerError): + scan_text(123) # type: ignore[arg-type] + + +class TestAssertAndSummarise(unittest.TestCase): + + def test_assert_no_pii_passes_clean(self): + assert_no_pii("nothing sensitive here") + + def test_assert_no_pii_raises(self): + with self.assertRaises(PiiScannerError): + assert_no_pii("alice@example.com") + + def test_allow_categories_skip(self): + assert_no_pii("alice@example.com", allow_categories=["email"]) + + def test_summarise(self): + counts = summarise(scan_text( + "alice@example.com bob@example.com 192.168.1.1" + )) + self.assertEqual(counts["email"], 2) + self.assertEqual(counts["ipv4"], 1) + + +class TestRedactText(unittest.TestCase): + + def test_replaces_matches(self): + out = redact_text("from alice@example.com on 192.168.0.1") + self.assertNotIn("alice@example.com", out) + self.assertNotIn("192.168.0.1", out) + self.assertIn("[REDACTED]", out) + + def test_clean_text_unchanged(self): + 
self.assertEqual(redact_text("nothing here"), "nothing here") + + +if __name__ == "__main__": + unittest.main() From 62a873410e8bfad783b1ad192e5718568e678479 Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:37:53 +0800 Subject: [PATCH 06/50] Add test impact analysis (action JSON -> locator/url/template index) --- .../utils/impact_analysis/__init__.py | 14 ++ .../utils/impact_analysis/indexer.py | 123 ++++++++++++++++++ test/unit_test/test_impact_analysis.py | 104 +++++++++++++++ 3 files changed, 241 insertions(+) create mode 100644 je_web_runner/utils/impact_analysis/__init__.py create mode 100644 je_web_runner/utils/impact_analysis/indexer.py create mode 100644 test/unit_test/test_impact_analysis.py diff --git a/je_web_runner/utils/impact_analysis/__init__.py b/je_web_runner/utils/impact_analysis/__init__.py new file mode 100644 index 0000000..b98c1a3 --- /dev/null +++ b/je_web_runner/utils/impact_analysis/__init__.py @@ -0,0 +1,14 @@ +"""Test impact analysis: action JSON files → locator/url/template usage map.""" +from je_web_runner.utils.impact_analysis.indexer import ( + ImpactAnalysisError, + ImpactIndex, + affected_action_files, + build_index, +) + +__all__ = [ + "ImpactAnalysisError", + "ImpactIndex", + "affected_action_files", + "build_index", +] diff --git a/je_web_runner/utils/impact_analysis/indexer.py b/je_web_runner/utils/impact_analysis/indexer.py new file mode 100644 index 0000000..0f83915 --- /dev/null +++ b/je_web_runner/utils/impact_analysis/indexer.py @@ -0,0 +1,123 @@ +""" +Test impact analysis:建立 action JSON 檔對 locator / URL / template 的反查表, +給定變更的元件名/URL,回傳所有受影響的 action JSON 檔。 +Walks every action JSON file under a directory, indexes the +``test_object_name``, ``url``, ``template``, and ``WR_*`` command names +each file uses, then answers "which files reference X?" queries so +diff-aware test selection can go beyond filename matching. 
+""" +from __future__ import annotations + +import json +from collections import defaultdict +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Set, Union + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class ImpactAnalysisError(WebRunnerException): + """Raised when an action JSON file is malformed.""" + + +@dataclass +class ImpactIndex: + """Reverse index ``{kind: {token: {file_paths}}}``.""" + + by_locator: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set)) + by_url: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set)) + by_template: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set)) + by_command: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set)) + + def files_for_locator(self, name: str) -> List[str]: + return sorted(self.by_locator.get(name, set())) + + def files_for_url(self, fragment: str) -> List[str]: + return sorted({ + file for url, files in self.by_url.items() + for file in files if fragment in url + }) + + def files_for_template(self, name: str) -> List[str]: + return sorted(self.by_template.get(name, set())) + + def files_for_command(self, command: str) -> List[str]: + return sorted(self.by_command.get(command, set())) + + +_ACTIONS_GLOB = "**/*.json" + + +def build_index(directory: Union[str, Path], glob: str = _ACTIONS_GLOB) -> ImpactIndex: + """ + 走訪 ``directory`` 下所有 action JSON 檔,建立反查表 + Walk ``directory`` for ``*.json`` files and project each one's locators, + URLs, templates, and command names into the returned index. 
+ """ + base = Path(directory) + if not base.is_dir(): + raise ImpactAnalysisError(f"directory missing: {directory!r}") + index = ImpactIndex() + for path in sorted(base.glob(glob)): + if not path.is_file(): + continue + try: + actions = json.loads(path.read_text(encoding="utf-8")) + except ValueError as error: + web_runner_logger.warning(f"impact_analysis skipping {path}: {error}") + continue + if not isinstance(actions, list): + continue + _index_actions(index, str(path), actions) + return index + + +def _index_actions(index: ImpactIndex, file_path: str, actions: List[Any]) -> None: + for action in actions: + if not isinstance(action, list) or not action: + continue + command = str(action[0]) + index.by_command[command].add(file_path) + kwargs = _extract_kwargs(action) + for key, value in kwargs.items(): + if not isinstance(value, str): + continue + if key in {"test_object_name", "element_name"}: + index.by_locator[value].add(file_path) + elif key == "url": + index.by_url[value].add(file_path) + elif key == "template": + index.by_template[value].add(file_path) + + +def _extract_kwargs(action: List[Any]) -> Dict[str, Any]: + if len(action) >= 3 and isinstance(action[2], dict): + return action[2] + if len(action) >= 2 and isinstance(action[1], dict): + return action[1] + return {} + + +def affected_action_files( + index: ImpactIndex, + locators: Optional[Iterable[str]] = None, + urls: Optional[Iterable[str]] = None, + templates: Optional[Iterable[str]] = None, + commands: Optional[Iterable[str]] = None, +) -> List[str]: + """ + Given changed locator/URL/template/command names, return every action + JSON file that touches at least one of them. 
+ """ + affected: Set[str] = set() + for name in locators or []: + affected.update(index.files_for_locator(name)) + for fragment in urls or []: + affected.update(index.files_for_url(fragment)) + for template in templates or []: + affected.update(index.files_for_template(template)) + for command in commands or []: + affected.update(index.files_for_command(command)) + return sorted(affected) diff --git a/test/unit_test/test_impact_analysis.py b/test/unit_test/test_impact_analysis.py new file mode 100644 index 0000000..596bbef --- /dev/null +++ b/test/unit_test/test_impact_analysis.py @@ -0,0 +1,104 @@ +import json +import tempfile +import unittest +from pathlib import Path + +from je_web_runner.utils.impact_analysis import ( + ImpactAnalysisError, + affected_action_files, + build_index, +) + + +def _write_actions(path, actions): + Path(path).write_text(json.dumps(actions), encoding="utf-8") + + +class TestBuildIndex(unittest.TestCase): + + def test_indexes_locators_urls_templates_commands(self): + with tempfile.TemporaryDirectory() as tmpdir: + a = Path(tmpdir) / "a.json" + b = Path(tmpdir) / "b.json" + _write_actions(a, [ + ["WR_to_url", {"url": "https://example.com/login"}], + ["WR_save_test_object", {"test_object_name": "submit_btn", + "object_type": "ID"}], + ["WR_render_template", {"template": "login_basic"}], + ]) + _write_actions(b, [ + ["WR_to_url", {"url": "https://example.com/checkout"}], + ["WR_find_recorded_element", {"element_name": "submit_btn"}], + ]) + index = build_index(tmpdir) + self.assertIn(str(a), index.files_for_locator("submit_btn")) + self.assertIn(str(b), index.files_for_locator("submit_btn")) + self.assertEqual( + index.files_for_url("login"), + [str(a)], + ) + self.assertEqual( + index.files_for_template("login_basic"), + [str(a)], + ) + self.assertIn(str(a), index.files_for_command("WR_to_url")) + self.assertIn(str(b), index.files_for_command("WR_to_url")) + + def test_missing_directory_raises(self): + with 
self.assertRaises(ImpactAnalysisError): + build_index("does/not/exist") + + def test_invalid_json_skipped(self): + with tempfile.TemporaryDirectory() as tmpdir: + (Path(tmpdir) / "broken.json").write_text("not json", encoding="utf-8") + ok = Path(tmpdir) / "ok.json" + _write_actions(ok, [["WR_quit_all"]]) + index = build_index(tmpdir) + self.assertEqual(index.files_for_command("WR_quit_all"), [str(ok)]) + + +class TestAffectedActionFiles(unittest.TestCase): + + def test_changed_locator_returns_users(self): + with tempfile.TemporaryDirectory() as tmpdir: + a = Path(tmpdir) / "a.json" + _write_actions(a, [["WR_save_test_object", + {"test_object_name": "primary_cta", + "object_type": "CSS_SELECTOR"}]]) + b = Path(tmpdir) / "b.json" + _write_actions(b, [["WR_save_test_object", + {"test_object_name": "footer_link", + "object_type": "CSS_SELECTOR"}]]) + index = build_index(tmpdir) + affected = affected_action_files(index, locators=["primary_cta"]) + self.assertEqual(affected, [str(a)]) + + def test_changed_url_substring(self): + with tempfile.TemporaryDirectory() as tmpdir: + login = Path(tmpdir) / "login.json" + _write_actions(login, [["WR_to_url", {"url": "https://example.com/auth/login"}]]) + checkout = Path(tmpdir) / "checkout.json" + _write_actions(checkout, [["WR_to_url", {"url": "https://example.com/cart"}]]) + index = build_index(tmpdir) + affected = affected_action_files(index, urls=["/auth/"]) + self.assertEqual(affected, [str(login)]) + + def test_multiple_filters_unioned(self): + with tempfile.TemporaryDirectory() as tmpdir: + x = Path(tmpdir) / "x.json" + _write_actions(x, [["WR_render_template", {"template": "login_basic"}]]) + y = Path(tmpdir) / "y.json" + _write_actions(y, [["WR_save_test_object", + {"test_object_name": "footer_link", + "object_type": "ID"}]]) + index = build_index(tmpdir) + affected = affected_action_files( + index, + templates=["login_basic"], + locators=["footer_link"], + ) + self.assertEqual(set(affected), {str(x), str(y)}) + + +if 
__name__ == "__main__": + unittest.main() From ebf8a4762dc82734c4824f7f128b68fee27c2a9c Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:39:08 +0800 Subject: [PATCH 07/50] Add Action JSON LSP server with command completion + lint diagnostics --- je_web_runner/action_lsp/__init__.py | 8 + je_web_runner/action_lsp/__main__.py | 6 + je_web_runner/action_lsp/server.py | 251 +++++++++++++++++++++++++++ test/unit_test/test_action_lsp.py | 106 +++++++++++ 4 files changed, 371 insertions(+) create mode 100644 je_web_runner/action_lsp/__init__.py create mode 100644 je_web_runner/action_lsp/__main__.py create mode 100644 je_web_runner/action_lsp/server.py create mode 100644 test/unit_test/test_action_lsp.py diff --git a/je_web_runner/action_lsp/__init__.py b/je_web_runner/action_lsp/__init__.py new file mode 100644 index 0000000..7cb8195 --- /dev/null +++ b/je_web_runner/action_lsp/__init__.py @@ -0,0 +1,8 @@ +"""Language Server Protocol implementation for WebRunner action JSON files.""" +from je_web_runner.action_lsp.server import ( + ActionLspError, + ActionLspServer, + serve_stdio, +) + +__all__ = ["ActionLspError", "ActionLspServer", "serve_stdio"] diff --git a/je_web_runner/action_lsp/__main__.py b/je_web_runner/action_lsp/__main__.py new file mode 100644 index 0000000..d256356 --- /dev/null +++ b/je_web_runner/action_lsp/__main__.py @@ -0,0 +1,6 @@ +"""Entry point so ``python -m je_web_runner.action_lsp`` starts the LSP.""" +from je_web_runner.action_lsp.server import serve_stdio + + +if __name__ == "__main__": + serve_stdio() diff --git a/je_web_runner/action_lsp/server.py b/je_web_runner/action_lsp/server.py new file mode 100644 index 0000000..cfd2805 --- /dev/null +++ b/je_web_runner/action_lsp/server.py @@ -0,0 +1,251 @@ +""" +Action JSON LSP server:基於 LSP 3.17 protocol,提供 ``WR_*`` 補全與 lint 診斷。 +Minimal LSP server speaking JSON-RPC 2.0 over stdio with the standard +``Content-Length`` headers. 
Supports: + +- ``initialize`` / ``initialized`` / ``shutdown`` / ``exit`` +- ``textDocument/didOpen`` / ``didChange`` / ``didClose`` +- ``textDocument/completion`` — suggests every registered ``WR_*`` command +- ``textDocument/publishDiagnostics`` — pushes lint findings on document + open / change + +The action linter and command list are pulled from existing modules so +the LSP stays a thin presentation layer. +""" +from __future__ import annotations + +import json +import sys +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, TextIO + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class ActionLspError(WebRunnerException): + """Raised when a request can't be parsed or handled.""" + + +@dataclass +class _Document: + uri: str + text: str + version: int = 0 + + +@dataclass +class ActionLspServer: + documents: Dict[str, _Document] = field(default_factory=dict) + initialized: bool = False + _command_names: Optional[List[str]] = field(default=None, init=False, repr=False) + + def command_names(self) -> List[str]: + if self._command_names is None: + try: + from je_web_runner.utils.executor.action_executor import executor + names = sorted(executor.event_dict.keys()) + except Exception as error: # pylint: disable=broad-except + web_runner_logger.warning(f"action_lsp executor unavailable: {error!r}") + names = [] + self._command_names = [n for n in names if isinstance(n, str)] + return self._command_names + + # --- Top-level dispatch ---------------------------------------------- + + def handle(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: + method = message.get("method") + request_id = message.get("id") + params = message.get("params") or {} + if method == "initialize": + return self._respond(request_id, self._initialize()) + if method == "initialized": + self.initialized = True + return None + if method == 
"shutdown": + return self._respond(request_id, None) + if method == "exit": + return None + if method == "textDocument/didOpen": + return self._on_did_open(params) + if method == "textDocument/didChange": + return self._on_did_change(params) + if method == "textDocument/didClose": + return self._on_did_close(params) + if method == "textDocument/completion": + return self._respond(request_id, self._completion(params)) + return self._error(request_id, -32601, f"unknown method {method!r}") + + # --- Handlers -------------------------------------------------------- + + def _initialize(self) -> Dict[str, Any]: + return { + "capabilities": { + "textDocumentSync": 1, # full sync + "completionProvider": {"triggerCharacters": ['"', "_"]}, + }, + "serverInfo": {"name": "webrunner-action-lsp", "version": "0.1.0"}, + } + + def _on_did_open(self, params: Dict[str, Any]) -> Dict[str, Any]: + document = params.get("textDocument") or {} + uri = str(document.get("uri", "")) + text = str(document.get("text", "")) + self.documents[uri] = _Document(uri=uri, text=text, version=int(document.get("version", 0))) + return self._diagnostics_notification(uri, text) + + def _on_did_change(self, params: Dict[str, Any]) -> Dict[str, Any]: + document = params.get("textDocument") or {} + uri = str(document.get("uri", "")) + changes = params.get("contentChanges") or [] + if uri not in self.documents: + return self._diagnostics_notification(uri, "") + full_text = self.documents[uri].text + for change in changes: + if isinstance(change, dict) and "text" in change: + full_text = str(change["text"]) + self.documents[uri].text = full_text + self.documents[uri].version = int(document.get("version", 0)) + return self._diagnostics_notification(uri, full_text) + + def _on_did_close(self, params: Dict[str, Any]) -> None: + uri = str((params.get("textDocument") or {}).get("uri", "")) + self.documents.pop(uri, None) + return None + + def _completion(self, params: Dict[str, Any]) -> Dict[str, Any]: + items = [ 
+ { + "label": name, + "kind": 14, # CompletionItemKind.Keyword + "detail": "WebRunner action command", + "insertText": name, + } + for name in self.command_names() + ] + return {"isIncomplete": False, "items": items} + + # --- Diagnostics ----------------------------------------------------- + + def _diagnostics_notification(self, uri: str, text: str) -> Dict[str, Any]: + return { + "jsonrpc": "2.0", + "method": "textDocument/publishDiagnostics", + "params": { + "uri": uri, + "diagnostics": self._lint_diagnostics(text), + }, + } + + def _lint_diagnostics(self, text: str) -> List[Dict[str, Any]]: + if not text.strip(): + return [] + try: + actions = json.loads(text) + except ValueError as error: + return [_diagnostic(error_message=f"JSON parse error: {error}", + line=0, severity=1)] + if not isinstance(actions, list): + return [_diagnostic("Action document root must be a JSON array.", + line=0, severity=1)] + diagnostics: List[Dict[str, Any]] = [] + try: + from je_web_runner.utils.linter.action_linter import lint_action + except Exception: # pylint: disable=broad-except + return diagnostics + for finding in lint_action(actions): + severity = 1 if finding.level == "error" else 2 + diagnostics.append(_diagnostic( + error_message=f"[{finding.rule}] {finding.message}", + line=finding.index, + severity=severity, + )) + return diagnostics + + # --- Helpers --------------------------------------------------------- + + @staticmethod + def _respond(request_id: Any, result: Any) -> Dict[str, Any]: + return {"jsonrpc": "2.0", "id": request_id, "result": result} + + @staticmethod + def _error(request_id: Any, code: int, message: str) -> Dict[str, Any]: + return { + "jsonrpc": "2.0", "id": request_id, + "error": {"code": code, "message": message}, + } + + +def _diagnostic(error_message: str, line: int, severity: int) -> Dict[str, Any]: + return { + "range": { + "start": {"line": max(0, line), "character": 0}, + "end": {"line": max(0, line), "character": 200}, + }, + "severity": 
severity, + "source": "webrunner-action-lsp", + "message": error_message, + } + + +# --- LSP framing ----------------------------------------------------------- + +_HEADER_TERMINATOR = "\r\n\r\n" + + +def _read_message(stdin: TextIO) -> Optional[Dict[str, Any]]: + headers: Dict[str, str] = {} + while True: + line = stdin.readline() + if line == "": + return None + line = line.rstrip("\r\n") + if not line: + break + if ":" in line: + name, _, value = line.partition(":") + headers[name.strip().lower()] = value.strip() + length_str = headers.get("content-length") + if length_str is None: + return None + try: + length = int(length_str) + except ValueError as error: + raise ActionLspError(f"invalid Content-Length: {error}") from error + body = stdin.read(length) + if not body: + return None + try: + return json.loads(body) + except ValueError as error: + raise ActionLspError(f"body is not JSON: {error}") from error + + +def _write_message(stdout: TextIO, message: Dict[str, Any]) -> None: + body = json.dumps(message, ensure_ascii=False) + stdout.write(f"Content-Length: {len(body.encode('utf-8'))}\r\n\r\n{body}") + stdout.flush() + + +def serve_stdio( + stdin: Optional[TextIO] = None, + stdout: Optional[TextIO] = None, + server: Optional[ActionLspServer] = None, +) -> None: + """Run the LSP loop until stdin EOF or an ``exit`` notification.""" + in_stream = stdin or sys.stdin + out_stream = stdout or sys.stdout + used_server = server or ActionLspServer() + while True: + try: + message = _read_message(in_stream) + except ActionLspError as error: + web_runner_logger.warning(f"action_lsp parse error: {error}") + continue + if message is None: + return + response = used_server.handle(message) + if message.get("method") == "exit": + return + if response is not None: + _write_message(out_stream, response) diff --git a/test/unit_test/test_action_lsp.py b/test/unit_test/test_action_lsp.py new file mode 100644 index 0000000..65d2624 --- /dev/null +++ 
b/test/unit_test/test_action_lsp.py @@ -0,0 +1,106 @@ +import io +import json +import unittest + +from je_web_runner.action_lsp.server import ( + ActionLspServer, + serve_stdio, +) + + +def _frame(message): + body = json.dumps(message) + return f"Content-Length: {len(body.encode('utf-8'))}\r\n\r\n{body}" + + +class TestActionLspServer(unittest.TestCase): + + def test_initialize_returns_capabilities(self): + server = ActionLspServer() + result = server.handle({"id": 1, "method": "initialize", "params": {}}) + capabilities = result["result"]["capabilities"] + self.assertEqual(capabilities["textDocumentSync"], 1) + self.assertIn("triggerCharacters", capabilities["completionProvider"]) + + def test_did_open_publishes_diagnostics(self): + server = ActionLspServer() + result = server.handle({ + "method": "textDocument/didOpen", + "params": {"textDocument": { + "uri": "file:///x.json", + "text": "not json", + }}, + }) + self.assertEqual(result["method"], "textDocument/publishDiagnostics") + diags = result["params"]["diagnostics"] + self.assertTrue(any("JSON parse error" in d["message"] for d in diags)) + + def test_did_open_clean_array_no_diagnostics(self): + server = ActionLspServer() + result = server.handle({ + "method": "textDocument/didOpen", + "params": {"textDocument": { + "uri": "file:///x.json", + "text": "[]", + }}, + }) + self.assertEqual(result["params"]["diagnostics"], []) + + def test_root_must_be_array(self): + server = ActionLspServer() + result = server.handle({ + "method": "textDocument/didOpen", + "params": {"textDocument": { + "uri": "file:///x.json", + "text": "{}", + }}, + }) + diags = result["params"]["diagnostics"] + self.assertTrue(any("root must be a JSON array" in d["message"] for d in diags)) + + def test_did_change_updates_text(self): + server = ActionLspServer() + server.handle({ + "method": "textDocument/didOpen", + "params": {"textDocument": {"uri": "file:///x.json", "text": "[]"}}, + }) + server.handle({ + "method": 
"textDocument/didChange", + "params": { + "textDocument": {"uri": "file:///x.json", "version": 2}, + "contentChanges": [{"text": "not json"}], + }, + }) + self.assertEqual(server.documents["file:///x.json"].text, "not json") + + def test_completion_returns_command_names(self): + server = ActionLspServer() + # Stub command list so the test doesn't depend on full executor state + server._command_names = ["WR_quit_all", "WR_to_url"] + result = server.handle({"id": 5, "method": "textDocument/completion", + "params": {}}) + labels = [item["label"] for item in result["result"]["items"]] + self.assertEqual(set(labels), {"WR_quit_all", "WR_to_url"}) + + def test_unknown_method_returns_error(self): + server = ActionLspServer() + result = server.handle({"id": 9, "method": "noSuch"}) + self.assertEqual(result["error"]["code"], -32601) + + +class TestServeStdio(unittest.TestCase): + + def test_round_trip(self): + message_a = _frame({"jsonrpc": "2.0", "id": 1, "method": "initialize", + "params": {}}) + message_b = _frame({"jsonrpc": "2.0", "method": "exit"}) + stdin = io.StringIO(message_a + message_b) + stdout = io.StringIO() + serve_stdio(stdin=stdin, stdout=stdout) + output = stdout.getvalue() + self.assertIn("Content-Length:", output) + self.assertIn('"jsonrpc": "2.0"', output) + + +if __name__ == "__main__": + unittest.main() From 18182f71de8ab2dbca210618a5f2c80a8f6b26ab Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:40:55 +0800 Subject: [PATCH 08/50] Document new wave (BiDi / browser pool / HAR replay / PII / visual review / impact analysis / LSP) --- README.md | 30 ++++++++++ .../extended_features_doc.rst | 60 +++++++++++++++++++ .../extended_features_doc.rst | 42 +++++++++++++ 3 files changed, 132 insertions(+) diff --git a/README.md b/README.md index c2e2c0a..f739257 100644 --- a/README.md +++ b/README.md @@ -628,6 +628,36 @@ serve_stdio(server=server) The server speaks MCP `2024-11-05`: `initialize`, `tools/list`, `tools/call`, `resources/list`, 
`ping`, `shutdown`. +## Action JSON LSP + +A standard Language Server Protocol implementation for action JSON files: + +```bash +python -m je_web_runner.action_lsp +``` + +`textDocument/completion` returns every registered `WR_*` command; `textDocument/publishDiagnostics` runs the action linter on `didOpen` / `didChange`. Pair with VS Code's *Configure JSON Language Servers* or the JetBrains LSP plugin. + +## Even More Capabilities + +Reliability & dev-loop: + +- **Browser pool** — `browser_pool.BrowserPool(factory, size=4, max_uses=50).warm()`; `with pool.session() as ses: …` removes browser cold-start from local dev. Health check + recycle policy built in. +- **WebDriver BiDi bridge** — `bidi_backend.BidiBridge().subscribe(target, "console", callback)` works against either Selenium 4 BiDi (`driver.script.add_console_message_handler`) or Playwright `page.on(...)`. `register_translator` lets you wire custom event names. + +Determinism & offline runs: + +- **HAR replay server** — `har_replay.HarReplayServer(load_har("recorded.har")).start()` boots a local HTTP server that serves recorded responses; supports literal / glob / `re:` URL matching with rotation across duplicates. Drop-in for staging-API outages. + +Quality / privacy: + +- **PII scanner** — `pii_scanner.scan_text(text)` finds emails, E.164 phones, Luhn-validated credit cards, US SSN, ROC ID, and IPv4. `assert_no_pii(text, allow_categories=...)` for CI gates; `redact_text(text)` returns a sanitised copy. +- **Visual diff review UI** — `visual_review.VisualReviewServer(baseline_dir, current_dir).start()` opens a local web UI showing each baseline / current pair side-by-side with an *Accept current as baseline* button (idempotent file copy with path-traversal guard). 
+ +Test orchestration: + +- **Test impact analysis** — `impact_analysis.build_index("./actions")` walks every action JSON file and projects locator names, URLs, template names, and `WR_*` commands into a reverse index; `affected_action_files(index, locators=["primary_cta"])` answers "which tests touch this?" so diff-aware shards can go beyond filename matching. + ## Browser Internals ```python diff --git a/docs/source/Eng/doc/extended_features/extended_features_doc.rst b/docs/source/Eng/doc/extended_features/extended_features_doc.rst index 5da3df7..e4bdc45 100644 --- a/docs/source/Eng/doc/extended_features/extended_features_doc.rst +++ b/docs/source/Eng/doc/extended_features/extended_features_doc.rst @@ -389,3 +389,63 @@ Default tools registered: ``webrunner_lint_action``, Custom tools register via ``McpServer.register(Tool(...))``; the server implements MCP ``2024-11-05`` (``initialize`` / ``tools/list`` / ``tools/call`` / ``resources/list`` / ``ping`` / ``shutdown``). + +Action JSON LSP +=============== + +.. code-block:: shell + + python -m je_web_runner.action_lsp + +Standard LSP 3.17-shaped server over stdio. ``textDocument/completion`` +suggests every registered ``WR_*`` command; ``textDocument/didOpen`` / +``didChange`` push ``publishDiagnostics`` based on +:func:`linter.action_linter.lint_action`. + +Browser pool / BiDi bridge +========================== + +* ``browser_pool.BrowserPool(factory, size=N).warm()`` / + ``pool.session() as ses`` — pre-warmed browser instances with health + check + recycle policy. +* ``bidi_backend.BidiBridge().subscribe(target, event, callback)`` — + unified BiDi-style event subscription against either Selenium 4 BiDi + (``driver.script.add_console_message_handler``) or Playwright + ``page.on(...)``. ``register_translator`` extends the event list. + +HAR replay server +================= + +* ``har_replay.load_har("recorded.har")`` parses ``log.entries`` from a + HAR file. 
+* ``HarReplayServer(entries).start()`` boots a local HTTP server that + serves the recorded responses; URL patterns support literal / + ``*`` glob / ``re:`` regex with rotation across duplicates. + +PII scanner & visual review +=========================== + +* ``pii_scanner.scan_text(text)`` finds ``email`` / ``phone_e164`` / + Luhn-checked ``credit_card`` / ``ssn_us`` / checksum-validated + ``taiwan_id`` / ``ipv4``. ``assert_no_pii`` and ``redact_text`` are + the CI gate / sanitiser. +* ``visual_review.VisualReviewServer(baseline_dir, current_dir).start()`` + serves a local web UI with side-by-side images and an *Accept current + as baseline* button (path-traversal guarded). + +Test impact analysis +==================== + +``impact_analysis.build_index("./actions")`` walks every action JSON +file and projects locator names, URLs, template names, and ``WR_*`` +command names into a reverse index. Combine with +``sharding.diff_shard`` for a smarter test selection: + +.. code-block:: python + + from je_web_runner.utils.impact_analysis import ( + affected_action_files, build_index, + ) + + index = build_index("./actions") + to_run = affected_action_files(index, locators=["primary_cta"]) diff --git a/docs/source/Zh/doc/extended_features/extended_features_doc.rst b/docs/source/Zh/doc/extended_features/extended_features_doc.rst index 7033d0d..4f2d484 100644 --- a/docs/source/Zh/doc/extended_features/extended_features_doc.rst +++ b/docs/source/Zh/doc/extended_features/extended_features_doc.rst @@ -270,3 +270,45 @@ MCP server ``webrunner_diff_shard`` / ``webrunner_render_k8s`` / ``webrunner_partition_shard``。可透過 ``McpServer.register(Tool(...))`` 擴充自訂工具,協定版本 ``2024-11-05``。 + +Action JSON LSP +=============== + +.. 
code-block:: shell + + python -m je_web_runner.action_lsp + +標準 LSP 3.17 stdio server,``textDocument/completion`` 回傳所有已註冊 +``WR_*`` 指令;``textDocument/didOpen`` / ``didChange`` 觸發 +``publishDiagnostics`` 跑 action linter。 + +Browser pool / BiDi bridge +========================== + +* ``browser_pool.BrowserPool`` — 暖機 N 個 browser instance、checkout/ + checkin、健康檢查與最大次數淘汰 +* ``bidi_backend.BidiBridge`` — 跨 Selenium 4 BiDi 與 Playwright 的 + 事件訂閱統一介面,可 ``register_translator`` 擴充 + +HAR replay server +================= + +把 ``har_replay.load_har("recorded.har")`` 載入後給 +``HarReplayServer(entries).start()`` 啟用本機 HTTP server,URL pattern +支援字面 / glob / ``re:`` regex、重複條目自動輪播。 + +PII / Visual review +=================== + +* ``pii_scanner.scan_text`` — email / 電話 / Luhn 驗證信用卡 / SSN / + ROC 身分證號 / IPv4,``assert_no_pii`` 與 ``redact_text`` 配套 +* ``visual_review.VisualReviewServer`` — 本機 web UI side-by-side 顯示 + baseline / current,一鍵 accept + +Test impact analysis +==================== + +``impact_analysis.build_index("./actions")`` 走訪 action JSON 建立 +locator / URL / template / command 反查表; +``affected_action_files(index, locators=["primary_cta"])`` 回傳所有 +參考此 locator 的測試檔,搭配 ``sharding.diff_shard`` 做精準測試選擇。 From 98b1aa13c95fdbe1c5e80ecc74382483d98b554f Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Sun, 26 Apr 2026 14:47:20 +0800 Subject: [PATCH 09/50] Add driver version pinner with local cache (bypasses GitHub rate limit) --- je_web_runner/utils/driver_pin/__init__.py | 16 ++ je_web_runner/utils/driver_pin/pinner.py | 211 +++++++++++++++++++++ test/unit_test/test_driver_pin.py | 163 ++++++++++++++++ 3 files changed, 390 insertions(+) create mode 100644 je_web_runner/utils/driver_pin/__init__.py create mode 100644 je_web_runner/utils/driver_pin/pinner.py create mode 100644 test/unit_test/test_driver_pin.py diff --git a/je_web_runner/utils/driver_pin/__init__.py b/je_web_runner/utils/driver_pin/__init__.py new file mode 100644 index 0000000..b8cc1be --- /dev/null +++ 
b/je_web_runner/utils/driver_pin/__init__.py @@ -0,0 +1,16 @@ +"""Pin geckodriver / chromedriver versions in a per-repo file to dodge rate limits.""" +from je_web_runner.utils.driver_pin.pinner import ( + DriverPinError, + PinnedDriver, + download_pinned, + load_pinfile, + save_pinfile, +) + +__all__ = [ + "DriverPinError", + "PinnedDriver", + "download_pinned", + "load_pinfile", + "save_pinfile", +] diff --git a/je_web_runner/utils/driver_pin/pinner.py b/je_web_runner/utils/driver_pin/pinner.py new file mode 100644 index 0000000..3e13d02 --- /dev/null +++ b/je_web_runner/utils/driver_pin/pinner.py @@ -0,0 +1,211 @@ +""" +Driver 版本固定:避免 webdriver_manager 每次跑都打 api.github.com。 +Reads / writes ``.webrunner/drivers.json`` describing which geckodriver +or chromedriver version + URL to use, downloads the archive once into a +local cache, and returns the on-disk path so callers can pass it to +``Service(executable_path=...)``. +""" +from __future__ import annotations + +import io +import json +import platform +import ssl +import tarfile +import urllib.request +import zipfile +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +from je_web_runner.utils.exception.exceptions import WebRunnerException +from je_web_runner.utils.logging.loggin_instance import web_runner_logger + + +class DriverPinError(WebRunnerException): + """Raised when a pin file is invalid or download verification fails.""" + + +@dataclass +class PinnedDriver: + name: str # "geckodriver" / "chromedriver" / "msedgedriver" + version: str + url: str # direct download URL (CDN, GitHub release asset, etc.) 
+ archive_format: str # "zip" | "tar.gz" + binary_inside: str # filename inside the archive + platforms: List[str] = field(default_factory=list) + cache_subdir: Optional[str] = None # default: f"{name}/{version}" + + def matches_current_platform(self) -> bool: + if not self.platforms: + return True + marker = current_platform_marker() + return marker in self.platforms + + +def current_platform_marker() -> str: + """Return ``win`` / ``mac-arm64`` / ``mac-x64`` / ``linux`` / ``linux-arm64``.""" + system = platform.system().lower() + arch = platform.machine().lower() + if system == "windows": + return "win" + if system == "darwin": + return "mac-arm64" if arch in {"arm64", "aarch64"} else "mac-x64" + if "arm" in arch or "aarch64" in arch: + return "linux-arm64" + return "linux" + + +def load_pinfile(path: Union[str, Path]) -> List[PinnedDriver]: + fp = Path(path) + if not fp.is_file(): + raise DriverPinError(f"pin file not found: {path!r}") + try: + document = json.loads(fp.read_text(encoding="utf-8")) + except ValueError as error: + raise DriverPinError(f"pin file is not JSON: {error}") from error + drivers = document.get("drivers") + if not isinstance(drivers, list): + raise DriverPinError("pin file missing 'drivers' list") + return [_pin_from_dict(index, entry) for index, entry in enumerate(drivers)] + + +def save_pinfile(path: Union[str, Path], drivers: List[PinnedDriver]) -> Path: + fp = Path(path) + fp.parent.mkdir(parents=True, exist_ok=True) + document = {"drivers": [ + { + "name": d.name, + "version": d.version, + "url": d.url, + "archive_format": d.archive_format, + "binary_inside": d.binary_inside, + "platforms": list(d.platforms), + "cache_subdir": d.cache_subdir, + } + for d in drivers + ]} + fp.write_text( + json.dumps(document, ensure_ascii=False, indent=2, sort_keys=True), + encoding="utf-8", + ) + return fp + + +def _pin_from_dict(index: int, entry: Any) -> PinnedDriver: + if not isinstance(entry, dict): + raise DriverPinError(f"drivers[{index}] must 
be an object") + for required in ("name", "version", "url", "archive_format", "binary_inside"): + if required not in entry: + raise DriverPinError(f"drivers[{index}] missing {required!r}") + if entry["archive_format"] not in {"zip", "tar.gz"}: + raise DriverPinError( + f"drivers[{index}].archive_format must be zip / tar.gz, got " + f"{entry['archive_format']!r}" + ) + if not (entry["url"].startswith("https://") or entry["url"].startswith("http://")): # NOSONAR — scheme allow-list + raise DriverPinError(f"drivers[{index}].url must be http(s)") + return PinnedDriver( + name=str(entry["name"]), + version=str(entry["version"]), + url=str(entry["url"]), + archive_format=str(entry["archive_format"]), + binary_inside=str(entry["binary_inside"]), + platforms=list(entry.get("platforms") or []), + cache_subdir=entry.get("cache_subdir"), + ) + + +def download_pinned( + pinned: PinnedDriver, + cache_dir: Union[str, Path] = ".webrunner/drivers", + fetch: Optional[Any] = None, +) -> Path: + """ + 確認對應的 driver 已下載並解壓;回傳可執行檔路徑 + Make sure the pinned driver archive has been fetched and extracted into + ``cache_dir`` and return the on-disk path of the binary inside. + + ``fetch`` lets tests inject a synthetic byte loader; when ``None`` the + archive is fetched via :func:`urllib.request.urlopen` over a default + SSL context. 
+ """ + target_dir = Path(cache_dir) / (pinned.cache_subdir or f"{pinned.name}/{pinned.version}") + target_binary = target_dir / pinned.binary_inside + if target_binary.is_file(): + return target_binary + target_dir.mkdir(parents=True, exist_ok=True) + web_runner_logger.info( + f"driver_pin downloading {pinned.name} {pinned.version} from {pinned.url}" + ) + payload = (fetch or _default_fetch)(pinned.url) + if not isinstance(payload, (bytes, bytearray)) or not payload: + raise DriverPinError(f"empty payload for {pinned.url!r}") + _extract_archive(pinned.archive_format, payload, target_dir) + if not target_binary.is_file(): + raise DriverPinError( + f"binary {pinned.binary_inside!r} not found inside archive" + ) + try: + target_binary.chmod(0o755) + except OSError: + pass # Windows raises EBADF on chmod for some FS; binary is still usable + return target_binary + + +def _default_fetch(url: str) -> bytes: + if not (url.startswith("https://") or url.startswith("http://")): # NOSONAR — guarded above + raise DriverPinError(f"refusing non-http(s) url: {url!r}") + ssl_context = ssl.create_default_context() # NOSONAR — Py3.10+ default enforces TLS 1.2+ + with urllib.request.urlopen(url, context=ssl_context, timeout=120) as response: # nosec B310 — scheme validated + return response.read() + + +def _extract_archive(archive_format: str, payload: bytes, target_dir: Path) -> None: + if archive_format == "zip": + with zipfile.ZipFile(io.BytesIO(payload)) as zf: + zf.extractall(target_dir) + return + if archive_format == "tar.gz": + with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as tf: + _safe_extract_tar(tf, target_dir) + return + raise DriverPinError(f"unsupported archive format {archive_format!r}") + + +def _safe_extract_tar(archive: tarfile.TarFile, target_dir: Path) -> None: + base = target_dir.resolve() + for member in archive.getmembers(): + candidate = (target_dir / member.name).resolve() + try: + candidate.relative_to(base) + except ValueError as error: + 
raise DriverPinError(f"unsafe tar member {member.name!r}") from error + archive.extractall(target_dir) + + +def install_for_browser( + pin_file: Union[str, Path], + browser: str, + cache_dir: Union[str, Path] = ".webrunner/drivers", + fetch: Optional[Any] = None, +) -> Optional[Path]: + """High-level helper: load the pin file, find the entry for ``browser``, + download if needed, and return the on-disk binary path.""" + drivers = load_pinfile(pin_file) + candidates = [ + d for d in drivers + if d.name == _driver_name_for(browser) and d.matches_current_platform() + ] + if not candidates: + return None + return download_pinned(candidates[0], cache_dir=cache_dir, fetch=fetch) + + +def _driver_name_for(browser: str) -> str: + return { + "firefox": "geckodriver", + "chrome": "chromedriver", + "chromium": "chromedriver", + "edge": "msedgedriver", + }.get(browser.lower(), browser.lower()) diff --git a/test/unit_test/test_driver_pin.py b/test/unit_test/test_driver_pin.py new file mode 100644 index 0000000..b17f195 --- /dev/null +++ b/test/unit_test/test_driver_pin.py @@ -0,0 +1,163 @@ +import io +import json +import tempfile +import unittest +import zipfile +from pathlib import Path + +from je_web_runner.utils.driver_pin import ( + DriverPinError, + PinnedDriver, + download_pinned, + load_pinfile, + save_pinfile, +) +from je_web_runner.utils.driver_pin.pinner import ( + install_for_browser, +) + + +def _zip_with(filename, content=b"fake-binary"): + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, "w") as zf: + zf.writestr(filename, content) + return buffer.getvalue() + + +class TestPinFile(unittest.TestCase): + + def test_round_trip(self): + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "drivers.json" + drivers = [PinnedDriver( + name="geckodriver", + version="0.34.0", + url="https://example.com/g.zip", + archive_format="zip", + binary_inside="geckodriver.exe", + platforms=["win"], + )] + save_pinfile(path, drivers) + loaded = load_pinfile(path) 
class TestDownloadPinned(unittest.TestCase):
    """Checks for ``download_pinned``: cache hit, zip extraction and error cases."""

    @staticmethod
    def _pin(name="geckodriver", version="0.34.0",
             url="https://example.com/g.zip", binary_inside="geckodriver.exe"):
        # Every case in this class pins a zip archive; only these four fields vary.
        return PinnedDriver(
            name=name,
            version=version,
            url=url,
            archive_format="zip",
            binary_inside=binary_inside,
        )

    def test_uses_cache_when_present(self):
        fetched_urls = []

        def recording_fetch(url):
            fetched_urls.append(url)
            return b""

        with tempfile.TemporaryDirectory() as workdir:
            cache_root = Path(workdir) / "cache"
            cached_binary = cache_root / "geckodriver/0.34.0/geckodriver.exe"
            cached_binary.parent.mkdir(parents=True)
            cached_binary.write_bytes(b"existing")
            located = download_pinned(
                self._pin(), cache_dir=cache_root, fetch=recording_fetch,
            )
            self.assertEqual(located, cached_binary)
            # The binary was already cached, so fetch must not have been called.
            self.assertEqual(fetched_urls, [])

    def test_extracts_zip_archive(self):
        with tempfile.TemporaryDirectory() as workdir:
            cache_root = Path(workdir) / "cache"
            archive_bytes = _zip_with("geckodriver.exe")

            def serve_archive(_url):
                return archive_bytes

            extracted = download_pinned(
                self._pin(), cache_dir=cache_root, fetch=serve_archive,
            )
            self.assertTrue(extracted.is_file())
            self.assertEqual(extracted.read_bytes(), b"fake-binary")

    def test_missing_binary_in_archive_raises(self):
        with tempfile.TemporaryDirectory() as workdir:
            cache_root = Path(workdir) / "cache"
            archive_bytes = _zip_with("not-the-binary.txt")

            def serve_archive(_url):
                return archive_bytes

            with self.assertRaises(DriverPinError):
                download_pinned(
                    self._pin(), cache_dir=cache_root, fetch=serve_archive,
                )

    def test_empty_payload_raises(self):
        with tempfile.TemporaryDirectory() as workdir:
            cache_root = Path(workdir) / "cache"
            tiny_pin = self._pin(
                name="g", version="1", url="https://x", binary_inside="g",
            )

            def serve_nothing(_url):
                return b""

            with self.assertRaises(DriverPinError):
                download_pinned(
                    tiny_pin, cache_dir=cache_root, fetch=serve_nothing,
                )
"""
Static Selenium -> Playwright translation covering roughly the most common
60-70% of patterns.

Regex-based translator for the most-used Selenium API calls and WebRunner
action JSON commands. Output is a draft; manual review is still required,
especially for:

- chained ActionChains / multi-step waits
- iframe / window switching (Playwright uses ``page.frame_locator``)
- file uploads (``send_keys`` <-> ``set_input_files``)

For action JSON the translator rewrites well-known ``WR_*`` commands to
their ``WR_pw_*`` Playwright equivalents.
"""


@dataclass
class Translation:
    """One source line that at least one rule rewrote."""

    line: int  # 1-based line number within the translated source
    original: str  # the input line, untouched
    translated: str  # the line after every matching rule was applied
    note: str = ""  # "; "-joined notes of the rules that changed the line


# One single- or double-quoted Python string literal whose closing quote must
# be the same character as its opening quote, so a literal may contain the
# *other* quote character (``"input[name='q']"``) or backslash escapes.
# Named groups: ``lit`` = whole literal with quotes, ``q`` = quote character,
# ``body`` = text between the quotes.
_LITERAL = r"(?P<lit>(?P<q>['\"])(?P<body>(?:\\.|(?!(?P=q))[^\\])+)(?P=q))"

# How each ``By`` strategy becomes a Playwright selector literal inside
# ``page.wait_for_selector``. ``{q}`` is the source quote character and
# ``{body}`` the locator text.
_WAIT_SELECTOR_TEMPLATES = {
    "ID": "{q}#{body}{q}",
    "CLASS_NAME": "{q}.{body}{q}",
    "NAME": "'[name=\"{body}\"]'",
    "CSS_SELECTOR": "{q}{body}{q}",
    "XPATH": "{q}xpath={body}{q}",
}


def _wait_for_selector_call(match: re.Match) -> str:
    """Build ``page.wait_for_selector(<selector>, timeout=<ms>)`` from a match.

    The matched WebDriverWait timeout is a whole number of seconds; it is
    multiplied by 1000 because the emitted call passes milliseconds.
    """
    template = _WAIT_SELECTOR_TEMPLATES[match.group("by")]
    selector = template.format(q=match.group("q"), body=match.group("body"))
    timeout_ms = int(match.group("seconds")) * 1000
    return f"page.wait_for_selector({selector}, timeout={timeout_ms})"


# Rules are applied in order to every line. Each entry is
# (compiled pattern, replacement template or callable, human-readable note).
_PYTHON_PATTERNS: List[Tuple[re.Pattern, Any, str]] = [
    (re.compile(r"driver\.find_element\(By\.ID,\s*['\"]([^'\"]+)['\"]\)"),
     "page.locator('#\\1')",
     "ID -> CSS id selector"),
    (re.compile(r"driver\.find_element\(By\.CLASS_NAME,\s*['\"]([^'\"]+)['\"]\)"),
     "page.locator('.\\1')",
     "CLASS_NAME -> CSS class selector"),
    (re.compile(r"driver\.find_element\(By\.NAME,\s*['\"]([^'\"]+)['\"]\)"),
     "page.locator('[name=\"\\1\"]')",
     "NAME -> CSS [name=...]"),
    (re.compile(r"driver\.find_element\(By\.CSS_SELECTOR,\s*" + _LITERAL + r"\)"),
     "page.locator(\\g<lit>)",
     "CSS_SELECTOR -> page.locator()"),
    # The ``xpath=`` prefix goes inside the literal instead of emitting
    # ``f'xpath=' + '<expr>'``.
    (re.compile(r"driver\.find_element\(By\.XPATH,\s*" + _LITERAL + r"\)"),
     "page.locator(\\g<q>xpath=\\g<body>\\g<q>)",
     "XPATH -> page.locator(xpath=...)"),
    (re.compile(r"driver\.find_element\(By\.LINK_TEXT,\s*" + _LITERAL + r"\)"),
     "page.get_by_role('link', name=\\g<lit>)",
     "LINK_TEXT -> get_by_role('link', name=...)"),
    (re.compile(r"driver\.get\(" + _LITERAL + r"\)"),
     "page.goto(\\g<lit>)",
     "driver.get -> page.goto"),
    # Any single argument (int, float, name) is accepted, not only an integer.
    (re.compile(r"driver\.implicitly_wait\([^()]*\)"),
     "# Playwright auto-waits — drop implicitly_wait()",
     "implicit wait removed"),
    (re.compile(r"driver\.refresh\(\)"),
     "page.reload()",
     "refresh -> reload"),
    (re.compile(r"driver\.back\(\)"),
     "page.go_back()",
     "back -> go_back"),
    (re.compile(r"driver\.forward\(\)"),
     "page.go_forward()",
     "forward -> go_forward"),
    (re.compile(r"driver\.quit\(\)"),
     "page.context.close()",
     "driver.quit -> context.close"),
    (re.compile(r"\.send_keys\(" + _LITERAL + r"\)"),
     ".fill(\\g<lit>)",
     "send_keys(text) -> fill(text)"),
    (re.compile(r"\.send_keys\(Keys\.ENTER\)"),
     ".press('Enter')",
     "send_keys(Keys.ENTER) -> press('Enter')"),
    # Identity rule: it never changes a line, it only documents that click()
    # is supported as-is.
    (re.compile(r"\.click\(\)"),
     ".click()",
     "click() unchanged"),
    # Property reads only: skip ``.text(...)`` calls and assignment targets
    # (``obj.text = ...``, ``obj.text += ...``), which would become invalid code.
    (re.compile(r"\.text\b(?!\s*(?:\(|[-+*/%&|^@]?=(?!=)))"),
     ".inner_text()",
     ".text -> .inner_text()"),
    # Fully recognised explicit wait: locator and timeout are both carried over.
    (re.compile(
        r"WebDriverWait\(driver,\s*(?P<seconds>\d+)\)\.until\(\s*"
        r"EC\.visibility_of_element_located\(\s*\(\s*"
        r"By\.(?P<by>ID|CLASS_NAME|NAME|CSS_SELECTOR|XPATH),\s*"
        + _LITERAL + r"\s*\)\s*\)\s*\)"),
     _wait_for_selector_call,
     "explicit wait -> wait_for_selector (timeout in ms)"),
    # Fallback for locator forms the rule above does not recognise; only the
    # call head is rewritten, the arguments still need manual review.
    (re.compile(r"WebDriverWait\(driver,\s*(\d+)\)\.until\(EC\.visibility_of_element_located"),
     "page.wait_for_selector(",
     "explicit wait -> wait_for_selector (timeout in ms)"),
]


def translate_python_source(source: str) -> List[Translation]:
    """Translate Python source line by line.

    Every rule in ``_PYTHON_PATTERNS`` is applied, in order, to each line.

    :param source: Python source text.
    :return: one :class:`Translation` per line that changed; unchanged lines
        are omitted.
    :raises SelToPwError: if ``source`` is not a ``str``.
    """
    if not isinstance(source, str):
        raise SelToPwError("source must be str")
    translations: List[Translation] = []
    for line_no, line in enumerate(source.splitlines(), start=1):
        translated = line
        notes: List[str] = []
        for pattern, replacement, note in _PYTHON_PATTERNS:
            new_text = pattern.sub(replacement, translated)
            # Compare text rather than counting matches so that identity
            # rules do not add a note.
            if new_text != translated:
                notes.append(note)
                translated = new_text
        if translated != line:
            translations.append(Translation(
                line=line_no,
                original=line,
                translated=translated,
                note="; ".join(notes),
            ))
    return translations


# WebRunner command -> Playwright command. ``None`` means "drop the action".
_ACTION_COMMAND_MAP = {
    "WR_to_url": "WR_pw_to_url",
    "WR_element_click": "WR_pw_click",
    "WR_element_input": "WR_pw_fill",
    "WR_implicitly_wait": None,  # drop entirely; Playwright auto-waits
    "WR_refresh": "WR_pw_reload",
    "WR_back": "WR_pw_go_back",
    "WR_forward": "WR_pw_go_forward",
    "WR_quit_all": "WR_pw_close_context",
    "WR_get_screenshot_as_png": "WR_pw_screenshot_png",
    "WR_set_window_size": "WR_pw_set_viewport_size",
}


def translate_action_list(actions: List[Any]) -> List[List[Any]]:
    """Translate a WebRunner action list to its Playwright variant.

    ``WR_implicitly_wait`` is dropped silently. Commands without a registered
    mapping survive intact so the output remains a runnable draft. Entries
    that are not a non-empty list starting with a string are passed through
    as the same object.

    :param actions: list of actions, each shaped ``[command, *arguments]``.
    :return: a new list; translated and unmapped actions are shallow copies.
    :raises SelToPwError: if ``actions`` is not a list.
    """
    if not isinstance(actions, list):
        raise SelToPwError("actions must be a list")
    translated: List[List[Any]] = []
    for action in actions:
        is_command = (
            isinstance(action, list) and action and isinstance(action[0], str)
        )
        if not is_command:
            translated.append(action)
            continue
        # Unmapped commands map to themselves; only an explicit ``None``
        # entry in the table means "drop".
        new_command = _ACTION_COMMAND_MAP.get(action[0], action[0])
        if new_command is None:
            continue
        translated.append([new_command, *action[1:]])
    return translated


def supported_python_patterns() -> List[str]:
    """Return the regex source of every Python translation rule, in rule order."""
    return [pat.pattern for pat, _replacement, _note in _PYTHON_PATTERNS]


def supported_action_commands() -> List[str]:
    """Return the sorted ``WR_*`` command names that have a mapping entry."""
    return sorted(_ACTION_COMMAND_MAP.keys())
class TestTranslateActionList(unittest.TestCase):
    """Checks for ``translate_action_list`` and ``supported_action_commands``."""

    def test_known_command_rewritten(self):
        translated = translate_action_list(
            [["WR_to_url", {"url": "https://x.com"}]]
        )
        first_command = translated[0][0]
        self.assertEqual(first_command, "WR_pw_to_url")

    def test_drops_implicit_wait(self):
        source_actions = [
            ["WR_implicitly_wait", {"time_to_wait": 5}],
            ["WR_quit_all"],
        ]
        translated = translate_action_list(source_actions)
        # Only the quit action is expected to survive translation.
        self.assertEqual(len(translated), 1)
        self.assertEqual(translated[0][0], "WR_pw_close_context")

    def test_unknown_command_passes_through(self):
        custom = [["WR_custom_action", {"x": 1}]]
        self.assertEqual(
            translate_action_list(custom),
            [["WR_custom_action", {"x": 1}]],
        )

    def test_invalid_input_raises(self):
        self.assertRaises(
            SelToPwError, translate_action_list, "not a list",
        )

    def test_supported_commands_includes_to_url(self):
        commands = supported_action_commands()
        self.assertIn("WR_to_url", commands)
je_web_runner/utils/form_autofill/autofill.py | 186 ++++++++++++++++++ test/unit_test/test_form_autofill.py | 107 ++++++++++ 3 files changed, 309 insertions(+) create mode 100644 je_web_runner/utils/form_autofill/__init__.py create mode 100644 je_web_runner/utils/form_autofill/autofill.py create mode 100644 test/unit_test/test_form_autofill.py diff --git a/je_web_runner/utils/form_autofill/__init__.py b/je_web_runner/utils/form_autofill/__init__.py new file mode 100644 index 0000000..3128171 --- /dev/null +++ b/je_web_runner/utils/form_autofill/__init__.py @@ -0,0 +1,16 @@ +"""Heuristic form auto-filler: match fields by label/placeholder/name.""" +from je_web_runner.utils.form_autofill.autofill import ( + FieldMatch, + FormAutoFillError, + classify_field, + match_fields, + plan_fill_actions, +) + +__all__ = [ + "FieldMatch", + "FormAutoFillError", + "classify_field", + "match_fields", + "plan_fill_actions", +] diff --git a/je_web_runner/utils/form_autofill/autofill.py b/je_web_runner/utils/form_autofill/autofill.py new file mode 100644 index 0000000..11b6416 --- /dev/null +++ b/je_web_runner/utils/form_autofill/autofill.py @@ -0,0 +1,186 @@ +""" +Form 自動填值:依 label / placeholder / name / type 推欄位用途,從 fixture dict 一鍵填單。 +Heuristic form auto-fill. Take a list of *form field descriptors* (a thin +projection of an HTML ```` / ``