From b87c72ed1c22d78a5e4b68f3b82f80e42ffe28c3 Mon Sep 17 00:00:00 2001 From: Bas Alberts Date: Mon, 15 Sep 2025 18:34:08 -0400 Subject: [PATCH 1/5] Move CodeQL MCP server to a streamable transport Moves the CodeQL MCP Server to a streamable transport and adds the required process management code to start and stop local streamable servers on demand. --- main.py | 53 +++++++--- mcp_servers/codeql/client.py | 72 +++++++------- mcp_servers/codeql/mcp_server.py | 27 +++--- mcp_servers/echo/echo.py | 5 +- mcp_servers/logbook/logbook.py | 5 +- mcp_servers/memcache/memcache.py | 5 +- mcp_utils.py | 160 ++++++++++++++++++++++++++++++- requirements.txt | 27 ++++++ toolboxes/codeql.yaml | 5 +- 9 files changed, 295 insertions(+), 64 deletions(-) diff --git a/main.py b/main.py index 84e36b5..d9373a8 100644 --- a/main.py +++ b/main.py @@ -23,14 +23,12 @@ from typing import Any from shell_utils import shell_tool_call -from mcp_utils import DEFAULT_MCP_CLIENT_SESSION_TIMEOUT, ReconnectingMCPServerStdio, AsyncDebugMCPServerStdio, MCPNamespaceWrap +from mcp_utils import DEFAULT_MCP_CLIENT_SESSION_TIMEOUT, ReconnectingMCPServerStdio, AsyncDebugMCPServerStdio, MCPNamespaceWrap, mcp_client_params, mcp_system_prompt, StreamableMCPThread from render_utils import render_model_output, flush_async_output from env_utils import TmpEnv from yaml_parser import YamlParser from agent import TaskAgent from capi import list_tool_call_models -from mcp_utils import mcp_client_params -from mcp_utils import mcp_system_prompt load_dotenv() @@ -132,27 +130,47 @@ async def deploy_task_agents(agents: dict, # XXX: auto-allow all tools if task is headless by clearing confirms confirms = [] client_session_timeout = client_session_timeout or DEFAULT_MCP_CLIENT_SESSION_TIMEOUT + server_proc = None match params['kind']: + # since we spawn stdio servers each time we do not expect + # new tools to appear over time so cache the tools list case 'stdio': if params.get('reconnecting', False): mcp_server = ReconnectingMCPServerStdio( name=tb, params=params, tool_filter=tool_filter, - client_session_timeout_seconds=client_session_timeout) + client_session_timeout_seconds=client_session_timeout, + cache_tools_list=True) else: mcp_server = MCPServerStdio( name=tb, params=params, tool_filter=tool_filter, - client_session_timeout_seconds=client_session_timeout) + client_session_timeout_seconds=client_session_timeout, + cache_tools_list=True) case 'sse': mcp_server = MCPServerSse( name=tb, params=params, tool_filter=tool_filter, client_session_timeout_seconds=client_session_timeout) - case 'streamable': # XXX: needs testing + case 'streamable': + # check if we need to start this server locally as well + if 'command' in params: + def _print_out(line): + msg = f"Streamable MCP Server stdout: {line}" + logging.info(msg) + print(msg) + def _print_err(line): + msg = f"Streamable MCP Server stderr: {line}" + logging.info(msg) + print(msg) + server_proc = StreamableMCPThread(params['command'], + url=params['url'], + env=params['env'], + on_output=_print_out, + on_error=_print_err) mcp_server = MCPServerStreamableHttp( name=tb, params=params, @@ -161,7 +179,7 @@ async def deploy_task_agents(agents: dict, case _: raise ValueError(f"Unsupported MCP transport {params['kind']}") # provide namespace and confirmation control through wrapper class - mcp_servers.append(MCPNamespaceWrap(confirms, mcp_server)) + mcp_servers.append((MCPNamespaceWrap(confirms, mcp_server), server_proc)) # connect mcp servers # https://openai.github.io/openai-agents-python/ref/mcp/server/ @@ -173,22 +191,33 @@ async def mcp_session_task( # connects/cleanups have to happen in the same task # but we also want to use wait_for to set a timeout # so we use a dedicated session task to accomplish both - for server in mcp_servers: + for s in mcp_servers: + server, server_proc = s logging.debug(f"Connecting mcp server: {server._name}") + if server_proc is not None: + server_proc.start() + await server_proc.async_wait_for_connection(poll_interval=0.1) await server.connect() # signal that we're connected connected.set() # wait until we're told to clean up await cleanup.wait() - for server in reversed(mcp_servers): + for s in reversed(mcp_servers): + server, server_proc = s try: logging.debug(f"Starting cleanup for mcp server: {server._name}") await server.cleanup() logging.debug(f"Cleaned up mcp server: {server._name}") + if server_proc: + server_proc.stop() + try: + await asyncio.to_thread(server_proc.join_and_raise) + except Exception as e: + print(f"Streamable mcp server process exception: {e}") except asyncio.CancelledError: logging.error(f"Timeout on cleanup for mcp server: {server._name}") finally: - mcp_servers.remove(server) + mcp_servers.remove(s) except RuntimeError as e: logging.error(f"RuntimeError in mcp session task: {e}") except asyncio.CancelledError as e: @@ -238,7 +267,7 @@ async def mcp_session_task( # XXX: are initial handoff functions still visible to handoff agents in the run? handoffs=[], exclude_from_context=exclude_from_context, - mcp_servers=mcp_servers, + mcp_servers=[s[0] for s in mcp_servers], model=model, model_settings=model_settings, run_hooks=run_hooks, @@ -257,7 +286,7 @@ async def mcp_session_task( instructions=prompt_with_handoff_instructions(system_prompt) if handoffs else system_prompt, handoffs=handoffs, exclude_from_context=exclude_from_context, - mcp_servers=mcp_servers, + mcp_servers=[s[0] for s in mcp_servers], model=model, model_settings=model_settings, run_hooks=run_hooks, diff --git a/mcp_servers/codeql/client.py b/mcp_servers/codeql/client.py index 46e174d..74afbb0 100644 --- a/mcp_servers/codeql/client.py +++ b/mcp_servers/codeql/client.py @@ -38,12 +38,16 @@ def shell_command_to_string(cmd): class CodeQL: def __init__(self, codeql_cli=os.getenv("CODEQL_CLI", default="codeql"), - server_options=["--threads=0", - "--quiet", - "--log-to-stderr"], + server_options=["--threads=0", "--quiet"], log_stderr=False): + self.server_options = server_options.copy() + if log_stderr: + os.makedirs("logs", exist_ok=True) + self.stderr_log = f"logs/codeql_stderr_log.log" + self.server_options.append("--log-to-stderr") + else: + self.stderr_log = os.devnull self.codeql_cli = codeql_cli.split() - self.server_options = server_options self.search_paths = [] self.active_database = None self.active_connection = None @@ -52,8 +56,6 @@ def __init__(self, self.progress_id = 0 # clients can override e.g. the default ql/progressUpdated callback if they wish self.method_handlers = {} - os.makedirs("logs", exist_ok=True) - self.stderr_log = f"logs/codeql_stderr_log.log" if log_stderr else os.devnull # def __del__(self): # self._server_stop() @@ -75,6 +77,9 @@ def _server_start(self): stdout=subprocess.PIPE, stderr=self.stderr_log) + # XXX: should we give codeql query server some time to finish initializing ? + # XXX: because the query server process is silent we can not just poll for some standard banner + # set some default callbacks for common notifications def _handle_ql_progressUpdated(params): print(f">> Progress: {params.get('step')}/{params.get('maxStep')} status: {params.get('message')}") @@ -587,30 +592,33 @@ def run_query(query_path: str | Path, database: Path, target_pos = get_query_position(query_path, target) if not target_pos: raise ValueError(f"Could not resolve quick eval target for {target}") - with (QueryServer(database, - keep_alive=keep_alive, - log_stderr=log_stderr) as server, - tempfile.TemporaryDirectory() as base_path): - if callable(progress_callback): - server.method_handlers['ql/progressUpdated'] = progress_callback - bqrs_path = base_path / Path("query.bqrs") - if search_paths: - server.search_paths += search_paths - server._server_run_query_from_path(bqrs_path, query_path, - quick_eval_pos=target_pos, - template_values=template_values) - while server.active_query_id: - time.sleep(WAIT_INTERVAL) - failed, msg = server.active_query_error - if failed: - raise RuntimeError(msg) - match fmt: - case 'json': - result = server._bqrs_to_json(bqrs_path, entities=entities) - case 'csv': - result = server._bqrs_to_csv(bqrs_path, entities=entities) - case 'sarif': - result = server._bqrs_to_sarif(bqrs_path, server._query_info(query_path)) - case _: - raise ValueError("Unsupported output format {fmt}") + try: + with (QueryServer(database, + keep_alive=keep_alive, + log_stderr=log_stderr) as server, + tempfile.TemporaryDirectory() as base_path): + if callable(progress_callback): + server.method_handlers['ql/progressUpdated'] = progress_callback + bqrs_path = base_path / Path("query.bqrs") + if search_paths: + server.search_paths += search_paths + server._server_run_query_from_path(bqrs_path, query_path, + quick_eval_pos=target_pos, + template_values=template_values) + while server.active_query_id: + time.sleep(WAIT_INTERVAL) + failed, msg = server.active_query_error + if failed: + raise RuntimeError(msg) + match fmt: + case 'json': + result = server._bqrs_to_json(bqrs_path, entities=entities) + case 'csv': + result = server._bqrs_to_csv(bqrs_path, entities=entities) + case 'sarif': + result = server._bqrs_to_sarif(bqrs_path, server._query_info(query_path)) + case _: + raise ValueError("Unsupported output format {fmt}") + except BrokenPipeError as e: + raise RuntimeError("Broken Pipe to query server") from e return result diff --git a/mcp_servers/codeql/mcp_server.py b/mcp_servers/codeql/mcp_server.py index b97b706..95e33d3 100644 --- a/mcp_servers/codeql/mcp_server.py +++ b/mcp_servers/codeql/mcp_server.py @@ -7,7 +7,8 @@ ) from client import run_query, file_from_uri, list_src_files, _debug_log, search_in_src_archive from pydantic import Field -from mcp.server.fastmcp import FastMCP, Context +#from mcp.server.fastmcp import FastMCP, Context +from fastmcp import FastMCP, Context # use FastMCP 2.0 from pathlib import Path import os import csv @@ -91,21 +92,25 @@ def _get_file_contents(db: str | Path, uri: str): db = Path(db) return file_from_uri(uri, db) - def _run_query(query_name: str, database_path: str, language: str, template_values: dict): """Run a CodeQL query and return the results""" + database_path = _resolve_db_path(database_path) try: query_path = _resolve_query_path(language, query_name) except RuntimeError: - return json.dumps([f"This query {query_name} is not supported for {language}"]) - csv = run_query(Path(__file__).parent.resolve() / - query_path, - database_path, - fmt='csv', - template_values=template_values, - log_stderr=True) - return _csv_to_json_obj(csv) + return json.dumps([f"The query {query_name} is not supported for language: {language}"]) + try: + csv = run_query(Path(__file__).parent.resolve() / + query_path, + database_path, + fmt='csv', + template_values=template_values, + log_stderr=True) + return _csv_to_json_obj(csv) + except Exception as e: + return json.dumps([f"The query {query_name} encountered an error: {e}"]) + @mcp.tool() @@ -203,4 +208,4 @@ def list_functions(database_path: str = Field(description="The CodeQL database p return _run_query('list_functions', database_path, language, {}) if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False, transport="http", host="127.0.0.1", port=9999) diff --git a/mcp_servers/echo/echo.py b/mcp_servers/echo/echo.py index c35aacd..e0bb5a6 100644 --- a/mcp_servers/echo/echo.py +++ b/mcp_servers/echo/echo.py @@ -5,7 +5,8 @@ filename='logs/mcp_echo.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP # move to FastMCP 2.0 mcp = FastMCP("Echo") @@ -30,4 +31,4 @@ def echo_prompt(message: str) -> str: return f"Please process this message: {message}" if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_servers/logbook/logbook.py b/mcp_servers/logbook/logbook.py index 8369b4a..1f626e0 100644 --- a/mcp_servers/logbook/logbook.py +++ b/mcp_servers/logbook/logbook.py @@ -5,7 +5,8 @@ filename='logs/mcp_logbook.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMMCP # move to FastMCP 2.0 import json from pathlib import Path import os @@ -89,4 +90,4 @@ def _logbook_erase(key) -> str: if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_servers/memcache/memcache.py b/mcp_servers/memcache/memcache.py index 02b7469..1c8eac8 100644 --- a/mcp_servers/memcache/memcache.py +++ b/mcp_servers/memcache/memcache.py @@ -5,7 +5,8 @@ filename='logs/mcp_memcache.log', filemode='a' ) -from mcp.server.fastmcp import FastMCP +#from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP # move to FastMCP 2.0 import json from pathlib import Path import os @@ -65,4 +66,4 @@ def memcache_clear_cache(): return backend.clear_cache() if __name__ == "__main__": - mcp.run() + mcp.run(show_banner=False) diff --git a/mcp_utils.py b/mcp_utils.py index b64116c..990e93c 100644 --- a/mcp_utils.py +++ b/mcp_utils.py @@ -1,7 +1,15 @@ import logging import asyncio -from threading import Thread +from threading import Thread, Event import json +import subprocess +from typing import Optional, Callable +import shutil +import time +import os +import socket +import signal +from urllib.parse import urlparse from mcp.types import CallToolResult, TextContent from agents.mcp import MCPServerStdio @@ -10,6 +18,124 @@ DEFAULT_MCP_CLIENT_SESSION_TIMEOUT = 120 +# A process management class for running in-process MCP streamable servers +class StreamableMCPThread(Thread): + def __init__( + self, + cmd, + url: str = '', + on_output: Optional[Callable[[str], None]] = None, + on_error: Optional[Callable[[str], None]] = None, + poll_interval: float = 0.5, + env: Optional[dict[str, str]] = None + ): + super().__init__(daemon=True) + self.url = url + self.cmd = cmd + self.on_output = on_output + self.on_error = on_error + self.poll_interval = poll_interval + self.env = os.environ.copy() # XXX: risk of leaking env secrets + self.env.update(env) + self._stop_event = Event() + self.process = None + self.exit_code = None + self.exception: Optional[BaseException] = None + + async def async_wait_for_connection(self, timeout=30.0, poll_interval=0.5): + parsed = urlparse(self.url) + host = parsed.hostname + port = parsed.port + if host is None or port is None: + raise ValueError(f"URL must include a host and port: {self.url}") + deadline = asyncio.get_event_loop().time() + timeout + while True: + try: + reader, writer = await asyncio.open_connection(host, port) + writer.close() + await writer.wait_closed() + return # Success + except (OSError, ConnectionRefusedError): + if asyncio.get_event_loop().time() > deadline: + raise TimeoutError(f"Could not connect to {host}:{port} after {timeout} seconds") + await asyncio.sleep(poll_interval) + + def wait_for_connection(self, timeout=30.0, poll_interval=0.5): + parsed = urlparse(self.url) + host = parsed.hostname + port = parsed.port + if host is None or port is None: + raise ValueError(f"URL must include a host and port: {url}") + deadline = time.time() + timeout + while True: + try: + with socket.create_connection((host, port), timeout=2): + return # Success + except OSError: + if time.time() > deadline: + raise TimeoutError(f"Could not connect to {host}:{port} after {timeout} seconds") + time.sleep(poll_interval) + + def run(self): + try: + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True, + env=self.env + ) + + stdout_thread = Thread(target=self._read_stream, args=(self.process.stdout, self.on_output)) + stderr_thread = Thread(target=self._read_stream, args=(self.process.stderr, self.on_error)) + stdout_thread.start() + stderr_thread.start() + + while self.process.poll() is None and not self._stop_event.is_set(): + time.sleep(self.poll_interval) + + # Process ended or stop requested + if self.process.poll() is None: + self.process.terminate() + self.process.wait() + self.exit_code = self.process.returncode + + stdout_thread.join() + stderr_thread.join() + + # sigterm (-15) is expected + if self.exit_code not in [0, -15]: + self.exception = subprocess.CalledProcessError( + self.exit_code, self.cmd + ) + + except BaseException as e: + self.exception = e + + def _read_stream(self, stream, callback): + if stream is None or callback is None: + return + for line in iter(stream.readline, ''): + callback(line.rstrip('\n')) + stream.close() + + def stop(self): + self._stop_event.set() + if self.process and self.process.poll() is None: + self.process.terminate() + + def is_running(self): + return self.process and self.process.poll() is None + + def join_and_raise(self, timeout: Optional[float] = None): + self.join(timeout) + if self.is_alive(): + raise RuntimeError("Process thread did not exit within timeout.") + if self.exception is not None: + raise self.exception + # used for debugging asyncio event loop issues in mcp stdio servers # lifts the asyncio event loop in use to a dedicated threaded loop class AsyncDebugMCPServerStdio(MCPServerStdio): @@ -179,6 +305,7 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['command'] = available_toolboxes[tb]['server_params'].get('command') server_params['args'] = args server_params['env'] = env + # XXX: SSE is deprecated in the MCP spec, but keep it around for now case 'sse': headers = available_toolboxes[tb]['server_params'].get('headers') # support {{ env SOMETHING }} for header values as well for e.g. tokens @@ -190,6 +317,10 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['url'] = available_toolboxes[tb]['server_params'].get('url') server_params['headers'] = headers server_params['timeout'] = timeout + # for more involved local MCP servers, jsonrpc over stdio seems less than reliable + # as an alternative you can configure local toolboxes to use the streamable transport + # but still be started/stopped on demand similar to stdio mcp servers + # all it requires is a streamable config that also has cmd/args/env set case 'streamable': headers = available_toolboxes[tb]['server_params'].get('headers') # support {{ env SOMETHING }} for header values as well for e.g. tokens @@ -201,6 +332,33 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): server_params['url'] = available_toolboxes[tb]['server_params'].get('url') server_params['headers'] = headers server_params['timeout'] = timeout + # if command/args/env is set, we also need to start this MCP server ourselves + # this way we can use the streamable transport for MCP servers that get fussy + # over stdio jsonrpc polling + env = available_toolboxes[tb]['server_params'].get('env') + args = available_toolboxes[tb]['server_params'].get('args') + cmd = available_toolboxes[tb]['server_params'].get('command') + exe = shutil.which(cmd) + if cmd is not None: + logging.debug(f"Initializing streamable toolbox: {tb}\nargs:\n{args }\nenv:\n{env}\n") + if exe is None: + raise FileNotFoundError(f"Could not resolve path to {cmd}") + start_cmd = [exe] + if args is not None and isinstance(args, list): + for i, v in enumerate(args): + args[i] = swap_env(v) + start_cmd += args + server_params['command'] = start_cmd + if env is not None and isinstance(env, dict): + for k, v in dict(env).items(): + try: + env[k] = swap_env(v) + except LookupError as e: + logging.critical(e) + logging.info("Assuming toolbox has default configuration available") + del env[k] + pass + server_params['env'] = env case _: raise ValueError(f"Unsupported MCP transport {kind}") confirms = available_toolboxes[tb].get('confirm', []) diff --git a/requirements.txt b/requirements.txt index 2e8f822..d7adc97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,12 +2,22 @@ aiofiles==24.1.0 annotated-types==0.7.0 anyio==4.9.0 attrs==25.3.0 +Authlib==1.6.3 certifi==2025.6.15 +cffi==2.0.0 charset-normalizer==3.4.2 click==8.2.1 colorama==0.4.6 +cryptography==45.0.7 +cyclopts==3.24.0 distro==1.9.0 +dnspython==2.8.0 docstring-to-markdown==0.17 +docstring_parser==0.17.0 +docutils==0.22 +email-validator==2.3.0 +exceptiongroup==1.3.0 +fastmcp==2.12.2 griffe==1.7.3 h11==0.16.0 httpcore==1.0.9 @@ -15,21 +25,34 @@ httpx==0.28.1 httpx-sse==0.4.1 idna==3.10 importlib_metadata==8.7.0 +isodate==0.7.2 jedi==0.19.2 jiter==0.10.0 jsonschema==4.24.0 +jsonschema-path==0.3.4 jsonschema-specifications==2025.4.1 +lazy-object-proxy==1.12.0 markdown-it-py==3.0.0 +MarkupSafe==3.0.2 mcp==1.13.1 mdurl==0.1.2 +more-itertools==10.8.0 openai==1.107.0 openai-agents==0.2.11 +openapi-core==0.19.5 +openapi-pydantic==0.5.1 +openapi-schema-validator==0.6.3 +openapi-spec-validator==0.7.2 +parse==1.20.2 parso==0.8.4 +pathable==0.4.4 pluggy==1.6.0 +pycparser==2.23 pydantic==2.11.7 pydantic-settings==2.10.1 pydantic_core==2.33.2 Pygments==2.19.2 +pyperclip==1.9.0 python-dotenv==1.1.1 python-lsp-jsonrpc==1.1.2 python-lsp-server==1.12.2 @@ -37,9 +60,12 @@ python-multipart==0.0.20 PyYAML==6.0.2 referencing==0.36.2 requests==2.32.4 +rfc3339-validator==0.1.4 rich==14.0.0 +rich-rst==1.3.1 rpds-py==0.26.0 shellingham==1.5.4 +six==1.17.0 sniffio==1.3.1 SQLAlchemy==2.0.41 sse-starlette==2.4.1 @@ -52,4 +78,5 @@ typing_extensions==4.14.1 ujson==5.10.0 urllib3==2.5.0 uvicorn==0.35.0 +Werkzeug==3.1.1 zipp==3.23.0 diff --git a/toolboxes/codeql.yaml b/toolboxes/codeql.yaml index 1eedea4..d539a26 100644 --- a/toolboxes/codeql.yaml +++ b/toolboxes/codeql.yaml @@ -1,6 +1,7 @@ server_params: - kind: stdio - reconnecting: true + kind: streamable + url: 'http://localhost:9999/mcp' + # if you set a command/args/env we will also start the server on demand command: python args: ["mcp_servers/codeql/mcp_server.py"] env: From b262b920ba3257e77ee5d04fc4d2cf9d56ae3adc Mon Sep 17 00:00:00 2001 From: Bas Alberts Date: Mon, 15 Sep 2025 18:40:16 -0400 Subject: [PATCH 2/5] Fix bug in cmd resolve --- mcp_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp_utils.py b/mcp_utils.py index 990e93c..03ac8fb 100644 --- a/mcp_utils.py +++ b/mcp_utils.py @@ -338,9 +338,9 @@ def mcp_client_params(available_toolboxes: dict, requested_toolboxes: list): env = available_toolboxes[tb]['server_params'].get('env') args = available_toolboxes[tb]['server_params'].get('args') cmd = available_toolboxes[tb]['server_params'].get('command') - exe = shutil.which(cmd) if cmd is not None: logging.debug(f"Initializing streamable toolbox: {tb}\nargs:\n{args }\nenv:\n{env}\n") + exe = shutil.which(cmd) if exe is None: raise FileNotFoundError(f"Could not resolve path to {cmd}") start_cmd = [exe] From da8a2b274f7fb54c5350bdabf945304ec93cbf89 Mon Sep 17 00:00:00 2001 From: Bas Alberts Date: Tue, 16 Sep 2025 13:22:59 -0400 Subject: [PATCH 3/5] Update docs and finalize local streamable MCP server support --- README.md | 103 ++++++++++++++++--------------- main.py | 7 +-- mcp_servers/codeql/client.py | 13 ++-- mcp_servers/codeql/mcp_server.py | 5 +- mcp_utils.py | 4 +- 5 files changed, 68 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 1559773..6d616e2 100644 --- a/README.md +++ b/README.md @@ -14,17 +14,43 @@ It's primary value proposition is as a CLI tool that allows users to quickly def Agents are defined through [personalities](personalities/), that receive a [task](taskflows/) to complete given a set of [tools](toolboxes/). -Agents can cooperate to complete sequences of tasks through so-called [Taskflows](taskflows/GRAMMAR.md). +Agents can cooperate to complete sequences of tasks through so-called [taskflows](taskflows/GRAMMAR.md). + +You can find a detailed overview of the taskflow grammar [here](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/blob/main/taskflows/GRAMMAR.md) and example taskflows [here](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/tree/main/taskflows/examples). + +## Use Cases and Examples + +The Seclab Taskflow Agent framework was primarily designed to fit the iterative feedback loop driven work involved in Agentic security research workflows and vulnerability triage tasks. + +Its design philosophy is centered around the belief that a prompt level focus of capturing vulnerability patterns will greatly improve and scale security research results as frontier model capabilities evolve over time. + +While the maintainer himself primarily uses this framework as a code auditing tool it also serves as a more generic swiss army knife for exploring Agentic workflows. For example, the GitHub Security Lab also uses this framework for automated code scanning alert triage. + +The framework includes a [CodeQL](https://codeql.github.com/) MCP server that can be used for Agentic code review, see the [CVE-2023-2283](https://github.com/GitHubSecurityLab/seclab-taskflow-agent/blob/main/taskflows/CVE-2023-2283/CVE-2023-2283.yaml) for an example of how to have an Agent review C code using a CodeQL database. + +Instead of generating CodeQL queries itself, the CodeQL MCP Server is used to provide CodeQL-query based MCP tools that allow an Agent to navigate and explore code. It leverages templated CodeQL queries to provide targeted context for model driven code analysis. ## Requirements Python >= 3.9 or Docker -# Usage +## Configuration + +Provide a GitHub token for an account that is entitled to use GitHub Copilot via the `COPILOT_TOKEN` environment variable. Further configuration is use case dependent, i.e. pending which MCP servers you'd like to use in your taskflows. + +You can set persisting environment variables via an `.env` file in the project root. -Provide a Copilot entitled GitHub PAT via the `COPILOT_TOKEN` environment variable. +Example: -## Source +```sh +# Tokens +COPILOT_TOKEN= +# MCP configs +GITHUB_PERSONAL_ACCESS_TOKEN= +CODEQL_DBS_BASE_PATH="/app/my_data/" +``` + +## Deploying from Source First install the required dependencies: @@ -48,9 +74,11 @@ Example: deploying a Taskflow: python main.py -t example ``` -## Docker +## Deploying from Docker -Alternatively you can deploy the Agent via its Docker image using `docker/run.sh`. +You can deploy the Taskflow Agent via its Docker image using `docker/run.sh`. + +WARNING: the Agent Docker image is _NOT_ intended as a security boundary but strictly a deployment convenience. The image entrypoint is `main.py` and thus it operates the same as invoking the Agent from source directly. @@ -58,30 +86,30 @@ You can find the Docker image for the Seclab Taskflow Agent [here](https://githu Note that this image is based on a public release of the Taskflow Agent, and you will have to mount any custom taskflows, personalities, or prompts into the image for them to be available to the Agent. -See [docker/run.sh](docker/run.sh) for configuration details. +Optional image mount points to supply custom data are configured via the environment: -Example: deploying a Taskflow: +- Custom data via `MY_DATA`, mounts to `/app/my_data` +- Custom personalities via `MY_PERSONALITIES`, mounts to `/app/personalities/my_personalities` +- Custom taskflows via `MY_TASKFLOWS`, mounts to `/app/taskflows/my_taskflows` +- Custom prompts via `MY_PROMPTS`, mounts to `/app/prompts/my_prompts` +- Custom toolboxes via `MY_TOOLBOXES`, mounts to `/app/toolboxes/my_toolboxes` + +See [docker/run.sh](docker/run.sh) for forther details details. + +Example: deploying a Taskflow (example.yaml): ```sh docker/run.sh -t example ``` -Example: deploying a custom taskflow: +Example: deploying a custom taskflow (custom_taskflow.yaml_: ```sh MY_TASKFLOWS=~/my_taskflows docker/run.sh -t custom_taskflow ``` -Available image mount points are: - -- Custom data via `MY_DATA` environment variable -- Custom personalities via `MY_PERSONALITIES` environment variable -- Custom taskflows via `MY_TASKFLOWS` environment variable -- Custom prompts via `MY_PROMPTS` environment variable -- Custom toolboxes via `MY_TOOLBOXES` environment variable - For more advanced scenarios like e.g. making custom MCP server code available, you can alter the run script to mount your custom code into the image and configure your toolboxes to use said code accordingly. -Example: custom MCP server deployment via Docker image: +Example: a custom MCP server deployment via Docker image: ```sh export MY_MCP_SERVERS=./mcp_servers @@ -109,7 +137,7 @@ docker run \ Our default run script makes the Docker socket available to the image, which contains the Docker cli, so 3rd party Docker based stdio MCP servers also function as normal. -Example: a toolbox configuration for the official GitHub MCP Server: +Example: a toolbox configuration using the official GitHub MCP Server via Docker: ```yaml server_params: @@ -120,23 +148,7 @@ server_params: GITHUB_PERSONAL_ACCESS_TOKEN: "{{ env GITHUB_PERSONAL_ACCESS_TOKEN }}" ``` -## Framework Configuration - -Set environment variables via an `.env` file in the project root. - -Example: a persistent Agent configuration with various MCP server environment variables set: - -```sh -# Tokens -COPILOT_TOKEN=... -# Docker config, MY_DATA is mounted to /app/my_data -MY_DATA="/home/user/my_data" -# MCP configs -GITHUB_PERSONAL_ACCESS_TOKEN=... -CODEQL_DBS_BASE_PATH="/app/my_data/" -``` - -# Personalities +## Personalities Core characteristics for a single Agent. Configured through YAML files in `personalities/`. @@ -157,7 +169,7 @@ toolboxes: - echo ``` -# Toolboxes +## Toolboxes MCP servers that provide tools. Configured through YAML files in `toolboxes/`. @@ -174,18 +186,7 @@ server_params: SOME: value ``` -Example sse config: - -```yaml -server_params: - kind: sse - # make sure you .env config the echo server, see echo_sse.py for example - url: http://127.0.0.1:9000/echo - headers: - SomeHeader: "{{ env USER }}" -``` - -# Taskflows +## Taskflows A sequence of interdependent tasks performed by a set of Agents. Configured through a YAML based [grammar](taskflows/GRAMMAR.md) in [taskflows/](taskflows/). @@ -263,6 +264,6 @@ This project is licensed under the terms of the MIT open source license. Please [SUPPORT](./SUPPORT.md) -## Acknowledgement +## Acknowledgements -Security Lab team members @m-y-mo and @p- for contributing heavily to the testing and development of this framework, as well as the rest of the Security Lab team for helpful discussions and use cases. +Security Lab team members [Man Yue Mo](https://github.com/m-y-mo) and [Peter Stockli](https://github.com/p-) for contributing heavily to the testing and development of this framework, as well as the rest of the Security Lab team for helpful discussions and feedback. diff --git a/main.py b/main.py index d9373a8..06e8469 100644 --- a/main.py +++ b/main.py @@ -161,11 +161,11 @@ async def deploy_task_agents(agents: dict, def _print_out(line): msg = f"Streamable MCP Server stdout: {line}" logging.info(msg) - print(msg) + #print(msg) def _print_err(line): msg = f"Streamable MCP Server stderr: {line}" logging.info(msg) - print(msg) + #print(msg) server_proc = StreamableMCPThread(params['command'], url=params['url'], env=params['env'], @@ -262,9 +262,6 @@ async def mcp_session_task( server_prompts=server_prompts, important_guidelines=important_guidelines) ), - # XXX: should handoffs have handoffs? - # XXX: this would be a recursive chicken/egg problem :P - # XXX: are initial handoff functions still visible to handoff agents in the run? handoffs=[], exclude_from_context=exclude_from_context, mcp_servers=[s[0] for s in mcp_servers], diff --git a/mcp_servers/codeql/client.py b/mcp_servers/codeql/client.py index 74afbb0..3d3c059 100644 --- a/mcp_servers/codeql/client.py +++ b/mcp_servers/codeql/client.py @@ -73,13 +73,13 @@ def _server_start(self): server_cmd += self.server_options self.stderr_log = open(self.stderr_log, 'a') p = subprocess.Popen(self.codeql_cli + server_cmd, + text=True, + bufsize=1, + universal_newlines=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=self.stderr_log) - # XXX: should we give codeql query server some time to finish initializing ? - # XXX: because the query server process is silent we can not just poll for some standard banner - # set some default callbacks for common notifications def _handle_ql_progressUpdated(params): print(f">> Progress: {params.get('step')}/{params.get('maxStep')} status: {params.get('message')}") @@ -583,7 +583,7 @@ def run_query(query_path: str | Path, database: Path, progress_callback=None, template_values=None, # keep the query server alive if desired - keep_alive=False, + keep_alive=True, log_stderr=False): result = '' query_path = Path(query_path) @@ -602,6 +602,7 @@ def run_query(query_path: str | Path, database: Path, bqrs_path = base_path / Path("query.bqrs") if search_paths: server.search_paths += search_paths + server._server_run_query_from_path(bqrs_path, query_path, quick_eval_pos=target_pos, template_values=template_values) @@ -619,6 +620,6 @@ def run_query(query_path: str | Path, database: Path, result = server._bqrs_to_sarif(bqrs_path, server._query_info(query_path)) case _: raise ValueError("Unsupported output format {fmt}") - except BrokenPipeError as e: - raise RuntimeError("Broken Pipe to query server") from e + except Exception as e: + raise RuntimeError(f"Error in run_query: {e}") from e return result diff --git a/mcp_servers/codeql/mcp_server.py b/mcp_servers/codeql/mcp_server.py index 95e33d3..86243cf 100644 --- a/mcp_servers/codeql/mcp_server.py +++ b/mcp_servers/codeql/mcp_server.py @@ -95,7 +95,10 @@ def _get_file_contents(db: str | Path, uri: str): def _run_query(query_name: str, database_path: str, language: str, template_values: dict): """Run a CodeQL query and return the results""" - database_path = _resolve_db_path(database_path) + try: + database_path = _resolve_db_path(database_path) + except RuntimeError: + return json.dumps([f"The database pat for {database_path} could not be resolved"]) try: query_path = _resolve_query_path(language, query_name) except RuntimeError: diff --git a/mcp_utils.py b/mcp_utils.py index 03ac8fb..a8b89d0 100644 --- a/mcp_utils.py +++ b/mcp_utils.py @@ -20,6 +20,7 @@ # A process management class for running in-process MCP streamable servers class StreamableMCPThread(Thread): + """Process management for local streamable MCP servers""" def __init__( self, cmd, @@ -35,7 +36,7 @@ def __init__( self.on_output = on_output self.on_error = on_error self.poll_interval = poll_interval - self.env = os.environ.copy() # XXX: risk of leaking env secrets + self.env = os.environ.copy() # XXX: potential for environment leak to MCP self.env.update(env) self._stop_event = Event() self.process = None @@ -216,6 +217,7 @@ async def call_tool(self, *args, **kwargs): return result class MCPNamespaceWrap: + """An MCP client object wrapper that provides us with namespace control""" def __init__(self, confirms, obj): self.confirms = confirms self._obj = obj From bfe5dedc16c4760d90ba1a577acc615028c56a7f Mon Sep 17 00:00:00 2001 From: Bas Alberts Date: Tue, 16 Sep 2025 13:27:24 -0400 Subject: [PATCH 4/5] Fix type check for server proc on mcp teardown --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index 06e8469..6392046 100644 --- a/main.py +++ b/main.py @@ -208,7 +208,7 @@ async def mcp_session_task( logging.debug(f"Starting cleanup for mcp server: {server._name}") await server.cleanup() logging.debug(f"Cleaned up mcp server: {server._name}") - if server_proc: + if server_proc is not None: server_proc.stop() try: await asyncio.to_thread(server_proc.join_and_raise) From 27b25083ce01d7a6688147ccc9547c1e5c22eaa6 Mon Sep 17 00:00:00 2001 From: Bas Alberts Date: Tue, 16 Sep 2025 13:30:25 -0400 Subject: [PATCH 5/5] Address review feedback --- README.md | 4 ++-- mcp_servers/codeql/mcp_server.py | 2 +- mcp_servers/logbook/logbook.py | 2 +- mcp_utils.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6d616e2..da1f50d 100644 --- a/README.md +++ b/README.md @@ -94,14 +94,14 @@ Optional image mount points to supply custom data are configured via the environ - Custom prompts via `MY_PROMPTS`, mounts to `/app/prompts/my_prompts` - Custom toolboxes via `MY_TOOLBOXES`, mounts to `/app/toolboxes/my_toolboxes` -See [docker/run.sh](docker/run.sh) for forther details details. +See [docker/run.sh](docker/run.sh) for further details. Example: deploying a Taskflow (example.yaml): ```sh docker/run.sh -t example ``` -Example: deploying a custom taskflow (custom_taskflow.yaml_: +Example: deploying a custom taskflow (custom_taskflow.yaml): ```sh MY_TASKFLOWS=~/my_taskflows docker/run.sh -t custom_taskflow diff --git a/mcp_servers/codeql/mcp_server.py b/mcp_servers/codeql/mcp_server.py index 86243cf..0959eaf 100644 --- a/mcp_servers/codeql/mcp_server.py +++ b/mcp_servers/codeql/mcp_server.py @@ -98,7 +98,7 @@ def _run_query(query_name: str, database_path: str, language: str, template_valu try: database_path = _resolve_db_path(database_path) except RuntimeError: - return json.dumps([f"The database pat for {database_path} could not be resolved"]) + return json.dumps([f"The database path for {database_path} could not be resolved"]) try: query_path = _resolve_query_path(language, query_name) except RuntimeError: diff --git a/mcp_servers/logbook/logbook.py b/mcp_servers/logbook/logbook.py index 1f626e0..199051e 100644 --- a/mcp_servers/logbook/logbook.py +++ b/mcp_servers/logbook/logbook.py @@ -6,7 +6,7 @@ filemode='a' ) #from mcp.server.fastmcp import FastMCP -from fastmcp import FastMMCP # move to FastMCP 2.0 +from fastmcp import FastMCP # move to FastMCP 2.0 import json from pathlib import Path import os diff --git a/mcp_utils.py b/mcp_utils.py index a8b89d0..3a7507c 100644 --- a/mcp_utils.py +++ b/mcp_utils.py @@ -66,7 +66,7 @@ def wait_for_connection(self, timeout=30.0, poll_interval=0.5): host = parsed.hostname port = parsed.port if host is None or port is None: - raise ValueError(f"URL must include a host and port: {url}") + raise ValueError(f"URL must include a host and port: {self.url}") deadline = time.time() + timeout while True: try: