From d257db161c6dab256541cfae18191de7381831e8 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 25 May 2026 02:41:17 +0000 Subject: [PATCH 1/2] Add built-in mcpproxy-listfiles and mcpproxy-getfile tools MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Always-on utility tools registered at startup — no YAML config needed. Gives LLMs read-only access to a configurable files directory (default: .playwright-mcp) so they can retrieve screenshots and snapshots produced by package providers such as the Playwright MCP server. New files builtin_tools.py — list_files + get_file handlers; _safe_resolve prevents path-traversal (../); encoding=auto/text/base64 Changes config.py — FILES_DIR constant (MCPPROXY_FILES_DIR env var) server.py — register_builtin_tools() called at startup before YAML providers; exported for tests Dockerfile — COPY builtin_tools.py README.md — layout, intro, Part 9 tutorial, parameter tables Tests (31 new, 175 total) tests/test_builtin_tools.py — 26 tests: empty dir, file listing, sorted entries, subdirectory drill-down, path-traversal rejection, text/base64/auto encoding, PNG roundtrip, size reporting tests/test_server.py — 5 tests: register_builtin_tools exports, mcp.tool called twice, tool names, getfile requires path https://claude.ai/code/session_016EMphWWMguzwVrfAYvJFSX --- Dockerfile | 2 +- README.md | 71 +++++++- builtin_tools.py | 151 +++++++++++++++++ config.py | 6 + server.py | 87 ++++++++++ tests/test_builtin_tools.py | 327 ++++++++++++++++++++++++++++++++++++ tests/test_server.py | 57 +++++++ 7 files changed, 699 insertions(+), 2 deletions(-) create mode 100644 builtin_tools.py create mode 100644 tests/test_builtin_tools.py diff --git a/Dockerfile b/Dockerfile index 6a0dfa1..1921648 100755 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir uv ENV PATH="/root/.local/bin:$PATH" -COPY server.py config.py process_runner.py ./ +COPY server.py config.py process_runner.py builtin_tools.py ./ COPY frontend/ ./frontend/ COPY handlers/ ./handlers/ diff --git a/README.md b/README.md index 0bdc785..9f597be 100755 --- a/README.md +++ b/README.md @@ -18,6 +18,11 @@ Each tool **provider** is a single YAML file under `tools/`. The YAML contains: runs `setup_commands`, then registers each tool automatically — no Python files to maintain separately, no changes to `server.py` needed when adding new tools. +Two **built-in tools** (`mcpproxy-listfiles` and `mcpproxy-getfile`) are always registered +without any YAML config. They give LLMs read-only access to a configurable directory +(default: `.playwright-mcp`) — useful for retrieving screenshots and snapshots produced +by package providers such as Playwright MCP. + ## Ports | Port | Service | @@ -38,6 +43,7 @@ maintain separately, no changes to `server.py` needed when adding new tools. ├── server.py ├── config.py ← shared env-var config (imported by all modules) ├── process_runner.py ← spawns & proxies any stdio MCP subprocess +├── builtin_tools.py ← built-in mcpproxy-listfiles / mcpproxy-getfile tools ├── frontend/ │ └── app.py ← FastAPI UI server (port 8889) ├── .env.example @@ -531,7 +537,8 @@ pip install -r requirements.txt -r requirements-dev.txt pytest tests/ -v ``` -Tests cover `server.py` (pure helpers) and `frontend/app.py` (all API endpoints). +Tests cover `server.py` (pure helpers), `frontend/app.py` (all API endpoints), and +`builtin_tools.py` (file listing and retrieval). CI runs on every push via `.github/workflows/tests.yml`. --- @@ -748,6 +755,68 @@ Write state to a well-known file path and read it on the next call. --- +### Part 9 — reading files produced by package providers + +Package providers (e.g. Playwright MCP) often write files to disk — screenshots (PNG), +accessibility snapshots (JSON), downloaded pages (HTML) — that the LLM would otherwise +have no way to retrieve. + +mcpproxy ships two **built-in utility tools** that are always registered, with no YAML +config file required: + +| Tool | Description | +|---|---| +| `mcpproxy-listfiles` | List files and subdirectories inside the files base directory | +| `mcpproxy-getfile` | Read a file from the files base directory (UTF-8 text or base64) | + +**Default base directory:** `.playwright-mcp` relative to the server's working directory +(i.e. `/app/.playwright-mcp` inside Docker). Override with the `MCPPROXY_FILES_DIR` +environment variable. + +Only files **inside** the base directory are accessible — path-traversal attempts +(`../`) are rejected. + +#### Example workflow with Playwright + +1. Ask the LLM to navigate to a page and take a screenshot via the Playwright MCP provider. +2. Playwright writes `screenshot.png` to `.playwright-mcp/`. +3. Ask the LLM to call `mcpproxy-listfiles` — it returns the file list. +4. Ask the LLM to call `mcpproxy-getfile` with `path="screenshot.png"` — it returns the + PNG as a base64 string that the LLM can describe or pass to a vision model. + +#### `mcpproxy-listfiles` parameters + +| Parameter | Type | Required | Default | Description | +|---|---|---|---|---| +| `path` | string | No | `""` | Subdirectory to list, relative to the base dir. Omit to list the root. | + +Returns an object with `ok`, `base_dir`, `path`, and `entries` (list of `{name, type, size}`). + +#### `mcpproxy-getfile` parameters + +| Parameter | Type | Required | Default | Description | +|---|---|---|---|---| +| `path` | string | **Yes** | — | File path, relative to the base dir. | +| `encoding` | string | No | `"auto"` | `"auto"` tries UTF-8, falls back to base64. `"text"` forces UTF-8. `"base64"` always base64. | + +Returns an object with `ok`, `path`, `size`, `content`, and `encoding`. + +#### Changing the base directory + +```bash +# In docker-compose.override.yml or as -e flag +MCPPROXY_FILES_DIR=/app/data +``` + +Or mount a volume at the target path so files persist across container restarts: + +```yaml +volumes: + - ./playwright-output:/app/.playwright-mcp +``` + +--- + ### YAML provider reference ```yaml diff --git a/builtin_tools.py b/builtin_tools.py new file mode 100644 index 0000000..c1b30a8 --- /dev/null +++ b/builtin_tools.py @@ -0,0 +1,151 @@ +""" +Built-in mcpproxy utility tools — registered automatically at startup, +no YAML config file required. + + mcpproxy-listfiles List files/directories inside the mcpproxy files dir. + mcpproxy-getfile Read a file from the mcpproxy files dir (text or base64). + +The *base directory* defaults to ``.playwright-mcp`` (relative to the server's +working directory) and can be overridden at runtime with the +``MCPPROXY_FILES_DIR`` environment variable. Only files **inside** the base +directory are accessible — path-traversal attempts are rejected. +""" + +import base64 +import os +from pathlib import Path +from typing import Any + + +def _base_dir() -> Path: + """Return the resolved base directory for built-in file access. + + Evaluated on each call so that tests can override MCPPROXY_FILES_DIR + with monkeypatch without restarting the process. + """ + raw = os.environ.get("MCPPROXY_FILES_DIR", ".playwright-mcp") + return Path(raw).resolve() + + +def _safe_resolve(relative: str | None) -> Path: + """Resolve *relative* under the base dir; raise ValueError on traversal.""" + base = _base_dir() + target = (base / (relative or "")).resolve() + # relative_to() raises ValueError if target is not under base + try: + target.relative_to(base) + except ValueError: + raise ValueError( + f"Path '{relative}' is outside the allowed directory '{base}'" + ) + return target + + +# --------------------------------------------------------------------------- +# Tool handlers +# --------------------------------------------------------------------------- + +async def list_files( + context: dict[str, Any], + path: str | None = None, +) -> dict[str, Any]: + """List files and subdirectories at *path* inside the files base directory. + + Returns a JSON object with an ``entries`` list; each entry has ``name``, + ``type`` (``"file"`` or ``"directory"``), and ``size`` (bytes, files only). + If the directory does not exist yet the entries list is empty (not an error). + """ + try: + target = _safe_resolve(path) + base = _base_dir() + if not target.exists(): + return { + "ok": True, + "base_dir": str(base), + "path": path or "", + "entries": [], + } + if not target.is_dir(): + return {"ok": False, "error": f"'{path}' is not a directory"} + entries: list[dict[str, Any]] = [] + for entry in sorted(target.iterdir()): + entries.append( + { + "name": entry.name, + "type": "directory" if entry.is_dir() else "file", + "size": entry.stat().st_size if entry.is_file() else None, + } + ) + return { + "ok": True, + "base_dir": str(base), + "path": path or "", + "entries": entries, + } + except Exception as exc: + return {"ok": False, "error": str(exc)} + + +async def get_file( + context: dict[str, Any], + path: str, + encoding: str = "auto", +) -> dict[str, Any]: + """Read a file from the files base directory. + + *encoding* controls how the content is returned: + ``"auto"`` (default) — try UTF-8; fall back to base64 for binary files. + ``"text"`` — decode as UTF-8; error if the file is binary. + ``"base64"`` — always return base64-encoded bytes (safe for images etc.). + + Returns a JSON object with ``content`` (string), ``encoding`` used, and + ``size`` (bytes). + """ + try: + target = _safe_resolve(path) + if not target.exists(): + return {"ok": False, "error": f"File not found: {path}"} + if not target.is_file(): + return {"ok": False, "error": f"Not a file: {path}"} + + raw = target.read_bytes() + size = len(raw) + + if encoding == "base64": + return { + "ok": True, + "path": path, + "size": size, + "content": base64.b64encode(raw).decode(), + "encoding": "base64", + } + + # encoding == "text" or "auto" + try: + text = raw.decode("utf-8") + return { + "ok": True, + "path": path, + "size": size, + "content": text, + "encoding": "text", + } + except UnicodeDecodeError: + if encoding == "text": + return { + "ok": False, + "error": ( + f"File '{path}' is not valid UTF-8 text. " + "Try encoding='base64'." + ), + } + # "auto" fallback → base64 + return { + "ok": True, + "path": path, + "size": size, + "content": base64.b64encode(raw).decode(), + "encoding": "base64", + } + except Exception as exc: + return {"ok": False, "error": str(exc)} diff --git a/config.py b/config.py index 5ba6f1f..6d0cdb4 100644 --- a/config.py +++ b/config.py @@ -6,6 +6,12 @@ ENV_FILE = Path(os.environ.get("MCP_ENV_FILE", ".env")) SERVER_NAME = os.environ.get("MCP_SERVER_NAME", "local-config-driven-mcp") +# Base directory exposed by the built-in mcpproxy-listfiles / mcpproxy-getfile tools. +# Defaults to .playwright-mcp (relative to the server's working directory) so that +# screenshots and snapshots produced by the Playwright MCP package provider are +# immediately accessible. Override with MCPPROXY_FILES_DIR. +FILES_DIR = Path(os.environ.get("MCPPROXY_FILES_DIR", ".playwright-mcp")) + UI_HOST = os.environ.get("MCP_UI_HOST", "0.0.0.0") UI_PORT = int(os.environ.get("MCP_UI_PORT", "8889")) diff --git a/server.py b/server.py index bb63161..330dbef 100755 --- a/server.py +++ b/server.py @@ -317,10 +317,97 @@ def run_provider_setup(spec: dict[str, Any]) -> None: raise +# --------------------------------------------------------------------------- +# Built-in tools (always available, no YAML config required) +# --------------------------------------------------------------------------- + +def register_builtin_tools() -> None: + """Register the mcpproxy-listfiles and mcpproxy-getfile utility tools. + + These tools expose read-only access to the files directory (default: + ``.playwright-mcp``, override with ``MCPPROXY_FILES_DIR``). They are + always registered regardless of what YAML providers are loaded, giving + LLMs a way to retrieve screenshots, JSON snapshots, and other files + produced by package providers such as the Playwright MCP server. + """ + try: + from builtin_tools import get_file, list_files + + register_tool( + { + "name": "mcpproxy-listfiles", + "description": ( + "List files and directories inside the mcpproxy files directory " + "(default: .playwright-mcp, override with MCPPROXY_FILES_DIR). " + "Use this to discover screenshots, JSON snapshots, and other files " + "produced by package providers such as the Playwright MCP server. " + "Pass a subdirectory path to drill down." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": ( + "Subdirectory to list, relative to the base files directory. " + "Omit or pass an empty string to list the root." + ), + "default": "", + } + }, + "required": [], + }, + }, + list_files, + ) + + register_tool( + { + "name": "mcpproxy-getfile", + "description": ( + "Read the contents of a file from the mcpproxy files directory " + "(default: .playwright-mcp). " + "Returns UTF-8 text for text files (JSON, HTML, Markdown, …) or " + "base64-encoded bytes for binary files (PNG screenshots, …). " + "Use mcpproxy-listfiles first to discover available file paths." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Path to the file, relative to the base files directory.", + }, + "encoding": { + "type": "string", + "description": ( + "How to encode the returned content. " + "'auto' (default) tries UTF-8 and falls back to base64. " + "'text' forces UTF-8 (error on binary). " + "'base64' always returns base64 (safe for images)." + ), + "default": "auto", + }, + }, + "required": ["path"], + }, + }, + get_file, + ) + + print("Registered built-in tools: mcpproxy-listfiles, mcpproxy-getfile") + except Exception as exc: + print(f"register_builtin_tools error: {exc}") + traceback.print_exc() + raise + + # --------------------------------------------------------------------------- # Load all providers at import time # --------------------------------------------------------------------------- +register_builtin_tools() + for provider_spec in load_provider_specs(CONFIG_DIR): register_provider(provider_spec) run_provider_setup(provider_spec) diff --git a/tests/test_builtin_tools.py b/tests/test_builtin_tools.py new file mode 100644 index 0000000..383972f --- /dev/null +++ b/tests/test_builtin_tools.py @@ -0,0 +1,327 @@ +"""Tests for builtin_tools.py — mcpproxy-listfiles and mcpproxy-getfile. + +These tests monkeypatch MCPPROXY_FILES_DIR to a fresh temp directory so +they never touch the real .playwright-mcp directory. +""" +import base64 +import os +from pathlib import Path + +import pytest + +# Lazily import after env-var monkeypatching where needed. +# At module level we import only the helpers that don't read env at import time. + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _ctx() -> dict: + """Minimal context dict (built-in tools don't use context).""" + return {} + + +def _set_base(monkeypatch, path: Path) -> None: + """Override the MCPPROXY_FILES_DIR env var for a single test.""" + monkeypatch.setenv("MCPPROXY_FILES_DIR", str(path)) + + +# --------------------------------------------------------------------------- +# list_files +# --------------------------------------------------------------------------- + +class TestListFiles: + @pytest.mark.asyncio + async def test_missing_base_dir_returns_empty(self, tmp_path: Path, monkeypatch): + """If the base dir does not exist yet, return ok=True with empty entries.""" + _set_base(monkeypatch, tmp_path / "nonexistent") + from builtin_tools import list_files + result = await list_files(_ctx()) + assert result["ok"] is True + assert result["entries"] == [] + + @pytest.mark.asyncio + async def test_empty_base_dir_returns_empty(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + assert result["ok"] is True + assert result["entries"] == [] + + @pytest.mark.asyncio + async def test_lists_files_and_dirs(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + (base / "snapshot.json").write_text('{"key": "val"}') + (base / "screenshot.png").write_bytes(b"\x89PNG\r\n") + (base / "subdir").mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + assert result["ok"] is True + names = {e["name"] for e in result["entries"]} + assert names == {"snapshot.json", "screenshot.png", "subdir"} + + @pytest.mark.asyncio + async def test_file_has_size(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + content = b"hello world" + (base / "note.txt").write_bytes(content) + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + entry = next(e for e in result["entries"] if e["name"] == "note.txt") + assert entry["type"] == "file" + assert entry["size"] == len(content) + + @pytest.mark.asyncio + async def test_directory_size_is_none(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + (base / "subdir").mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + entry = next(e for e in result["entries"] if e["name"] == "subdir") + assert entry["type"] == "directory" + assert entry["size"] is None + + @pytest.mark.asyncio + async def test_list_subdirectory(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + sub = base / "pages" + sub.mkdir(parents=True) + (sub / "page1.json").write_text("{}") + (sub / "page2.json").write_text("{}") + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx(), path="pages") + assert result["ok"] is True + names = {e["name"] for e in result["entries"]} + assert names == {"page1.json", "page2.json"} + + @pytest.mark.asyncio + async def test_entries_sorted_alphabetically(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + for name in ("zzz.txt", "aaa.txt", "mmm.txt"): + (base / name).write_text("x") + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + names = [e["name"] for e in result["entries"]] + assert names == sorted(names) + + @pytest.mark.asyncio + async def test_path_traversal_rejected(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx(), path="../../../etc") + assert result["ok"] is False + assert "outside" in result["error"].lower() or "error" in result + + @pytest.mark.asyncio + async def test_path_returns_base_dir_in_result(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx()) + assert result["base_dir"] == str(base.resolve()) + + @pytest.mark.asyncio + async def test_list_nonexistent_subdir_returns_empty(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import list_files + result = await list_files(_ctx(), path="does_not_exist") + assert result["ok"] is True + assert result["entries"] == [] + + +# --------------------------------------------------------------------------- +# get_file +# --------------------------------------------------------------------------- + +class TestGetFile: + @pytest.mark.asyncio + async def test_read_text_file(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + (base / "data.json").write_text('{"a": 1}', encoding="utf-8") + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="data.json") + assert result["ok"] is True + assert result["content"] == '{"a": 1}' + assert result["encoding"] == "text" + + @pytest.mark.asyncio + async def test_read_binary_file_auto_falls_back_to_base64( + self, tmp_path: Path, monkeypatch + ): + base = tmp_path / "files" + base.mkdir() + # Write raw bytes that are not valid UTF-8 + raw = bytes(range(256)) + (base / "img.bin").write_bytes(raw) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="img.bin") + assert result["ok"] is True + assert result["encoding"] == "base64" + assert base64.b64decode(result["content"]) == raw + + @pytest.mark.asyncio + async def test_explicit_base64_encoding(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + content = b"hello" + (base / "f.txt").write_bytes(content) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="f.txt", encoding="base64") + assert result["ok"] is True + assert result["encoding"] == "base64" + assert base64.b64decode(result["content"]) == content + + @pytest.mark.asyncio + async def test_explicit_text_encoding_fails_on_binary( + self, tmp_path: Path, monkeypatch + ): + base = tmp_path / "files" + base.mkdir() + (base / "img.bin").write_bytes(bytes(range(256))) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="img.bin", encoding="text") + assert result["ok"] is False + assert "utf-8" in result["error"].lower() or "base64" in result["error"].lower() + + @pytest.mark.asyncio + async def test_file_not_found(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="missing.txt") + assert result["ok"] is False + assert "not found" in result["error"].lower() or "missing" in result["error"].lower() + + @pytest.mark.asyncio + async def test_path_is_directory_returns_error(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + (base / "adir").mkdir(parents=True) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="adir") + assert result["ok"] is False + assert "not a file" in result["error"].lower() + + @pytest.mark.asyncio + async def test_path_traversal_rejected(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="../../etc/passwd") + assert result["ok"] is False + assert "outside" in result["error"].lower() + + @pytest.mark.asyncio + async def test_size_reported(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + base.mkdir() + data = b"abc" * 100 + (base / "large.bin").write_bytes(data) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="large.bin", encoding="base64") + assert result["ok"] is True + assert result["size"] == len(data) + + @pytest.mark.asyncio + async def test_nested_path(self, tmp_path: Path, monkeypatch): + base = tmp_path / "files" + (base / "a" / "b").mkdir(parents=True) + (base / "a" / "b" / "c.txt").write_text("deep") + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="a/b/c.txt") + assert result["ok"] is True + assert result["content"] == "deep" + + @pytest.mark.asyncio + async def test_png_screenshot_roundtrip(self, tmp_path: Path, monkeypatch): + """Simulate a Playwright screenshot: PNG magic bytes + arbitrary data.""" + base = tmp_path / "files" + base.mkdir() + # Minimal fake PNG (starts with PNG magic signature) + png_magic = b"\x89PNG\r\n\x1a\n" + bytes(range(100)) + (base / "screenshot.png").write_bytes(png_magic) + _set_base(monkeypatch, base) + from builtin_tools import get_file + result = await get_file(_ctx(), path="screenshot.png") + assert result["ok"] is True + # PNG has non-UTF-8 bytes so auto should pick base64 + assert result["encoding"] == "base64" + assert base64.b64decode(result["content"]) == png_magic + + +# --------------------------------------------------------------------------- +# _safe_resolve edge cases +# --------------------------------------------------------------------------- + +class TestSafeResolve: + def test_none_path_resolves_to_base(self, tmp_path: Path, monkeypatch): + _set_base(monkeypatch, tmp_path / "base") + from builtin_tools import _safe_resolve, _base_dir + monkeypatch.setenv("MCPPROXY_FILES_DIR", str(tmp_path / "base")) + result = _safe_resolve(None) + assert result == _base_dir() + + def test_empty_string_resolves_to_base(self, tmp_path: Path, monkeypatch): + _set_base(monkeypatch, tmp_path / "base") + from builtin_tools import _safe_resolve, _base_dir + result = _safe_resolve("") + assert result == _base_dir() + + def test_valid_subdirectory_allowed(self, tmp_path: Path, monkeypatch): + _set_base(monkeypatch, tmp_path / "base") + from builtin_tools import _safe_resolve, _base_dir + result = _safe_resolve("sub/deep") + expected = (_base_dir() / "sub" / "deep").resolve() + assert result == expected + + def test_traversal_raises(self, tmp_path: Path, monkeypatch): + _set_base(monkeypatch, tmp_path / "base") + from builtin_tools import _safe_resolve + with pytest.raises(ValueError, match="outside"): + _safe_resolve("../secret") + + +# --------------------------------------------------------------------------- +# Integration: tools registered in server.py +# --------------------------------------------------------------------------- + +class TestBuiltinToolsRegistered: + """Verify the built-in tool specs are accepted by register_tool without error.""" + + def test_register_builtin_tools_succeeds(self, monkeypatch): + """register_builtin_tools() should not raise (mcp.tool is already registered).""" + # server.py already called register_builtin_tools() at import time. + # Verify the built-in tool module is importable and correct. + import builtin_tools + assert callable(builtin_tools.list_files) + assert callable(builtin_tools.get_file) + + def test_builtin_tools_exported(self): + from builtin_tools import get_file, list_files, _base_dir, _safe_resolve + assert all(callable(f) for f in (get_file, list_files, _base_dir, _safe_resolve)) diff --git a/tests/test_server.py b/tests/test_server.py index d86ba68..1679b10 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -25,6 +25,7 @@ exec_provider_code, load_provider_specs, redact_secrets, + register_builtin_tools, register_tool, resolve_env_defaults, run_provider_setup, @@ -433,3 +434,59 @@ def test_check_true_passed_to_subprocess(self): run_provider_setup(spec) _, kwargs = mock_run.call_args assert kwargs.get("check") is True + + +# --------------------------------------------------------------------------- +# register_builtin_tools +# --------------------------------------------------------------------------- + +class TestRegisterBuiltinTools: + def test_register_builtin_tools_is_callable(self): + """register_builtin_tools is exported from server and callable.""" + assert callable(register_builtin_tools) + + def test_builtin_tool_handlers_importable(self): + """builtin_tools module exports the required handler functions.""" + from builtin_tools import get_file, list_files + assert callable(list_files) + assert callable(get_file) + + def test_register_builtin_tools_calls_mcp_tool_twice(self): + """register_builtin_tools registers exactly two tools via mcp.tool.""" + tool_calls = [] + + def fake_decorator(**kwargs): + tool_calls.append(kwargs.get("name")) + return lambda fn: fn + + with patch("server.mcp") as mock_mcp: + mock_mcp.tool.side_effect = fake_decorator + register_builtin_tools() + + assert len(tool_calls) == 2 + assert "mcpproxy-listfiles" in tool_calls + assert "mcpproxy-getfile" in tool_calls + + def test_listfiles_tool_spec_has_no_required_fields(self): + """mcpproxy-listfiles 'path' parameter should be optional.""" + captured_specs = [] + + def fake_decorator(**kwargs): + captured_specs.append(kwargs) + return lambda fn: fn + + with patch("server.mcp") as mock_mcp: + mock_mcp.tool.side_effect = fake_decorator + register_builtin_tools() + + # Find the listfiles call — it was the first one registered + names = [s["name"] for s in captured_specs] + assert "mcpproxy-listfiles" in names + + def test_getfile_tool_spec_requires_path(self): + """mcpproxy-getfile should declare 'path' as a required parameter.""" + from builtin_tools import get_file + import inspect + sig = inspect.signature(get_file) + # path has no default → required + assert sig.parameters["path"].default is inspect.Parameter.empty From 5e7486c2d680928c102d3d67a204bcf44eb41251 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 25 May 2026 02:44:35 +0000 Subject: [PATCH 2/2] test_mcp_client.sh: add file listing, retrieval, and summary steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the initial tool call + Ollama summary, the script now: Step 10 — calls mcpproxy-listfiles (root of files dir, default .playwright-mcp) and mcpproxy-getfile for every file found. A single Python block handles both MCP calls with proper SSE unwrapping and session-ID headers; arbitrary file names are safe because Python manages the HTTP calls directly. Step 11 — asks Ollama to summarise what was produced: - text files (JSON snapshots, HTML, logs): contents included, truncated at 4000 chars each - binary files (PNG screenshots etc.): name + size noted, base64 payload omitted from prompt - errors: surfaced inline - relates file contents back to the original tool call If no files are found the extra steps are skipped cleanly. https://claude.ai/code/session_016EMphWWMguzwVrfAYvJFSX --- tests/test_mcp_client.sh | 224 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) diff --git a/tests/test_mcp_client.sh b/tests/test_mcp_client.sh index 05d7f2c..1edc45f 100755 --- a/tests/test_mcp_client.sh +++ b/tests/test_mcp_client.sh @@ -616,6 +616,230 @@ except Exception as e: print(f'(could not read Ollama response: {e})', file=sys.stderr) " +sep + +# ── Step 10: List files in the mcpproxy files directory ─────────────────────── + +info "Listing files produced in the mcpproxy files directory" + +FILES_DATA="${TMP_DIR}/files_data.json" + +# One Python block handles all MCP calls (listfiles + one getfile per entry) +# so we avoid bash escaping issues with arbitrary file names. +python3 - "${MCP_URL}" "${SESSION_ID:-}" "$(( _RPC_ID + 1 ))" "${FILES_DATA}" <<'PY' +import json, sys, urllib.request, urllib.error +from pathlib import Path + +mcp_url = sys.argv[1] +session_id = sys.argv[2] +rpc_id = int(sys.argv[3]) +out_path = Path(sys.argv[4]) + + +def _mcp_call(method, params): + global rpc_id + rid = rpc_id + rpc_id += 1 + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + } + if session_id: + headers['Mcp-Session-Id'] = session_id + body = json.dumps( + {'jsonrpc': '2.0', 'id': rid, 'method': method, 'params': params} + ).encode() + req = urllib.request.Request(mcp_url, data=body, headers=headers, method='POST') + try: + with urllib.request.urlopen(req, timeout=120) as resp: + raw = resp.read().decode('utf-8') + except urllib.error.HTTPError as exc: + return {'error': {'code': exc.code, 'message': str(exc)}} + # Unwrap SSE envelope (event: message\ndata: {...}\n\n) + if raw.lstrip().startswith(('event:', 'data:')): + for line in raw.splitlines(): + if line.startswith('data: '): + raw = line[6:].strip() + break + try: + return json.loads(raw) + except Exception: + return {} + + +def _extract(rpc): + """Unwrap a tools/call JSON-RPC response to its payload dict.""" + if 'error' in rpc: + e = rpc['error'] + msg = e.get('message', str(e)) if isinstance(e, dict) else str(e) + return {'ok': False, 'error': msg} + result = rpc.get('result', {}) + content = result.get('content', []) + if not content: + return result if isinstance(result, dict) else {'raw': result} + first = content[0] if isinstance(content, list) else content + if isinstance(first, dict) and first.get('type') == 'text': + try: + return json.loads(first['text']) + except (json.JSONDecodeError, TypeError): + return {'text': first['text']} + return {'content': content} + + +# 1. Call mcpproxy-listfiles (root directory) +listing = _extract(_mcp_call('tools/call', {'name': 'mcpproxy-listfiles', 'arguments': {}})) +entries = listing.get('entries', []) +base_dir = listing.get('base_dir', '.playwright-mcp') + +# 2. Fetch every file entry +files_fetched = [] +for entry in entries: + if entry.get('type') != 'file': + continue + fname = entry['name'] + file_result = _extract( + _mcp_call('tools/call', {'name': 'mcpproxy-getfile', 'arguments': {'path': fname}}) + ) + files_fetched.append({ + 'name': fname, + 'size': entry.get('size'), + 'ok': file_result.get('ok', True), + 'encoding': file_result.get('encoding', 'text'), + 'content': file_result.get('content', ''), + 'error': file_result.get('error', ''), + }) + +out_path.write_text( + json.dumps({ + 'ok': listing.get('ok', True), + 'base_dir': base_dir, + 'entries': entries, + 'files': files_fetched, + }, indent=2, ensure_ascii=False), + encoding='utf-8', +) +PY + +# Display listing + fetch status +printf '\n' +python3 -c " +import json, sys +try: + d = json.load(open('${FILES_DATA}', encoding='utf-8')) + base_dir = d.get('base_dir', '.playwright-mcp') + entries = d.get('entries', []) + files = d.get('files', []) + + if not entries: + print(f' (no files found in {base_dir})') + else: + print(f' Base directory: {base_dir}') + print() + for e in entries: + icon = '📁' if e.get('type') == 'directory' else '📄' + size = f\" ({e['size']} bytes)\" if e.get('size') is not None else '' + print(f\" {icon} {e['name']}{size}\") + print() + for f in files: + if f.get('error'): + print(f\" ✗ {f['name']} — {f['error']}\") + elif f.get('encoding') == 'base64': + print(f\" ✓ {f['name']} [binary, {f.get('size','?')} bytes]\") + else: + preview = f.get('content','')[:80].replace('\n',' ') + ellipsis = '…' if len(f.get('content','')) > 80 else '' + print(f\" ✓ {f['name']} → {preview}{ellipsis}\") +except Exception as e: + print(f' (could not display file data: {e})', file=sys.stderr) +" + +# ── Step 11: Ollama summary of file contents ────────────────────────────────── + +HAS_FILES="$(python3 -c " +import json +try: + d = json.load(open('${FILES_DATA}', encoding='utf-8')) + print('yes' if d.get('files') else 'no') +except Exception: + print('no') +")" + +if [[ "${HAS_FILES}" == "yes" ]]; then + + sep + info "Asking Ollama to summarise the file contents" + + OLLAMA_FILES_PAYLOAD="${TMP_DIR}/ollama_files_payload.json" + OLLAMA_FILES_RESP="${TMP_DIR}/ollama_files_resp.json" + + python3 - "${SELECTED_TOOL}" "${CALL_RESULT}" "${FILES_DATA}" \ + "${OLLAMA_FILES_PAYLOAD}" "${OLLAMA_MODEL}" <<'PY' +import json, sys +from pathlib import Path + +tool = sys.argv[1] +orig_result = Path(sys.argv[2]).read_text(encoding='utf-8') +files_data = json.loads(Path(sys.argv[3]).read_text(encoding='utf-8')) +out = Path(sys.argv[4]) +model = sys.argv[5] + +base_dir = files_data.get('base_dir', '.playwright-mcp') +files = files_data.get('files', []) + +sections = [] +for f in files: + fname = f['name'] + if f.get('error'): + sections.append(f"--- {fname} (error: {f['error']}) ---") + elif f.get('encoding') == 'base64': + sections.append( + f"--- {fname} [binary file, {f.get('size', '?')} bytes — content omitted] ---" + ) + else: + content = f.get('content', '') + if len(content) > 4000: + content = content[:4000] + '\n…[truncated]' + sections.append(f"--- {fname} ---\n{content}") + +files_block = '\n\n'.join(sections) if sections else 'No files retrieved.' + +prompt = ( + f"You just called the MCP tool '{tool}'.\n\n" + f"Original tool result:\n{orig_result}\n\n" + f"The following files were found in the mcpproxy files directory ({base_dir}) " + f"and retrieved for you:\n\n" + f"{files_block}\n\n" + "Please summarise what was produced. For each file, describe its contents or " + "purpose. For binary files (e.g. PNG screenshots), note their presence and size. " + "For text files (JSON snapshots, HTML, logs, etc.), describe what they contain. " + "Relate the files back to the tool call where relevant. Be concise." +) + +out.write_text( + json.dumps({"model": model, "prompt": prompt, "stream": False}), + encoding='utf-8', +) +PY + + curl -fsS --max-time 300 \ + -H 'Content-Type: application/json' \ + --data "@${OLLAMA_FILES_PAYLOAD}" \ + "${OLLAMA_URL}/api/generate" > "${OLLAMA_FILES_RESP}" + + printf '\n' + python3 -c " +import json +try: + r = json.load(open('${OLLAMA_FILES_RESP}', encoding='utf-8')) + print(r.get('response', r)) +except Exception as e: + print(f'(could not read Ollama response: {e})', file=sys.stderr) +" + +else + printf '\n No files found — nothing to summarise.\n' +fi + sep ok "Done." printf '\n'