diff --git a/README.md b/README.md index 1559773..da1f50d 100644 --- a/README.md +++ b/README.md @@ -14,17 +14,43 @@ It's primary value proposition is as a CLI tool that allows users to quickly def Agents are defined through [personalities](personalities/), that receive a [task](taskflows/) to complete given a set of [tools](toolboxes/). -Agents can cooperate to complete sequences of tasks through so-called [Taskflows](taskflows/GRAMMAR.md). +Agents can cooperate to complete sequences of tasks through so-called [taskflows](taskflows/GRAMMAR.md). + +You can find a detailed overview of the taskflow grammar [here](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/blob/main/taskflows/GRAMMAR.md) and example taskflows [here](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/tree/main/taskflows/examples). + +## Use Cases and Examples + +The Seclab Taskflow Agent framework was primarily designed to fit the iterative feedback loop driven work involved in Agentic security research workflows and vulnerability triage tasks. + +Its design philosophy is centered around the belief that a prompt level focus of capturing vulnerability patterns will greatly improve and scale security research results as frontier model capabilities evolve over time. + +While the maintainer himself primarily uses this framework as a code auditing tool it also serves as a more generic swiss army knife for exploring Agentic workflows. For example, the GitHub Security Lab also uses this framework for automated code scanning alert triage. + +The framework includes a [CodeQL](https://codeql.github.com/) MCP server that can be used for Agentic code review, see the [CVE-2023-2283](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/blob/main/taskflows/CVE-2023-2283/CVE-2023-2283.yaml) for an example of how to have an Agent review C code using a CodeQL database. + +Instead of generating CodeQL queries itself, the CodeQL MCP Server is used to provide CodeQL-query based MCP tools that allow an Agent to navigate and explore code. It leverages templated CodeQL queries to provide targeted context for model driven code analysis. ## Requirements Python >= 3.9 or Docker -# Usage +## Configuration + +Provide a GitHub token for an account that is entitled to use GitHub Copilot via the `COPILOT_TOKEN` environment variable. Further configuration is use case dependent, i.e. pending which MCP servers you'd like to use in your taskflows. + +You can set persisting environment variables via an `.env` file in the project root. -Provide a Copilot entitled GitHub PAT via the `COPILOT_TOKEN` environment variable. +Example: -## Source +```sh +# Tokens +COPILOT_TOKEN= +# MCP configs +GITHUB_PERSONAL_ACCESS_TOKEN= +CODEQL_DBS_BASE_PATH="/app/my_data/" +``` + +## Deploying from Source First install the required dependencies: @@ -48,9 +74,11 @@ Example: deploying a Taskflow: python main.py -t example ``` -## Docker +## Deploying from Docker -Alternatively you can deploy the Agent via its Docker image using `docker/run.sh`. +You can deploy the Taskflow Agent via its Docker image using `docker/run.sh`. + +WARNING: the Agent Docker image is _NOT_ intended as a security boundary but strictly a deployment convenience. The image entrypoint is `main.py` and thus it operates the same as invoking the Agent from source directly. @@ -58,30 +86,30 @@ You can find the Docker image for the Seclab Taskflow Agent [here](https://githu Note that this image is based on a public release of the Taskflow Agent, and you will have to mount any custom taskflows, personalities, or prompts into the image for them to be available to the Agent. -See [docker/run.sh](docker/run.sh) for configuration details. +Optional image mount points to supply custom data are configured via the environment: -Example: deploying a Taskflow: +- Custom data via `MY_DATA`, mounts to `/app/my_data` +- Custom personalities via `MY_PERSONALITIES`, mounts to `/app/personalities/my_personalities` +- Custom taskflows via `MY_TASKFLOWS`, mounts to `/app/taskflows/my_taskflows` +- Custom prompts via `MY_PROMPTS`, mounts to `/app/prompts/my_prompts` +- Custom toolboxes via `MY_TOOLBOXES`, mounts to `/app/toolboxes/my_toolboxes` + +See [docker/run.sh](docker/run.sh) for further details. + +Example: deploying a Taskflow (example.yaml): ```sh docker/run.sh -t example ``` -Example: deploying a custom taskflow: +Example: deploying a custom taskflow (custom_taskflow.yaml): ```sh MY_TASKFLOWS=~/my_taskflows docker/run.sh -t custom_taskflow ``` -Available image mount points are: - -- Custom data via `MY_DATA` environment variable -- Custom personalities via `MY_PERSONALITIES` environment variable -- Custom taskflows via `MY_TASKFLOWS` environment variable -- Custom prompts via `MY_PROMPTS` environment variable -- Custom toolboxes via `MY_TOOLBOXES` environment variable - For more advanced scenarios like e.g. making custom MCP server code available, you can alter the run script to mount your custom code into the image and configure your toolboxes to use said code accordingly. -Example: custom MCP server deployment via Docker image: +Example: a custom MCP server deployment via Docker image: ```sh export MY_MCP_SERVERS=./mcp_servers @@ -109,7 +137,7 @@ docker run \ Our default run script makes the Docker socket available to the image, which contains the Docker cli, so 3rd party Docker based stdio MCP servers also function as normal. -Example: a toolbox configuration for the official GitHub MCP Server: +Example: a toolbox configuration using the official GitHub MCP Server via Docker: ```yaml server_params: @@ -120,23 +148,7 @@ server_params: GITHUB_PERSONAL_ACCESS_TOKEN: "{{ env GITHUB_PERSONAL_ACCESS_TOKEN }}" ``` -## Framework Configuration - -Set environment variables via an `.env` file in the project root. - -Example: a persistent Agent configuration with various MCP server environment variables set: - -```sh -# Tokens -COPILOT_TOKEN=... -# Docker config, MY_DATA is mounted to /app/my_data -MY_DATA="/home/user/my_data" -# MCP configs -GITHUB_PERSONAL_ACCESS_TOKEN=... -CODEQL_DBS_BASE_PATH="/app/my_data/" -``` - -# Personalities +## Personalities Core characteristics for a single Agent. Configured through YAML files in `personalities/`. @@ -157,7 +169,7 @@ toolboxes: - echo ``` -# Toolboxes +## Toolboxes MCP servers that provide tools. Configured through YAML files in `toolboxes/`. @@ -174,18 +186,7 @@ server_params: SOME: value ``` -Example sse config: - -```yaml -server_params: - kind: sse - # make sure you .env config the echo server, see echo_sse.py for example - url: http://127.0.0.1:9000/echo - headers: - SomeHeader: "{{ env USER }}" -``` - -# Taskflows +## Taskflows A sequence of interdependent tasks performed by a set of Agents. Configured through a YAML based [grammar](taskflows/GRAMMAR.md) in [taskflows/](taskflows/). @@ -263,6 +264,6 @@ This project is licensed under the terms of the MIT open source license. Please [SUPPORT](./SUPPORT.md) -## Acknowledgement +## Acknowledgements -Security Lab team members @m-y-mo and @p- for contributing heavily to the testing and development of this framework, as well as the rest of the Security Lab team for helpful discussions and use cases. +Security Lab team members [Man Yue Mo](https://github.com/m-y-mo) and [Peter Stockli](https://github.com/p-) for contributing heavily to the testing and development of this framework, as well as the rest of the Security Lab team for helpful discussions and feedback. diff --git a/main.py b/main.py index 84e36b5..6392046 100644 --- a/main.py +++ b/main.py @@ -23,14 +23,12 @@ from typing import Any from shell_utils import shell_tool_call -from mcp_utils import DEFAULT_MCP_CLIENT_SESSION_TIMEOUT, ReconnectingMCPServerStdio, AsyncDebugMCPServerStdio, MCPNamespaceWrap +from mcp_utils import DEFAULT_MCP_CLIENT_SESSION_TIMEOUT, ReconnectingMCPServerStdio, AsyncDebugMCPServerStdio, MCPNamespaceWrap, mcp_client_params, mcp_system_prompt, StreamableMCPThread from render_utils import render_model_output, flush_async_output from env_utils import TmpEnv from yaml_parser import YamlParser from agent import TaskAgent from capi import list_tool_call_models -from mcp_utils import mcp_client_params -from mcp_utils import mcp_system_prompt load_dotenv() @@ -132,27 +130,47 @@ async def deploy_task_agents(agents: dict, # XXX: auto-allow all tools if task is headless by clearing confirms confirms = [] client_session_timeout = client_session_timeout or DEFAULT_MCP_CLIENT_SESSION_TIMEOUT + server_proc = None match params['kind']: + # since we spawn stdio servers each time we do not expect + # new tools to appear over time so cache the tools list case 'stdio': if params.get('reconnecting', False): mcp_server = ReconnectingMCPServerStdio( name=tb, params=params, tool_filter=tool_filter, - client_session_timeout_seconds=client_session_timeout) + client_session_timeout_seconds=client_session_timeout, + cache_tools_list=True) else: mcp_server = MCPServerStdio( name=tb, params=params, tool_filter=tool_filter, - client_session_timeout_seconds=client_session_timeout) + client_session_timeout_seconds=client_session_timeout, + cache_tools_list=True) case 'sse': mcp_server = MCPServerSse( name=tb, params=params, tool_filter=tool_filter, client_session_timeout_seconds=client_session_timeout) - case 'streamable': # XXX: needs testing + case 'streamable': + # check if we need to start this server locally as well + if 'command' in params: + def _print_out(line): + msg = f"Streamable MCP Server stdout: {line}" + logging.info(msg) + #print(msg) + def _print_err(line): + msg = f"Streamable MCP Server stderr: {line}" + logging.info(msg) + #print(msg) + server_proc = StreamableMCPThread(params['command'], + url=params['url'], + env=params['env'], + on_output=_print_out, + on_error=_print_err) mcp_server = MCPServerStreamableHttp( name=tb, params=params, @@ -161,7 +179,7 @@ async def deploy_task_agents(agents: dict, case _: raise ValueError(f"Unsupported MCP transport {params['kind']}") # provide namespace and confirmation control through wrapper class - mcp_servers.append(MCPNamespaceWrap(confirms, mcp_server)) + mcp_servers.append((MCPNamespaceWrap(confirms, mcp_server), server_proc)) # connect mcp servers # https://openai.github.io/openai-agents-python/ref/mcp/server/ @@ -173,22 +191,33 @@ async def mcp_session_task( # connects/cleanups have to happen in the same task # but we also want to use wait_for to set a timeout # so we use a dedicated session task to accomplish both - for server in mcp_servers: + for s in mcp_servers: + server, server_proc = s logging.debug(f"Connecting mcp server: {server._name}") + if server_proc is not None: + server_proc.start() + await server_proc.async_wait_for_connection(poll_interval=0.1) await server.connect() # signal that we're connected connected.set() # wait until we're told to clean up await cleanup.wait() - for server in reversed(mcp_servers): + for s in reversed(mcp_servers): + server, server_proc = s try: logging.debug(f"Starting cleanup for mcp server: {server._name}") await server.cleanup() logging.debug(f"Cleaned up mcp server: {server._name}") + if server_proc is not None: + server_proc.stop() + try: + await asyncio.to_thread(server_proc.join_and_raise) + except Exception as e: + print(f"Streamable mcp server process exception: {e}") except asyncio.CancelledError: logging.error(f"Timeout on cleanup for mcp server: {server._name}") finally: - mcp_servers.remove(server) + mcp_servers.remove(s) except RuntimeError as e: logging.error(f"RuntimeError in mcp session task: {e}") except asyncio.CancelledError as e: @@ -233,12 +262,9 @@ async def mcp_session_task( server_prompts=server_prompts, important_guidelines=important_guidelines) ), - # XXX: should handoffs have handoffs? - # XXX: this would be a recursive chicken/egg problem :P - # XXX: are initial handoff functions still visible to handoff agents in the run? handoffs=[], exclude_from_context=exclude_from_context, - mcp_servers=mcp_servers, + mcp_servers=[s[0] for s in mcp_servers], model=model, model_settings=model_settings, run_hooks=run_hooks, @@ -257,7 +283,7 @@ async def mcp_session_task( instructions=prompt_with_handoff_instructions(system_prompt) if handoffs else system_prompt, handoffs=handoffs, exclude_from_context=exclude_from_context, - mcp_servers=mcp_servers, + mcp_servers=[s[0] for s in mcp_servers], model=model, model_settings=model_settings, run_hooks=run_hooks, diff --git a/mcp_servers/codeql/client.py b/mcp_servers/codeql/client.py index 46e174d..3d3c059 100644 --- a/mcp_servers/codeql/client.py +++ b/mcp_servers/codeql/client.py @@ -38,12 +38,16 @@ def shell_command_to_string(cmd): class CodeQL: def __init__(self, codeql_cli=os.getenv("CODEQL_CLI", default="codeql"), - server_options=["--threads=0", - "--quiet", - "--log-to-stderr"], + server_options=["--threads=0", "--quiet"], log_stderr=False): + self.server_options = server_options.copy() + if log_stderr: + os.makedirs("logs", exist_ok=True) + self.stderr_log = f"logs/codeql_stderr_log.log" + self.server_options.append("--log-to-stderr") + else: + self.stderr_log = os.devnull self.codeql_cli = codeql_cli.split() - self.server_options = server_options self.search_paths = [] self.active_database = None self.active_connection = None @@ -52,8 +56,6 @@ def __init__(self, self.progress_id = 0 # clients can override e.g. the default ql/progressUpdated callback if they wish self.method_handlers = {} - os.makedirs("logs", exist_ok=True) - self.stderr_log = f"logs/codeql_stderr_log.log" if log_stderr else os.devnull # def __del__(self): # self._server_stop() @@ -71,6 +73,9 @@ def _server_start(self): server_cmd += self.server_options self.stderr_log = open(self.stderr_log, 'a') p = subprocess.Popen(self.codeql_cli + server_cmd, + text=True, + bufsize=1, + universal_newlines=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=self.stderr_log) @@ -578,7 +583,7 @@ def run_query(query_path: str | Path, database: Path, progress_callback=None, template_values=None, # keep the query server alive if desired - keep_alive=False, + keep_alive=True, log_stderr=False): result = '' query_path = Path(query_path) @@ -587,30 +592,34 @@ def run_query(query_path: str | Path, database: Path, target_pos = get_query_position(query_path, target) if not target_pos: raise ValueError(f"Could not resolve quick eval target for {target}") - with (QueryServer(database, - keep_alive=keep_alive, - log_stderr=log_stderr) as server, - tempfile.TemporaryDirectory() as base_path): - if callable(progress_callback): - server.method_handlers['ql/progressUpdated'] = progress_callback - bqrs_path = base_path / Path("query.bqrs") - if search_paths: - server.search_paths += search_paths - server._server_run_query_from_path(bqrs_path, query_path, - quick_eval_pos=target_pos, - template_values=template_values) - while server.active_query_id: - time.sleep(WAIT_INTERVAL) - failed, msg = server.active_query_error - if failed: - raise RuntimeError(msg) - match fmt: - case 'json': - result = server._bqrs_to_json(bqrs_path, entities=entities) - case 'csv': - result = server._bqrs_to_csv(bqrs_path, entities=entities) - case 'sarif': - result = server._bqrs_to_sarif(bqrs_path, server._query_info(query_path)) - case _: - raise ValueError("Unsupported output format {fmt}") + try: + with (QueryServer(database, + keep_alive=keep_alive, + log_stderr=log_stderr) as server, + tempfile.TemporaryDirectory() as base_path): + if callable(progress_callback): + server.method_handlers['ql/progressUpdated'] = progress_callback + bqrs_path = base_path / Path("query.bqrs") + if search_paths: + server.search_paths += search_paths + + server._server_run_query_from_path(bqrs_path, query_path, + quick_eval_pos=target_pos, + template_values=template_values) + while server.active_query_id: + time.sleep(WAIT_INTERVAL) + failed, msg = server.active_query_error + if failed: + raise RuntimeError(msg) + match fmt: + case 'json': + result = server._bqrs_to_json(bqrs_path, entities=entities) + case 'csv': + result = server._bqrs_to_csv(bqrs_path, entities=entities) + case 'sarif': + result = server._bqrs_to_sarif(bqrs_path, server._query_info(query_path)) + case _: + raise ValueError("Unsupported output format {fmt}") + except Exception as e: + raise RuntimeError(f"Error in run_query: {e}") from e return result diff --git a/mcp_servers/codeql/mcp_server.py b/mcp_servers/codeql/mcp_server.py index b97b706..0959eaf 100644 --- a/mcp_servers/codeql/mcp_server.py +++ b/mcp_servers/codeql/mcp_server.py @@ -7,7 +7,8 @@ ) from client import run_query, file_from_uri, list_src_files, _debug_log, search_in_src_archive from pydantic import Field -from mcp.server.fastmcp import FastMCP, Context +#from mcp.server.fastmcp import FastMCP, Context +from fastmcp import FastMCP, Context # use FastMCP 2.0 from pathlib import Path import os import csv @@ -91,21 +92,28 @@ def _get_file_contents(db: str | Path, uri: str): db = Path(db) return file_from_uri(uri, db) - def _run_query(query_name: str, database_path: str, language: str, template_values: dict): """Run a CodeQL query and return the results""" - database_path = _resolve_db_path(database_path) + + try: + database_path = _resolve_db_path(database_path) + except RuntimeError: + return json.dumps([f"The database path for {database_path} could not be resolved"]) try: query_path = _resolve_query_path(language, query_name) except RuntimeError: - return json.dumps([f"This query {query_name} is not supported for {language}"]) - csv = run_query(Path(__file__).parent.resolve() / - query_path, - database_path, - fmt='csv', - template_values=template_values, - log_stderr=True) - return _csv_to_json_obj(csv) + return json.dumps([f"The query {query_name} is not supported for language: {language}"]) + try: + csv = run_query(Path(__file__).parent.resolve() / + query_path, + database_path, + fmt='csv', + template_values=template_values, + log_stderr=True) + return _csv_to_json_obj(csv) + except Exception as e: + return json.dumps([f"The query {query_name} encountered an error: {e}"]) + @mcp.tool() @@ -203,4 +211,4 @@ def list_functions(database_path: str = Field(description="The CodeQL database p return _run_query('list_functions', database_path, language, {}) if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False, transport="http", host="127.0.0.1", port=9999) diff --git a/mcp_servers/echo/echo.py b/mcp_servers/echo/echo.py index c35aacd..e0bb5a6 100644 --- a/mcp_servers/echo/echo.py +++ b/mcp_servers/echo/echo.py @@ -5,7 +5,8 @@ filename='logs/mcp_echo.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP # move to FastMCP 2.0 mcp = FastMCP("Echo") @@ -30,4 +31,4 @@ def echo_prompt(message: str) -> str: return f"Please process this message: {message}" if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_servers/logbook/logbook.py b/mcp_servers/logbook/logbook.py index 8369b4a..199051e 100644 --- a/mcp_servers/logbook/logbook.py +++ b/mcp_servers/logbook/logbook.py @@ -5,7 +5,8 @@ filename='logs/mcp_logbook.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP # move to FastMCP 2.0 import json from pathlib import Path import os @@ -89,4 +90,4 @@ def _logbook_erase(key) -> str: if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_servers/memcache/memcache.py b/mcp_servers/memcache/memcache.py index 02b7469..1c8eac8 100644 --- a/mcp_servers/memcache/memcache.py +++ b/mcp_servers/memcache/memcache.py @@ -5,7 +5,8 @@ filename='logs/mcp_memcache.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP # move to FastMCP 2.0 import json from pathlib import Path import os @@ -65,4 +66,4 @@ def memcache_clear_cache(): return backend.clear_cache() if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_utils.py b/mcp_utils.py index b64116c..3a7507c 100644 --- a/mcp_utils.py +++ b/mcp_utils.py @@ -1,7 +1,15 @@ import logging import asyncio -from threading import Thread +from threading import Thread, Event import json +import subprocess +from typing import Optional, Callable +import shutil +import time +import os +import socket +import signal +from urllib.parse import urlparse from mcp.types import CallToolResult, TextContent from agents.mcp import MCPServerStdio @@ -10,6 +18,125 @@ DEFAULT_MCP_CLIENT_SESSION_TIMEOUT = 120 +# A process management class for running in-process MCP streamable servers +class StreamableMCPThread(Thread): + """Process management for local streamable MCP servers""" + def __init__( + self, + cmd, + url: str = '', + on_output: Optional[Callable[[str], None]] = None, + on_error: Optional[Callable[[str], None]] = None, + poll_interval: float = 0.5, + env: Optional[dict[str, str]] = None + ): + super().__init__(daemon=True) + self.url = url + self.cmd = cmd + self.on_output = on_output + self.on_error = on_error + self.poll_interval = poll_interval + self.env = os.environ.copy() # XXX: potential for environment leak to MCP + self.env.update(env) + self._stop_event = Event() + self.process = None + self.exit_code = None + self.exception: Optional[BaseException] = None + + async def async_wait_for_connection(self, timeout=30.0, poll_interval=0.5): + parsed = urlparse(self.url) + host = parsed.hostname + port = parsed.port + if host is None or port is None: + raise ValueError(f"URL must include a host and port: {self.url}") + deadline = asyncio.get_event_loop().time() + timeout + while True: + try: + reader, writer = await asyncio.open_connection(host, port) + writer.close() + await writer.wait_closed() + return # Success + except (OSError, ConnectionRefusedError): + if asyncio.get_event_loop().time() > deadline: + raise TimeoutError(f"Could not connect to {host}:{port} after {timeout} seconds") + await asyncio.sleep(poll_interval) + + def wait_for_connection(self, timeout=30.0, poll_interval=0.5): + parsed = urlparse(self.url) + host = parsed.hostname + port = parsed.port + if host is None or port is None: + raise ValueError(f"URL must include a host and port: {self.url}") + deadline = time.time() + timeout + while True: + try: + with socket.create_connection((host, port), timeout=2): + return # Success + except OSError: + if time.time() > deadline: + raise TimeoutError(f"Could not connect to {host}:{port} after {timeout} seconds") + time.sleep(poll_interval) + + def run(self): + try: + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True, + env=self.env + ) + + stdout_thread = Thread(target=self._read_stream, args=(self.process.stdout, self.on_output)) + stderr_thread = Thread(target=self._read_stream, args=(self.process.stderr, self.on_error)) + stdout_thread.start() + stderr_thread.start() + + while self.process.poll() is None and not self._stop_event.is_set(): + time.sleep(self.poll_interval) + + # Process ended or stop requested + if self.process.poll() is None: + self.process.terminate() + self.process.wait() + self.exit_code = self.process.returncode + + stdout_thread.join() + stderr_thread.join() + + # sigterm (-15) is expected + if self.exit_code not in [0, -15]: + self.exception = subprocess.CalledProcessError( + self.exit_code, self.cmd + ) + + except BaseException as e: + self.exception = e + + def _read_stream(self, stream, callback): + if stream is None or callback is None: + return + for line in iter(stream.readline, ''): + callback(line.rstrip('\n')) + stream.close() + + def stop(self): + self._stop_event.set() + if self.process and self.process.poll() is None: + self.process.terminate() + + def is_running(self): + return self.process and self.process.poll() is None + + def join_and_raise(self, timeout: Optional[float] = None): + self.join(timeout) + if self.is_alive(): + raise RuntimeError("Process thread did not exit within timeout.") + if self.exception is not None: + raise self.exception + # used for debugging asyncio event loop issues in mcp stdio servers # lifts the asyncio event loop in use to a dedicated threaded loop class AsyncDebugMCPServerStdio(MCPServerStdio): @@ -90,6 +217,7 @@ async def call_tool(self, *args, **kwargs): return result class MCPNamespaceWrap: + """An MCP client object wrapper that provides us with namespace control""" def __init__(self, confirms, obj): self.confirms = confirms self._obj = obj @@ -179,6 +307,7 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['command'] = available_toolboxes[tb]['server_params'].get('command') server_params['args'] = args server_params['env'] = env + # XXX: SSE is deprecated in the MCP spec, but keep it around for now case 'sse': headers = available_toolboxes[tb]['server_params'].get('headers') # support {{ env SOMETHING }} for header values as well for e.g. tokens @@ -190,6 +319,10 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['url'] = available_toolboxes[tb]['server_params'].get('url') server_params['headers'] = headers server_params['timeout'] = timeout + # for more involved local MCP servers, jsonrpc over stdio seems less than reliable + # as an alternative you can configure local toolboxes to use the streamable transport + # but still be started/stopped on demand similar to stdio mcp servers + # all it requires is a streamable config that also has cmd/args/env set case 'streamable': headers = available_toolboxes[tb]['server_params'].get('headers') # support {{ env SOMETHING }} for header values as well for e.g. tokens @@ -201,6 +334,33 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['url'] = available_toolboxes[tb]['server_params'].get('url') server_params['headers'] = headers server_params['timeout'] = timeout + # if command/args/env is set, we also need to start this MCP server ourselves + # this way we can use the streamable transport for MCP servers that get fussy + # over stdio jsonrpc polling + env = available_toolboxes[tb]['server_params'].get('env') + args = available_toolboxes[tb]['server_params'].get('args') + cmd = available_toolboxes[tb]['server_params'].get('command') + if cmd is not None: + logging.debug(f"Initializing streamable toolbox: {tb}\nargs:\n{args }\nenv:\n{env}\n") + exe = shutil.which(cmd) + if exe is None: + raise FileNotFoundError(f"Could not resolve path to {cmd}") + start_cmd = [exe] + if args is not None and isinstance(args, list): + for i, v in enumerate(args): + args[i] = swap_env(v) + start_cmd += args + server_params['command'] = start_cmd + if env is not None and isinstance(env, dict): + for k, v in dict(env).items(): + try: + env[k] = swap_env(v) + except LookupError as e: + logging.critical(e) + logging.info("Assuming toolbox has default configuration available") + del env[k] + pass + server_params['env'] = env case _: raise ValueError(f"Unsupported MCP transport {kind}") confirms = available_toolboxes[tb].get('confirm', []) diff --git a/requirements.txt b/requirements.txt index 2e8f822..d7adc97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,12 +2,22 @@ aiofiles==24.1.0 annotated-types==0.7.0 anyio==4.9.0 attrs==25.3.0 +Authlib==1.6.3 certifi==2025.6.15 +cffi==2.0.0 charset-normalizer==3.4.2 click==8.2.1 colorama==0.4.6 +cryptography==45.0.7 +cyclopts==3.24.0 distro==1.9.0 +dnspython==2.8.0 docstring-to-markdown==0.17 +docstring_parser==0.17.0 +docutils==0.22 +email-validator==2.3.0 +exceptiongroup==1.3.0 +fastmcp==2.12.2 griffe==1.7.3 h11==0.16.0 httpcore==1.0.9 @@ -15,21 +25,34 @@ httpx==0.28.1 httpx-sse==0.4.1 idna==3.10 importlib_metadata==8.7.0 +isodate==0.7.2 jedi==0.19.2 jiter==0.10.0 jsonschema==4.24.0 +jsonschema-path==0.3.4 jsonschema-specifications==2025.4.1 +lazy-object-proxy==1.12.0 markdown-it-py==3.0.0 +MarkupSafe==3.0.2 mcp==1.13.1 mdurl==0.1.2 +more-itertools==10.8.0 openai==1.107.0 openai-agents==0.2.11 +openapi-core==0.19.5 +openapi-pydantic==0.5.1 +openapi-schema-validator==0.6.3 +openapi-spec-validator==0.7.2 +parse==1.20.2 parso==0.8.4 +pathable==0.4.4 pluggy==1.6.0 +pycparser==2.23 pydantic==2.11.7 pydantic-settings==2.10.1 pydantic_core==2.33.2 Pygments==2.19.2 +pyperclip==1.9.0 python-dotenv==1.1.1 python-lsp-jsonrpc==1.1.2 python-lsp-server==1.12.2 @@ -37,9 +60,12 @@ python-multipart==0.0.20 PyYAML==6.0.2 referencing==0.36.2 requests==2.32.4 +rfc3339-validator==0.1.4 rich==14.0.0 +rich-rst==1.3.1 rpds-py==0.26.0 shellingham==1.5.4 +six==1.17.0 sniffio==1.3.1 SQLAlchemy==2.0.41 sse-starlette==2.4.1 @@ -52,4 +78,5 @@ typing_extensions==4.14.1 ujson==5.10.0 urllib3==2.5.0 uvicorn==0.35.0 +Werkzeug==3.1.1 zipp==3.23.0 diff --git a/toolboxes/codeql.yaml b/toolboxes/codeql.yaml index 1eedea4..d539a26 100644 --- a/toolboxes/codeql.yaml +++ b/toolboxes/codeql.yaml @@ -1,6 +1,7 @@ server_params: - kind: stdio - reconnecting: true + kind: streamable + url: 'http://localhost:9999/mcp' + # if you set a command/args/env we will also start the server on demand command: python args: ["mcp_servers/codeql/mcp_server.py"] env: