From 8de5788dc6ec05ce9efa0a15bd95fe1dcb0752ff Mon Sep 17 00:00:00 2001 From: Elias Bachaalany Date: Sun, 18 Jan 2026 14:58:40 -0800 Subject: [PATCH 1/4] feat: add tool timeouts, context transport, ping middleware --- CMakeLists.txt | 4 + include/fastmcpp/app.hpp | 4 +- include/fastmcpp/exceptions.hpp | 5 + include/fastmcpp/proxy.hpp | 3 +- include/fastmcpp/server/context.hpp | 36 +++- .../fastmcpp/server/middleware_pipeline.hpp | 170 +++++++++++++++--- include/fastmcpp/server/session.hpp | 12 ++ include/fastmcpp/tools/manager.hpp | 5 +- include/fastmcpp/tools/tool.hpp | 71 +++++++- include/fastmcpp/tools/tool_transform.hpp | 7 +- src/app.cpp | 10 +- src/mcp/handler.cpp | 23 ++- src/proxy.cpp | 5 +- src/server/context.cpp | 8 +- tests/server/test_context_full.cpp | 25 +++ tests/server/test_middleware_pipeline.cpp | 40 +++++ tests/tools/test_tool_timeout.cpp | 121 +++++++++++++ 17 files changed, 501 insertions(+), 48 deletions(-) create mode 100644 tests/tools/test_tool_timeout.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 82b76b9..ca15b01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -226,6 +226,10 @@ if(FASTMCPP_BUILD_TESTS) target_link_libraries(fastmcpp_tools_manager PRIVATE fastmcpp_core) add_test(NAME fastmcpp_tools_manager COMMAND fastmcpp_tools_manager) + add_executable(fastmcpp_tools_timeout tests/tools/test_tool_timeout.cpp) + target_link_libraries(fastmcpp_tools_timeout PRIVATE fastmcpp_core) + add_test(NAME fastmcpp_tools_timeout COMMAND fastmcpp_tools_timeout) + add_executable(fastmcpp_integration tests/integration.cpp) target_link_libraries(fastmcpp_integration PRIVATE fastmcpp_core) add_test(NAME fastmcpp_integration COMMAND fastmcpp_integration) diff --git a/include/fastmcpp/app.hpp b/include/fastmcpp/app.hpp index 0a2dbf5..efb6472 100644 --- a/include/fastmcpp/app.hpp +++ b/include/fastmcpp/app.hpp @@ -7,6 +7,7 @@ #include "fastmcpp/server/server.hpp" #include "fastmcpp/tools/manager.hpp" +#include #include #include #include @@ -65,6 +66,7 @@ class FastMCP std::vector exclude_args; TaskSupport task_support{TaskSupport::Forbidden}; Json output_schema{Json::object()}; + std::optional timeout; }; struct PromptOptions @@ -250,7 +252,7 @@ class FastMCP // ========================================================================= /// Invoke a tool by name (handles prefixed routing) - Json invoke_tool(const std::string& name, const Json& args) const; + Json invoke_tool(const std::string& name, const Json& args, bool enforce_timeout = true) const; /// Read a resource by URI (handles prefixed routing) resources::ResourceContent read_resource(const std::string& uri, diff --git a/include/fastmcpp/exceptions.hpp b/include/fastmcpp/exceptions.hpp index d27aec0..de5918a 100644 --- a/include/fastmcpp/exceptions.hpp +++ b/include/fastmcpp/exceptions.hpp @@ -20,6 +20,11 @@ struct ValidationError : public Error using Error::Error; }; +struct ToolTimeoutError : public Error +{ + using Error::Error; +}; + struct TransportError : public Error { using Error::Error; diff --git a/include/fastmcpp/proxy.hpp b/include/fastmcpp/proxy.hpp index e971d88..9dc45fc 100644 --- a/include/fastmcpp/proxy.hpp +++ b/include/fastmcpp/proxy.hpp @@ -107,7 +107,8 @@ class ProxyApp /// Invoke a tool by name /// Tries local tools first, falls back to remote - client::CallToolResult invoke_tool(const std::string& name, const Json& args) const; + client::CallToolResult invoke_tool(const std::string& name, const Json& args, + bool enforce_timeout = true) const; /// Read a resource by URI /// Tries local resources first, falls back to remote diff --git a/include/fastmcpp/server/context.hpp b/include/fastmcpp/server/context.hpp index 0d1686d..585a17b 100644 --- a/include/fastmcpp/server/context.hpp +++ b/include/fastmcpp/server/context.hpp @@ -36,6 +36,13 @@ enum class LogLevel Error }; +enum class TransportType +{ + Stdio, + Sse, + StreamableHttp +}; + // ============================================================================ // Sampling types (for Context.sample()) // ============================================================================ @@ -146,6 +153,21 @@ inline std::string to_string(LogLevel level) } } +inline std::string to_string(TransportType transport) +{ + switch (transport) + { + case TransportType::Stdio: + return "stdio"; + case TransportType::Sse: + return "sse"; + case TransportType::StreamableHttp: + return "streamable-http"; + default: + return "unknown"; + } +} + using LogCallback = std::function; using ProgressCallback = std::function; @@ -158,7 +180,8 @@ class Context Context(const resources::ResourceManager& rm, const prompts::PromptManager& pm, std::optional request_meta, std::optional request_id = std::nullopt, - std::optional session_id = std::nullopt); + std::optional session_id = std::nullopt, + std::optional transport = std::nullopt); std::vector list_resources() const; std::vector list_prompts() const; @@ -177,6 +200,16 @@ class Context { return session_id_; } + std::optional transport() const + { + if (!transport_.has_value()) + return std::nullopt; + return to_string(*transport_); + } + std::optional transport_type() const + { + return transport_; + } std::optional client_id() const { @@ -398,6 +431,7 @@ class Context std::optional request_meta_; std::optional request_id_; std::optional session_id_; + std::optional transport_; mutable std::unordered_map state_; LogCallback log_callback_; ProgressCallback progress_callback_; diff --git a/include/fastmcpp/server/middleware_pipeline.hpp b/include/fastmcpp/server/middleware_pipeline.hpp index a022989..f17e0ea 100644 --- a/include/fastmcpp/server/middleware_pipeline.hpp +++ b/include/fastmcpp/server/middleware_pipeline.hpp @@ -7,17 +7,24 @@ /// - Middleware base class with virtual hooks /// - Built-in implementations: Logging, Timing, Caching, RateLimiting, ErrorHandling -#include "fastmcpp/types.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "fastmcpp/server/session.hpp" +#include "fastmcpp/types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fastmcpp::server { @@ -32,11 +39,12 @@ struct MiddlewareContext std::string method; ///< MCP method name (e.g., "tools/call") std::string source{"client"}; ///< Origin: "client" or "server" std::string type{"request"}; ///< Message type: "request" or "notification" - std::chrono::steady_clock::time_point timestamp; ///< Request timestamp - std::optional request_id; ///< Request ID if available - std::optional tool_name; ///< Tool name for tools/call - std::optional resource_uri; ///< Resource URI for resources/read - std::optional prompt_name; ///< Prompt name for prompts/get + std::chrono::steady_clock::time_point timestamp; ///< Request timestamp + std::optional request_id; ///< Request ID if available + std::shared_ptr session; ///< ServerSession for this request (optional) + std::optional tool_name; ///< Tool name for tools/call + std::optional resource_uri; ///< Resource URI for resources/read + std::optional prompt_name; ///< Prompt name for prompts/get /// Create a copy with modified fields MiddlewareContext copy() const @@ -502,8 +510,8 @@ class RateLimitingMiddleware : public Middleware }; /// Error handling middleware - catches exceptions and converts to MCP errors -class ErrorHandlingMiddleware : public Middleware -{ +class ErrorHandlingMiddleware : public Middleware +{ public: using ErrorCallback = std::function; @@ -570,7 +578,125 @@ class ErrorHandlingMiddleware : public Middleware ErrorCallback callback_; bool include_trace_; mutable std::mutex mutex_; - std::unordered_map error_counts_; -}; - -} // namespace fastmcpp::server + std::unordered_map error_counts_; +}; + +/// Ping middleware - sends periodic pings to keep client connections alive +class PingMiddleware : public Middleware +{ + public: + explicit PingMiddleware(std::chrono::milliseconds interval = std::chrono::milliseconds(30000)) + : interval_(interval) + { + if (interval_.count() <= 0) + throw std::invalid_argument("interval must be positive"); + } + + explicit PingMiddleware(int interval_ms) + : PingMiddleware(std::chrono::milliseconds(interval_ms)) + { + } + + ~PingMiddleware() override + { + stop(); + } + + Json operator()(const MiddlewareContext& ctx, CallNext call_next) override + { + if (ctx.session) + ensure_session(ctx.session); + return call_next(ctx); + } + + private: + void ensure_session(const std::shared_ptr& session) + { + const std::string key = session_key(session); + if (key.empty()) + return; + + bool should_start = false; + { + std::lock_guard lock(mutex_); + if (active_sessions_.insert(key).second) + should_start = true; + } + + if (!should_start) + return; + + std::weak_ptr weak_session = session; + std::thread worker([this, weak_session, key]() { ping_loop(weak_session, key); }); + { + std::lock_guard lock(mutex_); + threads_.push_back(std::move(worker)); + } + } + + void ping_loop(std::weak_ptr weak_session, const std::string& key) + { + while (true) + { + { + std::unique_lock lock(mutex_); + if (cv_.wait_for(lock, interval_, [this]() { return stop_.load(); })) + break; + } + + if (stop_.load()) + break; + + auto session = weak_session.lock(); + if (!session) + break; + + try + { + session->send_ping(interval_); + } + catch (const std::exception&) + { + break; + } + } + + std::lock_guard lock(mutex_); + active_sessions_.erase(key); + } + + void stop() + { + stop_.store(true); + cv_.notify_all(); + + std::vector threads; + { + std::lock_guard lock(mutex_); + threads.swap(threads_); + } + + for (auto& t : threads) + if (t.joinable()) + t.join(); + } + + static std::string session_key(const std::shared_ptr& session) + { + if (!session) + return {}; + auto key = session->session_id(); + if (!key.empty()) + return key; + return "session@" + std::to_string(reinterpret_cast(session.get())); + } + + std::chrono::milliseconds interval_; + std::mutex mutex_; + std::condition_variable cv_; + std::unordered_set active_sessions_; + std::vector threads_; + std::atomic stop_{false}; +}; + +} // namespace fastmcpp::server diff --git a/include/fastmcpp/server/session.hpp b/include/fastmcpp/server/session.hpp index 127976f..8061100 100644 --- a/include/fastmcpp/server/session.hpp +++ b/include/fastmcpp/server/session.hpp @@ -291,6 +291,18 @@ class ServerSession send_callback_(notification); } + /** + * Send a ping request to the client and wait for a response. + * + * @param timeout How long to wait for response + * @throws RequestTimeoutError if timeout exceeded + * @throws ClientError if client returns an error + */ + void send_ping(std::chrono::milliseconds timeout = DEFAULT_TIMEOUT) + { + (void)send_request("ping", Json::object(), timeout); + } + /** * Send a progress notification to the client. * diff --git a/include/fastmcpp/tools/manager.hpp b/include/fastmcpp/tools/manager.hpp index d6456cb..65c51ff 100644 --- a/include/fastmcpp/tools/manager.hpp +++ b/include/fastmcpp/tools/manager.hpp @@ -19,12 +19,13 @@ class ToolManager { return tools_.at(name); } - fastmcpp::Json invoke(const std::string& name, const fastmcpp::Json& input) const + fastmcpp::Json invoke(const std::string& name, const fastmcpp::Json& input, + bool enforce_timeout = true) const { auto it = tools_.find(name); if (it == tools_.end()) throw fastmcpp::NotFoundError("tool not found: " + name); - return it->second.invoke(input); + return it->second.invoke(input, enforce_timeout); } std::vector list_names() const diff --git a/include/fastmcpp/tools/tool.hpp b/include/fastmcpp/tools/tool.hpp index 2b8c9b7..89ede9b 100644 --- a/include/fastmcpp/tools/tool.hpp +++ b/include/fastmcpp/tools/tool.hpp @@ -1,9 +1,16 @@ #pragma once +#include "fastmcpp/exceptions.hpp" #include "fastmcpp/types.hpp" +#include +#include #include +#include +#include #include +#include #include +#include #include namespace fastmcpp::tools @@ -65,9 +72,45 @@ class Tool { return output_schema_; } - fastmcpp::Json invoke(const fastmcpp::Json& input) const + fastmcpp::Json invoke(const fastmcpp::Json& input, bool enforce_timeout = true) const { - return fn_(input); + if (!enforce_timeout || !timeout_.has_value() || timeout_->count() <= 0) + return fn_(input); + + std::promise promise; + auto future = promise.get_future(); + auto timeout = *timeout_; + + std::thread worker([promise = std::move(promise), input, fn = fn_]() mutable + { + try + { + promise.set_value(fn(input)); + } + catch (...) + { + try + { + promise.set_exception(std::current_exception()); + } + catch (...) + { + } + } + }); + + if (future.wait_for(timeout) == std::future_status::timeout) + { + if (worker.joinable()) + worker.detach(); + throw fastmcpp::ToolTimeoutError("Tool '" + name_ + + "' execution timed out after " + + format_timeout_seconds(timeout) + "s"); + } + + if (worker.joinable()) + worker.join(); + return future.get(); } fastmcpp::TaskSupport task_support() const @@ -96,8 +139,31 @@ class Tool task_support_ = support; return *this; } + Tool& set_timeout(std::optional timeout) + { + timeout_ = timeout; + return *this; + } + const std::optional& timeout() const + { + return timeout_; + } private: + static std::string format_timeout_seconds(std::chrono::milliseconds timeout) + { + std::ostringstream oss; + double seconds = std::chrono::duration(timeout).count(); + oss << std::fixed << std::setprecision(3) << seconds; + auto out = oss.str(); + auto trim_pos = out.find_last_not_of('0'); + if (trim_pos != std::string::npos && trim_pos + 1 < out.size()) + out.erase(trim_pos + 1); + if (!out.empty() && out.back() == '.') + out.pop_back(); + return out; + } + fastmcpp::Json prune_schema(const fastmcpp::Json& schema) const { // Work on a copy to avoid mutating shared $defs or properties @@ -140,6 +206,7 @@ class Tool Fn fn_; std::vector exclude_args_; fastmcpp::TaskSupport task_support_{fastmcpp::TaskSupport::Forbidden}; + std::optional timeout_; }; } // namespace fastmcpp::tools diff --git a/include/fastmcpp/tools/tool_transform.hpp b/include/fastmcpp/tools/tool_transform.hpp index 0a69f14..ef8ed7c 100644 --- a/include/fastmcpp/tools/tool_transform.hpp +++ b/include/fastmcpp/tools/tool_transform.hpp @@ -217,8 +217,10 @@ create_transformed_tool(const Tool& parent, std::optional new_name new_description.has_value() ? new_description : parent.description(); // Create new tool with transformed schema - return Tool(tool_name, transform_result.schema, parent.output_schema(), forwarding_fn, - parent.title(), tool_desc, parent.icons()); + Tool tool(tool_name, transform_result.schema, parent.output_schema(), forwarding_fn, + parent.title(), tool_desc, parent.icons()); + tool.set_timeout(parent.timeout()); + return tool; } /// Configuration for applying transformations via JSON/config @@ -310,6 +312,7 @@ class TransformedTool result.tool_ = Tool(tool_name, transform_result.schema, parent.output_schema(), forwarding_fn, parent.title(), tool_desc, parent.icons()); + result.tool_.set_timeout(parent.timeout()); return result; } diff --git a/src/app.cpp b/src/app.cpp index d7fc024..468ac7a 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -64,6 +64,8 @@ FastMCP& FastMCP::tool(std::string name, const Json& input_schema_or_simple, too std::move(options.icons), std::move(options.exclude_args), options.task_support}; + if (options.timeout) + t.set_timeout(*options.timeout); tools_.register_tool(t); return *this; @@ -546,12 +548,12 @@ std::vector> FastMCP::list_all_pr // Routing // ========================================================================= -Json FastMCP::invoke_tool(const std::string& name, const Json& args) const +Json FastMCP::invoke_tool(const std::string& name, const Json& args, bool enforce_timeout) const { // Try local tools first try { - return tools_.invoke(name, args); + return tools_.invoke(name, args, enforce_timeout); } catch (const NotFoundError&) { @@ -587,7 +589,7 @@ Json FastMCP::invoke_tool(const std::string& name, const Json& args) const try { - return mounted.app->invoke_tool(try_name, args); + return mounted.app->invoke_tool(try_name, args, enforce_timeout); } catch (const NotFoundError&) { @@ -621,7 +623,7 @@ Json FastMCP::invoke_tool(const std::string& name, const Json& args) const try { - auto result = proxy_mount.proxy->invoke_tool(try_name, args); + auto result = proxy_mount.proxy->invoke_tool(try_name, args, enforce_timeout); if (!result.isError && !result.content.empty()) { // Extract result from CallToolResult diff --git a/src/mcp/handler.cpp b/src/mcp/handler.cpp index 53da602..ebd9658 100644 --- a/src/mcp/handler.cpp +++ b/src/mcp/handler.cpp @@ -32,6 +32,13 @@ static fastmcpp::Json jsonrpc_error(const fastmcpp::Json& id, int code, const st {"error", fastmcpp::Json{{"code", code}, {"message", message}}}}; } +static fastmcpp::Json jsonrpc_tool_error(const fastmcpp::Json& id, const std::exception& e) +{ + if (dynamic_cast(&e)) + return jsonrpc_error(id, -32000, e.what()); + return jsonrpc_error(id, -32603, e.what()); +} + static bool schema_is_object(const fastmcpp::Json& schema) { if (!schema.is_object()) @@ -855,7 +862,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1008,7 +1015,7 @@ std::function make_mcp_handler( } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1217,7 +1224,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1355,7 +1362,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1453,7 +1460,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1514,7 +1521,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } @@ -1677,7 +1684,7 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) task_id, [&app, name, args, has_output_schema]() -> fastmcpp::Json { - auto invoke_result = app.invoke_tool(name, args); + auto invoke_result = app.invoke_tool(name, args, false); return build_fastmcp_tool_result(invoke_result, has_output_schema); }); @@ -1716,7 +1723,7 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) } catch (const std::exception& e) { - return jsonrpc_error(id, -32603, e.what()); + return jsonrpc_tool_error(id, e); } } diff --git a/src/proxy.cpp b/src/proxy.cpp index bfd1657..61817e1 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -207,12 +207,13 @@ std::vector ProxyApp::list_all_prompts() const // Routing // ========================================================================= -client::CallToolResult ProxyApp::invoke_tool(const std::string& name, const Json& args) const +client::CallToolResult ProxyApp::invoke_tool(const std::string& name, const Json& args, + bool enforce_timeout) const { // Try local first try { - auto result_json = local_tools_.invoke(name, args); + auto result_json = local_tools_.invoke(name, args, enforce_timeout); // Convert to CallToolResult client::CallToolResult result; diff --git a/src/server/context.cpp b/src/server/context.cpp index 54e22b7..d7d2bd3 100644 --- a/src/server/context.cpp +++ b/src/server/context.cpp @@ -11,15 +11,17 @@ namespace fastmcpp::server Context::Context(const resources::ResourceManager& rm, const prompts::PromptManager& pm) : resource_mgr_(&rm), prompt_mgr_(&pm), request_meta_(std::nullopt), request_id_(std::nullopt), - session_id_(std::nullopt) + session_id_(std::nullopt), transport_(std::nullopt) { } Context::Context(const resources::ResourceManager& rm, const prompts::PromptManager& pm, std::optional request_meta, std::optional request_id, - std::optional session_id) + std::optional session_id, + std::optional transport) : resource_mgr_(&rm), prompt_mgr_(&pm), request_meta_(std::move(request_meta)), - request_id_(std::move(request_id)), session_id_(std::move(session_id)) + request_id_(std::move(request_id)), session_id_(std::move(session_id)), + transport_(std::move(transport)) { } diff --git a/tests/server/test_context_full.cpp b/tests/server/test_context_full.cpp index 3f125c9..f8755ac 100644 --- a/tests/server/test_context_full.cpp +++ b/tests/server/test_context_full.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include using namespace fastmcpp; @@ -189,6 +190,29 @@ void test_client_id() std::cout << "PASSED\n"; } +void test_transport() +{ + std::cout << " test_transport... " << std::flush; + + resources::ResourceManager rm; + prompts::PromptManager pm; + + Context ctx_default(rm, pm); + assert(!ctx_default.transport().has_value()); + assert(!ctx_default.transport_type().has_value()); + + Context ctx_stdio(rm, pm, std::nullopt, std::nullopt, std::nullopt, TransportType::Stdio); + assert(ctx_stdio.transport().has_value()); + assert(ctx_stdio.transport().value() == "stdio"); + assert(ctx_stdio.transport_type().has_value()); + assert(ctx_stdio.transport_type().value() == TransportType::Stdio); + + Context ctx_sse(rm, pm, std::nullopt, std::nullopt, std::nullopt, TransportType::Sse); + assert(ctx_sse.transport().value() == "sse"); + + std::cout << "PASSED\n"; +} + void test_progress_token_types() { std::cout << " test_progress_token_types... " << std::flush; @@ -391,6 +415,7 @@ int main() test_progress_without_token(); test_notifications(); test_client_id(); + test_transport(); test_progress_token_types(); test_log_level_to_string(); test_e2e_tool_logging_to_notifications(); diff --git a/tests/server/test_middleware_pipeline.cpp b/tests/server/test_middleware_pipeline.cpp index 9f49479..4255b92 100644 --- a/tests/server/test_middleware_pipeline.cpp +++ b/tests/server/test_middleware_pipeline.cpp @@ -4,8 +4,10 @@ #include "fastmcpp/server/middleware_pipeline.hpp" #include +#include #include #include +#include #include using namespace fastmcpp; @@ -264,6 +266,43 @@ void test_rate_limiting_middleware() std::cout << "PASSED\n"; } +void test_ping_middleware() +{ + std::cout << " test_ping_middleware... " << std::flush; + + std::shared_ptr session; + std::atomic ping_count{0}; + + session = std::make_shared( + "session_ping", + [&](const Json& msg) + { + if (ServerSession::is_request(msg) && msg.value("method", "") == "ping") + { + ping_count.fetch_add(1); + Json response = {{"jsonrpc", "2.0"}, + {"id", msg.at("id")}, + {"result", Json::object()}}; + session->handle_response(response); + } + }); + + MiddlewarePipeline pipeline; + pipeline.add(std::make_shared(std::chrono::milliseconds(10))); + + MiddlewareContext ctx; + ctx.method = "tools/list"; + ctx.session = session; + + pipeline.execute(ctx, [](const MiddlewareContext&) { return Json{{"tools", Json::array()}}; }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + assert(ping_count.load() > 0); + + std::cout << "PASSED\n"; +} + void test_error_handling_middleware() { std::cout << " test_error_handling_middleware... " << std::flush; @@ -397,6 +436,7 @@ int main() test_timing_middleware(); test_caching_middleware(); test_rate_limiting_middleware(); + test_ping_middleware(); test_error_handling_middleware(); test_combined_pipeline(); test_method_specific_hooks(); diff --git a/tests/tools/test_tool_timeout.cpp b/tests/tools/test_tool_timeout.cpp new file mode 100644 index 0000000..15e3708 --- /dev/null +++ b/tests/tools/test_tool_timeout.cpp @@ -0,0 +1,121 @@ +/// @file test_tool_timeout.cpp +/// @brief Tests for tool execution timeouts + +#include "fastmcpp/tools/manager.hpp" +#include "fastmcpp/tools/tool.hpp" + +#include +#include +#include +#include + +using namespace fastmcpp; +using namespace fastmcpp::tools; +using namespace std::chrono_literals; + +void test_tool_timeout_triggers() +{ + std::cout << " test_tool_timeout_triggers... " << std::flush; + + Tool slow_tool( + "slow", + Json::object(), + Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(50ms); + return Json{{"ok", true}}; + }); + + slow_tool.set_timeout(10ms); + + bool threw = false; + try + { + slow_tool.invoke(Json::object()); + } + catch (const ToolTimeoutError&) + { + threw = true; + } + + assert(threw); + std::cout << "PASSED\n"; +} + +void test_tool_timeout_disabled() +{ + std::cout << " test_tool_timeout_disabled... " << std::flush; + + Tool slow_tool( + "slow_no_timeout", + Json::object(), + Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(30ms); + return Json{{"ok", true}}; + }); + + slow_tool.set_timeout(5ms); + + auto result = slow_tool.invoke(Json::object(), false); + assert(result["ok"].get() == true); + std::cout << "PASSED\n"; +} + +void test_manager_timeout_toggle() +{ + std::cout << " test_manager_timeout_toggle... " << std::flush; + + Tool slow_tool( + "slow_manager", + Json::object(), + Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(40ms); + return Json{{"ok", true}}; + }); + + slow_tool.set_timeout(10ms); + + ToolManager tm; + tm.register_tool(slow_tool); + + bool threw = false; + try + { + tm.invoke("slow_manager", Json::object()); + } + catch (const ToolTimeoutError&) + { + threw = true; + } + assert(threw); + + auto result = tm.invoke("slow_manager", Json::object(), false); + assert(result["ok"].get() == true); + + std::cout << "PASSED\n"; +} + +int main() +{ + std::cout << "Tool Timeout Tests\n"; + std::cout << "==================\n"; + + try + { + test_tool_timeout_triggers(); + test_tool_timeout_disabled(); + test_manager_timeout_toggle(); + std::cout << "\nAll tests passed!\n"; + return 0; + } + catch (const std::exception& e) + { + std::cerr << "\nTest failed with exception: " << e.what() << "\n"; + return 1; + } +} From 7056457e64b17c8c7a6155fabb09a49f5e5f2ee0 Mon Sep 17 00:00:00 2001 From: Elias Bachaalany Date: Sun, 18 Jan 2026 16:23:48 -0800 Subject: [PATCH 2/4] Add telemetry spans and trace propagation Add telemetry helpers, client/server span instrumentation, and tests for trace context propagation. --- CMakeLists.txt | 5 + include/fastmcpp/client/client.hpp | 50 +++- include/fastmcpp/client/transports.hpp | 7 +- include/fastmcpp/telemetry.hpp | 155 +++++++++++ src/mcp/handler.cpp | 204 +++++++++++++-- src/telemetry.cpp | 339 +++++++++++++++++++++++++ tests/telemetry/tracing.cpp | 143 +++++++++++ 7 files changed, 877 insertions(+), 26 deletions(-) create mode 100644 include/fastmcpp/telemetry.hpp create mode 100644 src/telemetry.cpp create mode 100644 tests/telemetry/tracing.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ca15b01..f8d6683 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ add_library(fastmcpp_core STATIC src/client/client.cpp src/client/sampling_handlers.cpp src/client/transports.cpp + src/telemetry.cpp src/util/json_schema.cpp src/util/json_schema_type.cpp src/settings.cpp @@ -446,6 +447,10 @@ if(FASTMCPP_BUILD_TESTS) target_link_libraries(fastmcpp_server_middleware_pipeline PRIVATE fastmcpp_core) add_test(NAME fastmcpp_server_middleware_pipeline COMMAND fastmcpp_server_middleware_pipeline) + add_executable(fastmcpp_telemetry tests/telemetry/tracing.cpp) + target_link_libraries(fastmcpp_telemetry PRIVATE fastmcpp_core) + add_test(NAME fastmcpp_telemetry COMMAND fastmcpp_telemetry) + add_executable(fastmcpp_stdio_client tests/transports/stdio_client.cpp) target_link_libraries(fastmcpp_stdio_client PRIVATE fastmcpp_core) add_test(NAME fastmcpp_stdio_client COMMAND fastmcpp_stdio_client) diff --git a/include/fastmcpp/client/client.hpp b/include/fastmcpp/client/client.hpp index 57ca821..7aaecac 100644 --- a/include/fastmcpp/client/client.hpp +++ b/include/fastmcpp/client/client.hpp @@ -7,6 +7,7 @@ #include "fastmcpp/client/types.hpp" #include "fastmcpp/exceptions.hpp" #include "fastmcpp/server/server.hpp" +#include "fastmcpp/telemetry.hpp" #include "fastmcpp/types.hpp" #include "fastmcpp/util/json_schema.hpp" #include "fastmcpp/util/json_schema_type.hpp" @@ -68,6 +69,15 @@ class IServerRequestTransport virtual void set_server_request_handler(ServerRequestHandler handler) = 0; }; +/// Optional transport interface: some transports expose MCP session IDs. +class ISessionTransport +{ + public: + virtual ~ISessionTransport() = default; + virtual std::string session_id() const = 0; + virtual bool has_session() const = 0; +}; + /// Loopback transport for in-process server testing class LoopbackTransport : public ITransport { @@ -240,12 +250,15 @@ class Client CallToolResult call_tool_mcp(const std::string& name, const fastmcpp::Json& arguments, const CallToolOptions& options = CallToolOptions{}) { + auto span = + telemetry::client_span("tool " + name, "tools/call", name, transport_session_id()); fastmcpp::Json payload = {{"name", name}, {"arguments", arguments}}; // Add _meta if provided - if (options.meta) - payload["_meta"] = *options.meta; + auto propagated_meta = telemetry::inject_trace_context(options.meta); + if (propagated_meta) + payload["_meta"] = *propagated_meta; if (options.progress_handler) options.progress_handler(0.0f, std::nullopt, "request started"); @@ -439,7 +452,13 @@ class Client /// Read a resource by URI ReadResourceResult read_resource_mcp(const std::string& uri) { - auto response = call("resources/read", {{"uri", uri}}); + auto span = telemetry::client_span("resource " + uri, "resources/read", uri, + transport_session_id()); + fastmcpp::Json payload = {{"uri", uri}}; + auto propagated_meta = telemetry::inject_trace_context(std::nullopt); + if (propagated_meta) + payload["_meta"] = *propagated_meta; + auto response = call("resources/read", payload); return parse_read_resource_result(response); } @@ -477,7 +496,8 @@ class Client GetPromptResult get_prompt_mcp(const std::string& name, const fastmcpp::Json& arguments = fastmcpp::Json::object()) { - + auto span = + telemetry::client_span("prompt " + name, "prompts/get", name, transport_session_id()); fastmcpp::Json payload = {{"name", name}}; if (!arguments.empty()) { @@ -491,6 +511,10 @@ class Client payload["arguments"] = stringArgs; } + auto propagated_meta = telemetry::inject_trace_context(std::nullopt); + if (propagated_meta) + payload["_meta"] = *propagated_meta; + auto response = call("prompts/get", payload); return parse_get_prompt_result(response); } @@ -810,6 +834,18 @@ class Client } } + std::optional transport_session_id() const + { + if (!transport_) + return std::nullopt; + if (auto* session_transport = dynamic_cast(transport_.get())) + { + if (session_transport->has_session()) + return session_transport->session_id(); + } + return std::nullopt; + } + // Internal constructor for cloning Client(std::shared_ptr t, std::shared_ptr callbacks, bool /*internal*/) @@ -1541,6 +1577,9 @@ inline std::shared_ptr Client::read_resource_task(const std::strin fastmcpp::Json task_meta = {{"ttl", ttl_ms}}; payload["_meta"] = fastmcpp::Json{{"modelcontextprotocol.io/task", std::move(task_meta)}}; + auto propagated_meta = telemetry::inject_trace_context(payload["_meta"]); + if (propagated_meta) + payload["_meta"] = *propagated_meta; auto response = call("resources/read", payload); @@ -1575,6 +1614,9 @@ Client::get_prompt_task(const std::string& name, const fastmcpp::Json& arguments fastmcpp::Json task_meta = {{"ttl", ttl_ms}}; payload["_meta"] = fastmcpp::Json{{"modelcontextprotocol.io/task", std::move(task_meta)}}; + auto propagated_meta = telemetry::inject_trace_context(payload["_meta"]); + if (propagated_meta) + payload["_meta"] = *propagated_meta; auto response = call("prompts/get", payload); diff --git a/include/fastmcpp/client/transports.hpp b/include/fastmcpp/client/transports.hpp index 2edf03f..f2dde60 100644 --- a/include/fastmcpp/client/transports.hpp +++ b/include/fastmcpp/client/transports.hpp @@ -114,7 +114,8 @@ class StdioTransport : public ITransport /// 3. Server sends JSON-RPC responses back via the SSE stream class SseClientTransport : public ITransport, public IServerRequestTransport, - public IResettableTransport + public IResettableTransport, + public ISessionTransport { public: /// Construct an SSE client transport @@ -179,7 +180,9 @@ class SseClientTransport : public ITransport, /// 3. Session ID management via Mcp-Session-Id header /// /// Reference: https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/transports/ -class StreamableHttpTransport : public ITransport, public IResettableTransport +class StreamableHttpTransport : public ITransport, + public IResettableTransport, + public ISessionTransport { public: /// Construct a Streamable HTTP client transport diff --git a/include/fastmcpp/telemetry.hpp b/include/fastmcpp/telemetry.hpp new file mode 100644 index 0000000..4ea4038 --- /dev/null +++ b/include/fastmcpp/telemetry.hpp @@ -0,0 +1,155 @@ +// fastmcpp OpenTelemetry-style tracing helpers (no-op unless exporter configured) +#pragma once + +#include "fastmcpp/types.hpp" + +#include +#include +#include +#include +#include + +namespace fastmcpp::telemetry +{ + +constexpr const char* INSTRUMENTATION_NAME = "fastmcp"; +constexpr const char* TRACE_PARENT_KEY = "fastmcp.traceparent"; +constexpr const char* TRACE_STATE_KEY = "fastmcp.tracestate"; + +struct SpanContext +{ + std::string trace_id; + std::string span_id; + + bool is_valid() const + { + return trace_id.size() == 32 && span_id.size() == 16; + } +}; + +enum class SpanKind +{ + Internal, + Client, + Server +}; + +enum class StatusCode +{ + Unset, + Ok, + Error +}; + +struct Span +{ + std::string name; + std::string instrumentation_name; + std::optional instrumentation_version; + SpanKind kind{SpanKind::Internal}; + SpanContext context{}; + std::optional parent; + StatusCode status{StatusCode::Unset}; + std::unordered_map attributes; + std::optional exception_message; + + void set_attribute(const std::string& key, const fastmcpp::Json& value) + { + attributes[key] = value; + } + + void set_attributes(const std::unordered_map& attrs) + { + attributes.insert(attrs.begin(), attrs.end()); + } + + void record_exception(const std::string& message) + { + exception_message = message; + status = StatusCode::Error; + } + + void set_status(StatusCode code) + { + status = code; + } +}; + +class SpanExporter +{ + public: + virtual ~SpanExporter() = default; + virtual void export_span(const Span& span) = 0; +}; + +class InMemorySpanExporter : public SpanExporter +{ + public: + void export_span(const Span& span) override; + const std::vector& finished_spans() const; + void reset(); + + private: + std::vector spans_; +}; + +class SpanScope +{ + public: + SpanScope() = default; + explicit SpanScope(Span span, bool active); + SpanScope(const SpanScope&) = delete; + SpanScope& operator=(const SpanScope&) = delete; + SpanScope(SpanScope&& other) noexcept; + SpanScope& operator=(SpanScope&& other) noexcept; + ~SpanScope(); + + Span& span(); + bool active() const; + void end(); + + private: + void finalize(bool record_error); + + bool active_{false}; + bool ended_{false}; + int uncaught_on_enter_{0}; + Span span_; +}; + +class Tracer +{ + public: + explicit Tracer(std::string instrumentation_name, std::optional version) + : instrumentation_name_(std::move(instrumentation_name)), version_(std::move(version)) + { + } + + SpanScope start_span(const std::string& name, SpanKind kind, + const std::optional& parent = std::nullopt) const; + + private: + std::string instrumentation_name_; + std::optional version_; +}; + +Tracer get_tracer(std::optional version = std::nullopt); +void set_span_exporter(std::shared_ptr exporter); +std::shared_ptr span_exporter(); +SpanContext current_span_context(); + +std::optional inject_trace_context(const std::optional& meta); +SpanContext extract_trace_context(const std::optional& meta); + +SpanScope client_span(const std::string& name, const std::string& method, + const std::string& component_key, + const std::optional& session_id = std::nullopt); +SpanScope server_span(const std::string& name, const std::string& method, + const std::string& server_name, const std::string& component_type, + const std::string& component_key, + const std::optional& request_meta, + const std::optional& session_id = std::nullopt); +SpanScope delegate_span(const std::string& name, const std::string& provider_type, + const std::string& component_key); + +} // namespace fastmcpp::telemetry diff --git a/src/mcp/handler.cpp b/src/mcp/handler.cpp index ebd9658..d504690 100644 --- a/src/mcp/handler.cpp +++ b/src/mcp/handler.cpp @@ -4,6 +4,7 @@ #include "fastmcpp/mcp/tasks.hpp" #include "fastmcpp/proxy.hpp" #include "fastmcpp/server/sse_server.hpp" +#include "fastmcpp/telemetry.hpp" #include #include @@ -67,6 +68,13 @@ static std::string extract_session_id(const fastmcpp::Json& params) return ""; } +static std::optional extract_request_meta(const fastmcpp::Json& params) +{ + if (params.contains("_meta") && params["_meta"].is_object()) + return params["_meta"]; + return std::nullopt; +} + static fastmcpp::Json normalize_output_schema_for_mcp(const fastmcpp::Json& schema) { if (schema.is_null()) @@ -778,6 +786,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -849,6 +858,12 @@ make_mcp_handler(const std::string& server_name, const std::string& version, fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = + telemetry::server_span("tool " + name, "tools/call", server_name, "tool", + name, extract_request_meta(params), + session_id.empty() ? std::nullopt + : std::optional( + session_id)); try { const auto& tool = tools.get(name); @@ -862,6 +877,8 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } @@ -929,6 +946,7 @@ std::function make_mcp_handler( const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -1006,6 +1024,10 @@ std::function make_mcp_handler( fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", server.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto result = server.handle(name, args); @@ -1015,6 +1037,8 @@ std::function make_mcp_handler( } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } @@ -1036,6 +1060,11 @@ std::function make_mcp_handler( } if (method == "resources/read") { + std::string uri = params.value("uri", ""); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", server.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto routed = server.handle(method, params); @@ -1043,6 +1072,8 @@ std::function make_mcp_handler( } catch (...) { + if (span.active()) + span.span().record_exception("resource read failed"); return fastmcpp::Json{ {"jsonrpc", "2.0"}, {"id", id}, @@ -1066,6 +1097,11 @@ std::function make_mcp_handler( } if (method == "prompts/get") { + std::string prompt_name = params.value("name", ""); + auto span = telemetry::server_span( + "prompt " + prompt_name, "prompts/get", server.name(), "prompt", + prompt_name, extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto routed = server.handle(method, params); @@ -1073,6 +1109,8 @@ std::function make_mcp_handler( } catch (...) { + if (span.active()) + span.span().record_exception("prompt get failed"); return fastmcpp::Json{ {"jsonrpc", "2.0"}, {"id", id}, @@ -1136,6 +1174,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -1210,6 +1249,10 @@ make_mcp_handler(const std::string& server_name, const std::string& version, fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", server.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { const auto& tool = tools.get(name); @@ -1224,13 +1267,59 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } - // Resources, prompts, etc. - route through server - if (method == "resources/list" || method == "resources/read" || - method == "prompts/list" || method == "prompts/get") + if (method == "resources/read") + { + std::string uri = params.value("uri", ""); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", server.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); + try + { + auto routed = server.handle(method, params); + return fastmcpp::Json{{"jsonrpc", "2.0"}, {"id", id}, {"result", routed}}; + } + catch (...) + { + if (span.active()) + span.span().record_exception("resource read failed"); + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, + {"id", id}, + {"result", fastmcpp::Json{{"contents", fastmcpp::Json::array()}}}}; + } + } + + if (method == "prompts/get") + { + std::string prompt_name = params.value("name", ""); + auto span = telemetry::server_span( + "prompt " + prompt_name, "prompts/get", server.name(), "prompt", + prompt_name, extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); + try + { + auto routed = server.handle(method, params); + return fastmcpp::Json{{"jsonrpc", "2.0"}, {"id", id}, {"result", routed}}; + } + catch (...) + { + if (span.active()) + span.span().record_exception("prompt get failed"); + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, + {"id", id}, + {"result", fastmcpp::Json{{"messages", fastmcpp::Json::array()}}}}; + } + } + + if (method == "resources/list" || method == "prompts/list") { try { @@ -1239,27 +1328,15 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (...) { - // Return empty result for unimplemented if (method == "resources/list") return fastmcpp::Json{ {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json{{"resources", fastmcpp::Json::array()}}}}; - if (method == "resources/read") - return fastmcpp::Json{ - {"jsonrpc", "2.0"}, - {"id", id}, - {"result", fastmcpp::Json{{"contents", fastmcpp::Json::array()}}}}; - if (method == "prompts/list") - return fastmcpp::Json{ - {"jsonrpc", "2.0"}, - {"id", id}, - {"result", fastmcpp::Json{{"prompts", fastmcpp::Json::array()}}}}; - if (method == "prompts/get") - return fastmcpp::Json{ - {"jsonrpc", "2.0"}, - {"id", id}, - {"result", fastmcpp::Json{{"messages", fastmcpp::Json::array()}}}}; + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, + {"id", id}, + {"result", fastmcpp::Json{{"prompts", fastmcpp::Json::array()}}}}; } } @@ -1287,6 +1364,7 @@ make_mcp_handler(const std::string& server_name, const std::string& version, const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -1349,6 +1427,10 @@ make_mcp_handler(const std::string& server_name, const std::string& version, fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", server.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { const auto& tool = tools.get(name); @@ -1362,6 +1444,8 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } @@ -1414,6 +1498,10 @@ make_mcp_handler(const std::string& server_name, const std::string& version, // Strip trailing slashes for compatibility with Python fastmcp while (!uri.empty() && uri.back() == '/') uri.pop_back(); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", server.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto content = resources.read(uri, params); @@ -1498,6 +1586,10 @@ make_mcp_handler(const std::string& server_name, const std::string& version, std::string name = params.value("name", ""); if (name.empty()) return jsonrpc_error(id, -32602, "Missing prompt name"); + auto span = telemetry::server_span( + "prompt " + name, "prompts/get", server.name(), "prompt", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); @@ -1517,10 +1609,14 @@ make_mcp_handler(const std::string& server_name, const std::string& version, } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } @@ -1551,6 +1647,7 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -1635,6 +1732,10 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", app.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { bool has_output_schema = false; @@ -1723,6 +1824,8 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_tool_error(id, e); } } @@ -1884,6 +1987,10 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) return jsonrpc_error(id, -32602, "Missing resource URI"); while (!uri.empty() && uri.back() == '/') uri.pop_back(); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", app.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { int ttl_ms = 60000; @@ -2015,10 +2122,14 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2060,6 +2171,10 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) std::string name = params.value("name", ""); if (name.empty()) return jsonrpc_error(id, -32602, "Missing prompt name"); + auto span = telemetry::server_span( + "prompt " + name, "prompts/get", app.name(), "prompt", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { int ttl_ms = 60000; @@ -2148,10 +2263,14 @@ make_mcp_handler(const FastMCP& app, SessionAccessor session_accessor) } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2175,6 +2294,7 @@ std::function make_mcp_handler(const Prox const auto id = message.contains("id") ? message.at("id") : fastmcpp::Json(); std::string method = message.value("method", ""); fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -2241,6 +2361,10 @@ std::function make_mcp_handler(const Prox fastmcpp::Json arguments = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", app.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto result = app.invoke_tool(name, arguments); @@ -2288,6 +2412,8 @@ std::function make_mcp_handler(const Prox } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2338,6 +2464,10 @@ std::function make_mcp_handler(const Prox std::string uri = params.value("uri", ""); if (uri.empty()) return jsonrpc_error(id, -32602, "Missing resource URI"); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", app.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto result = app.read_resource(uri); @@ -2370,10 +2500,14 @@ std::function make_mcp_handler(const Prox } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2412,6 +2546,10 @@ std::function make_mcp_handler(const Prox std::string name = params.value("name", ""); if (name.empty()) return jsonrpc_error(id, -32602, "Missing prompt name"); + auto span = telemetry::server_span( + "prompt " + name, "prompts/get", app.name(), "prompt", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); @@ -2459,10 +2597,14 @@ std::function make_mcp_handler(const Prox } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2541,7 +2683,7 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces fastmcpp::Json params = message.value("params", fastmcpp::Json::object()); // Extract session_id for sampling support - std::string session_id = extract_session_id(params); + const std::string session_id = extract_session_id(params); if (method == "initialize") { @@ -2637,6 +2779,10 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); + auto span = telemetry::server_span( + "tool " + name, "tools/call", app.name(), "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); bool has_output_schema = false; for (const auto& tool_info : app.list_all_tools_info()) @@ -2673,6 +2819,8 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2726,6 +2874,10 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces return jsonrpc_error(id, -32602, "Missing resource URI"); while (!uri.empty() && uri.back() == '/') uri.pop_back(); + auto span = telemetry::server_span( + "resource " + uri, "resources/read", app.name(), "resource", uri, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { auto content = app.read_resource(uri, params); @@ -2766,10 +2918,14 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } @@ -2808,6 +2964,10 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces std::string prompt_name = params.value("name", ""); if (prompt_name.empty()) return jsonrpc_error(id, -32602, "Missing prompt name"); + auto span = telemetry::server_span( + "prompt " + prompt_name, "prompts/get", app.name(), "prompt", prompt_name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); @@ -2836,10 +2996,14 @@ make_mcp_handler_with_sampling(const FastMCP& app, SessionAccessor session_acces } catch (const NotFoundError& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32602, e.what()); } catch (const std::exception& e) { + if (span.active()) + span.span().record_exception(e.what()); return jsonrpc_error(id, -32603, e.what()); } } diff --git a/src/telemetry.cpp b/src/telemetry.cpp new file mode 100644 index 0000000..d946527 --- /dev/null +++ b/src/telemetry.cpp @@ -0,0 +1,339 @@ +#include "fastmcpp/telemetry.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace fastmcpp::telemetry +{ +namespace +{ + +std::shared_ptr& exporter_ref() +{ + static std::shared_ptr exporter; + return exporter; +} + +thread_local std::vector context_stack; + +bool telemetry_enabled() +{ + return exporter_ref() != nullptr; +} + +std::string to_hex(const std::vector& bytes) +{ + std::ostringstream oss; + for (auto b : bytes) + oss << std::hex << std::setw(2) << std::setfill('0') << static_cast(b); + return oss.str(); +} + +std::string random_hex_bytes(size_t count) +{ + std::vector bytes(count); + std::random_device rd; + std::mt19937_64 gen(rd()); + std::uniform_int_distribution dist(0, 255); + for (auto& b : bytes) + b = static_cast(dist(gen)); + return to_hex(bytes); +} + +bool is_hex_string(const std::string& value) +{ + return std::all_of(value.begin(), value.end(), + [](unsigned char c) { return std::isxdigit(c) != 0; }); +} + +std::string build_traceparent(const SpanContext& ctx) +{ + if (!ctx.is_valid()) + return ""; + // Version 00, sampled flag set to 01. + return "00-" + ctx.trace_id + "-" + ctx.span_id + "-01"; +} + +SpanContext parse_traceparent(const std::string& value) +{ + SpanContext ctx; + if (value.size() < 55) + return ctx; + + std::vector parts; + size_t start = 0; + while (start < value.size()) + { + auto pos = value.find('-', start); + if (pos == std::string::npos) + { + parts.push_back(value.substr(start)); + break; + } + parts.push_back(value.substr(start, pos - start)); + start = pos + 1; + } + + if (parts.size() < 4) + return ctx; + + const auto& trace_id = parts[1]; + const auto& span_id = parts[2]; + + if (trace_id.size() != 32 || span_id.size() != 16) + return ctx; + if (!is_hex_string(trace_id) || !is_hex_string(span_id)) + return ctx; + + ctx.trace_id = trace_id; + ctx.span_id = span_id; + return ctx; +} + +std::optional ensure_object_meta(const std::optional& meta) +{ + if (meta && meta->is_object()) + return meta; + return fastmcpp::Json::object(); +} + +} // namespace + +void InMemorySpanExporter::export_span(const Span& span) +{ + spans_.push_back(span); +} + +const std::vector& InMemorySpanExporter::finished_spans() const +{ + return spans_; +} + +void InMemorySpanExporter::reset() +{ + spans_.clear(); +} + +SpanScope::SpanScope(Span span, bool active) : active_(active), span_(std::move(span)) +{ + if (!active_) + return; + uncaught_on_enter_ = std::uncaught_exceptions(); + context_stack.push_back(span_.context); +} + +SpanScope::SpanScope(SpanScope&& other) noexcept + : active_(other.active_), ended_(other.ended_), uncaught_on_enter_(other.uncaught_on_enter_), + span_(std::move(other.span_)) +{ + other.active_ = false; + other.ended_ = true; +} + +SpanScope& SpanScope::operator=(SpanScope&& other) noexcept +{ + if (this == &other) + return *this; + + if (active_ && !ended_) + finalize(false); + + active_ = other.active_; + ended_ = other.ended_; + uncaught_on_enter_ = other.uncaught_on_enter_; + span_ = std::move(other.span_); + + other.active_ = false; + other.ended_ = true; + return *this; +} + +SpanScope::~SpanScope() +{ + if (ended_) + return; + bool record_error = std::uncaught_exceptions() > uncaught_on_enter_; + finalize(record_error); +} + +Span& SpanScope::span() +{ + return span_; +} + +bool SpanScope::active() const +{ + return active_; +} + +void SpanScope::end() +{ + if (ended_) + return; + finalize(false); +} + +void SpanScope::finalize(bool record_error) +{ + if (!active_) + { + ended_ = true; + return; + } + + if (record_error) + span_.status = StatusCode::Error; + if (span_.status == StatusCode::Unset) + span_.status = StatusCode::Ok; + + if (auto exporter = exporter_ref()) + exporter->export_span(span_); + + if (!context_stack.empty()) + context_stack.pop_back(); + + ended_ = true; +} + +SpanScope Tracer::start_span(const std::string& name, SpanKind kind, + const std::optional& parent) const +{ + if (!telemetry_enabled()) + return SpanScope{}; + + Span span; + span.name = name; + span.kind = kind; + span.instrumentation_name = instrumentation_name_; + span.instrumentation_version = version_; + + SpanContext parent_ctx; + if (parent && parent->is_valid()) + parent_ctx = *parent; + else if (!context_stack.empty()) + parent_ctx = context_stack.back(); + + if (parent_ctx.is_valid()) + { + span.parent = parent_ctx; + span.context.trace_id = parent_ctx.trace_id; + } + else + { + span.context.trace_id = random_hex_bytes(16); + } + span.context.span_id = random_hex_bytes(8); + + return SpanScope(std::move(span), true); +} + +Tracer get_tracer(std::optional version) +{ + return Tracer(INSTRUMENTATION_NAME, std::move(version)); +} + +void set_span_exporter(std::shared_ptr exporter) +{ + exporter_ref() = std::move(exporter); +} + +std::shared_ptr span_exporter() +{ + return exporter_ref(); +} + +SpanContext current_span_context() +{ + if (!context_stack.empty()) + return context_stack.back(); + return {}; +} + +std::optional inject_trace_context(const std::optional& meta) +{ + auto ctx = current_span_context(); + if (!ctx.is_valid()) + return meta; + + auto out = ensure_object_meta(meta); + std::string traceparent = build_traceparent(ctx); + if (!traceparent.empty()) + (*out)[TRACE_PARENT_KEY] = traceparent; + return out; +} + +SpanContext extract_trace_context(const std::optional& meta) +{ + auto current = current_span_context(); + if (current.is_valid()) + return current; + + if (!meta || !meta->is_object()) + return {}; + + auto it = meta->find(TRACE_PARENT_KEY); + if (it == meta->end() || !it->is_string()) + return {}; + + return parse_traceparent(it->get()); +} + +SpanScope client_span(const std::string& name, const std::string& method, + const std::string& component_key, + const std::optional& session_id) +{ + auto span = get_tracer().start_span(name, SpanKind::Client); + if (!span.active()) + return span; + + span.span().set_attributes( + {{"rpc.system", "mcp"}, + {"rpc.method", method}, + {"fastmcp.component.key", component_key}}); + if (session_id && !session_id->empty()) + span.span().set_attribute("fastmcp.session.id", *session_id); + return span; +} + +SpanScope server_span(const std::string& name, const std::string& method, + const std::string& server_name, const std::string& component_type, + const std::string& component_key, + const std::optional& request_meta, + const std::optional& session_id) +{ + auto parent_ctx = extract_trace_context(request_meta); + auto span = get_tracer().start_span(name, SpanKind::Server, + parent_ctx.is_valid() ? std::optional(parent_ctx) + : std::nullopt); + if (!span.active()) + return span; + + span.span().set_attributes( + {{"rpc.system", "mcp"}, + {"rpc.service", server_name}, + {"rpc.method", method}, + {"fastmcp.server.name", server_name}, + {"fastmcp.component.type", component_type}, + {"fastmcp.component.key", component_key}}); + if (session_id && !session_id->empty()) + span.span().set_attribute("fastmcp.session.id", *session_id); + return span; +} + +SpanScope delegate_span(const std::string& name, const std::string& provider_type, + const std::string& component_key) +{ + auto span = get_tracer().start_span("delegate " + name, SpanKind::Internal); + if (!span.active()) + return span; + span.span().set_attributes( + {{"fastmcp.provider.type", provider_type}, {"fastmcp.component.key", component_key}}); + return span; +} + +} // namespace fastmcpp::telemetry diff --git a/tests/telemetry/tracing.cpp b/tests/telemetry/tracing.cpp new file mode 100644 index 0000000..e81408c --- /dev/null +++ b/tests/telemetry/tracing.cpp @@ -0,0 +1,143 @@ +/// @brief OpenTelemetry-style tracing tests for fastmcpp + +#include "fastmcpp/app.hpp" +#include "fastmcpp/client/client.hpp" +#include "fastmcpp/mcp/handler.hpp" +#include "fastmcpp/telemetry.hpp" +#include "fastmcpp/tools/tool.hpp" + +#include +#include +#include +#include +#include + +using namespace fastmcpp; + +namespace +{ + +class SessionTransport : public client::ITransport, public client::ISessionTransport +{ + public: + explicit SessionTransport(client::InProcessMcpTransport::HandlerFn handler, + std::string session_id) + : handler_(std::move(handler)), session_id_(std::move(session_id)) + { + } + + fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload) override + { + static int request_id = 0; + fastmcpp::Json jsonrpc_request = { + {"jsonrpc", "2.0"}, {"id", ++request_id}, {"method", route}, {"params", payload}}; + fastmcpp::Json response = handler_(jsonrpc_request); + if (response.contains("error")) + throw fastmcpp::Error(response["error"].value("message", "Unknown error")); + return response.value("result", fastmcpp::Json::object()); + } + + std::string session_id() const override + { + return session_id_; + } + + bool has_session() const override + { + return !session_id_.empty(); + } + + private: + client::InProcessMcpTransport::HandlerFn handler_; + std::string session_id_; +}; + +telemetry::Span* find_span(std::vector& spans, const std::string& name, + bool has_server_name) +{ + for (auto& span : spans) + { + if (span.name != name) + continue; + bool has_attr = span.attributes.count("fastmcp.server.name") > 0; + if (has_attr == has_server_name) + return &span; + } + return nullptr; +} + +} // namespace + +int main() +{ + auto exporter = std::make_shared(); + telemetry::set_span_exporter(exporter); + + exporter->reset(); + { + auto span = telemetry::get_tracer().start_span("test-span", telemetry::SpanKind::Internal); + (void)span; + } + const auto& spans1 = exporter->finished_spans(); + assert(spans1.size() == 1); + assert(spans1[0].instrumentation_name == telemetry::INSTRUMENTATION_NAME); + + exporter->reset(); + { + FastMCP app("test-server"); + tools::Tool tool( + "echo", + Json{{"type", "object"}, + {"properties", Json{{"message", Json{{"type", "string"}}}}}}, + Json::object(), + [](const Json& input) { return Json{{"message", input.value("message", "")}}; }); + app.tools().register_tool(tool); + + auto handler = mcp::make_mcp_handler(app); + client::Client client(std::make_unique(handler, "sess-123")); + client.call_tool("echo", Json{{"message", "hi"}}, std::nullopt, + std::chrono::milliseconds{0}, nullptr, false); + } + + auto spans2 = exporter->finished_spans(); + telemetry::Span* client_span = find_span(spans2, "tool echo", false); + telemetry::Span* server_span = find_span(spans2, "tool echo", true); + assert(client_span != nullptr); + assert(server_span != nullptr); + assert(client_span->kind == telemetry::SpanKind::Client); + assert(server_span->kind == telemetry::SpanKind::Server); + assert(client_span->context.trace_id == server_span->context.trace_id); + assert(server_span->parent.has_value()); + assert(server_span->parent->span_id == client_span->context.span_id); + assert(client_span->attributes.count("fastmcp.session.id") == 1); + + exporter->reset(); + { + FastMCP app("fail-server"); + tools::Tool tool("boom", Json{{"type", "object"}}, Json::object(), + [](const Json&) -> Json { throw std::runtime_error("boom"); }); + app.tools().register_tool(tool); + + auto handler = mcp::make_mcp_handler(app); + client::Client client(std::make_unique(handler, "sess-999")); + bool threw = false; + try + { + client.call_tool("boom", Json::object()); + } + catch (const std::exception&) + { + threw = true; + } + assert(threw); + } + + auto spans3 = exporter->finished_spans(); + telemetry::Span* server_error_span = find_span(spans3, "tool boom", true); + assert(server_error_span != nullptr); + assert(server_error_span->status == telemetry::StatusCode::Error); + + telemetry::set_span_exporter(nullptr); + std::cout << "fastmcpp_telemetry: PASS\n"; + return 0; +} From 0f7a309d067c3e09e54d03a193c62409fab9024f Mon Sep 17 00:00:00 2001 From: Elias Bachaalany Date: Sun, 18 Jan 2026 19:03:51 -0800 Subject: [PATCH 3/4] Format C++ sources Apply clang-format to keep CI formatting check green. --- examples/stdio_mcp_server.cpp | 23 +- .../fastmcpp/server/middleware_pipeline.hpp | 296 +++++++++--------- include/fastmcpp/tools/tool.hpp | 38 +-- src/cli/main.cpp | 3 +- src/client/transports.cpp | 20 +- src/mcp/handler.cpp | 18 +- src/server/context.cpp | 3 +- src/telemetry.cpp | 23 +- tests/server/test_middleware_pipeline.cpp | 7 +- tests/telemetry/tracing.cpp | 3 +- tests/tools/test_tool_timeout.cpp | 45 ++- 11 files changed, 229 insertions(+), 250 deletions(-) diff --git a/examples/stdio_mcp_server.cpp b/examples/stdio_mcp_server.cpp index 255a2da..fe8b7ed 100644 --- a/examples/stdio_mcp_server.cpp +++ b/examples/stdio_mcp_server.cpp @@ -31,26 +31,23 @@ int main() tm.register_tool(add); fastmcpp::tools::Tool counter{ - "counter", - Json{{"type", "object"}, {"properties", Json::object()}}, + "counter", Json{{"type", "object"}, {"properties", Json::object()}}, Json{{"type", "array"}, - {"items", - Json::array({Json{{"type", "object"}, - {"properties", Json{{"type", Json{{"type", "string"}}}, - {"text", Json{{"type", "string"}}}}}, - {"required", Json::array({"type", "text"})}}})}}, + {"items", Json::array({Json{{"type", "object"}, + {"properties", Json{{"type", Json{{"type", "string"}}}, + {"text", Json{{"type", "string"}}}}}, + {"required", Json::array({"type", "text"})}}})}}, [&counter_value](const Json&) -> Json { counter_value += 1; - return Json{{"content", - Json::array({Json{{"type", "text"}, {"text", std::to_string(counter_value)}}})}}; + return Json{{"content", Json::array({Json{{"type", "text"}, + {"text", std::to_string(counter_value)}}})}}; }}; tm.register_tool(counter); - auto handler = - fastmcpp::mcp::make_mcp_handler("demo_stdio", "0.1.0", tm, - {{"add", "Add two numbers"}, - {"counter", "Increment and return an in-process counter"}}); + auto handler = fastmcpp::mcp::make_mcp_handler( + "demo_stdio", "0.1.0", tm, + {{"add", "Add two numbers"}, {"counter", "Increment and return an in-process counter"}}); fastmcpp::server::StdioServerWrapper server(handler); server.run(); return 0; diff --git a/include/fastmcpp/server/middleware_pipeline.hpp b/include/fastmcpp/server/middleware_pipeline.hpp index f17e0ea..ae460ed 100644 --- a/include/fastmcpp/server/middleware_pipeline.hpp +++ b/include/fastmcpp/server/middleware_pipeline.hpp @@ -7,24 +7,24 @@ /// - Middleware base class with virtual hooks /// - Built-in implementations: Logging, Timing, Caching, RateLimiting, ErrorHandling -#include "fastmcpp/server/session.hpp" -#include "fastmcpp/types.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "fastmcpp/server/session.hpp" +#include "fastmcpp/types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fastmcpp::server { @@ -39,12 +39,12 @@ struct MiddlewareContext std::string method; ///< MCP method name (e.g., "tools/call") std::string source{"client"}; ///< Origin: "client" or "server" std::string type{"request"}; ///< Message type: "request" or "notification" - std::chrono::steady_clock::time_point timestamp; ///< Request timestamp - std::optional request_id; ///< Request ID if available - std::shared_ptr session; ///< ServerSession for this request (optional) - std::optional tool_name; ///< Tool name for tools/call - std::optional resource_uri; ///< Resource URI for resources/read - std::optional prompt_name; ///< Prompt name for prompts/get + std::chrono::steady_clock::time_point timestamp; ///< Request timestamp + std::optional request_id; ///< Request ID if available + std::shared_ptr session; ///< ServerSession for this request (optional) + std::optional tool_name; ///< Tool name for tools/call + std::optional resource_uri; ///< Resource URI for resources/read + std::optional prompt_name; ///< Prompt name for prompts/get /// Create a copy with modified fields MiddlewareContext copy() const @@ -510,8 +510,8 @@ class RateLimitingMiddleware : public Middleware }; /// Error handling middleware - catches exceptions and converts to MCP errors -class ErrorHandlingMiddleware : public Middleware -{ +class ErrorHandlingMiddleware : public Middleware +{ public: using ErrorCallback = std::function; @@ -578,125 +578,125 @@ class ErrorHandlingMiddleware : public Middleware ErrorCallback callback_; bool include_trace_; mutable std::mutex mutex_; - std::unordered_map error_counts_; -}; - -/// Ping middleware - sends periodic pings to keep client connections alive -class PingMiddleware : public Middleware -{ - public: - explicit PingMiddleware(std::chrono::milliseconds interval = std::chrono::milliseconds(30000)) - : interval_(interval) - { - if (interval_.count() <= 0) - throw std::invalid_argument("interval must be positive"); - } - - explicit PingMiddleware(int interval_ms) - : PingMiddleware(std::chrono::milliseconds(interval_ms)) - { - } - - ~PingMiddleware() override - { - stop(); - } - - Json operator()(const MiddlewareContext& ctx, CallNext call_next) override - { - if (ctx.session) - ensure_session(ctx.session); - return call_next(ctx); - } - - private: - void ensure_session(const std::shared_ptr& session) - { - const std::string key = session_key(session); - if (key.empty()) - return; - - bool should_start = false; - { - std::lock_guard lock(mutex_); - if (active_sessions_.insert(key).second) - should_start = true; - } - - if (!should_start) - return; - - std::weak_ptr weak_session = session; - std::thread worker([this, weak_session, key]() { ping_loop(weak_session, key); }); - { - std::lock_guard lock(mutex_); - threads_.push_back(std::move(worker)); - } - } - - void ping_loop(std::weak_ptr weak_session, const std::string& key) - { - while (true) - { - { - std::unique_lock lock(mutex_); - if (cv_.wait_for(lock, interval_, [this]() { return stop_.load(); })) - break; - } - - if (stop_.load()) - break; - - auto session = weak_session.lock(); - if (!session) - break; - - try - { - session->send_ping(interval_); - } - catch (const std::exception&) - { - break; - } - } - - std::lock_guard lock(mutex_); - active_sessions_.erase(key); - } - - void stop() - { - stop_.store(true); - cv_.notify_all(); - - std::vector threads; - { - std::lock_guard lock(mutex_); - threads.swap(threads_); - } - - for (auto& t : threads) - if (t.joinable()) - t.join(); - } - - static std::string session_key(const std::shared_ptr& session) - { - if (!session) - return {}; - auto key = session->session_id(); - if (!key.empty()) - return key; - return "session@" + std::to_string(reinterpret_cast(session.get())); - } - - std::chrono::milliseconds interval_; - std::mutex mutex_; - std::condition_variable cv_; - std::unordered_set active_sessions_; - std::vector threads_; - std::atomic stop_{false}; -}; - -} // namespace fastmcpp::server + std::unordered_map error_counts_; +}; + +/// Ping middleware - sends periodic pings to keep client connections alive +class PingMiddleware : public Middleware +{ + public: + explicit PingMiddleware(std::chrono::milliseconds interval = std::chrono::milliseconds(30000)) + : interval_(interval) + { + if (interval_.count() <= 0) + throw std::invalid_argument("interval must be positive"); + } + + explicit PingMiddleware(int interval_ms) + : PingMiddleware(std::chrono::milliseconds(interval_ms)) + { + } + + ~PingMiddleware() override + { + stop(); + } + + Json operator()(const MiddlewareContext& ctx, CallNext call_next) override + { + if (ctx.session) + ensure_session(ctx.session); + return call_next(ctx); + } + + private: + void ensure_session(const std::shared_ptr& session) + { + const std::string key = session_key(session); + if (key.empty()) + return; + + bool should_start = false; + { + std::lock_guard lock(mutex_); + if (active_sessions_.insert(key).second) + should_start = true; + } + + if (!should_start) + return; + + std::weak_ptr weak_session = session; + std::thread worker([this, weak_session, key]() { ping_loop(weak_session, key); }); + { + std::lock_guard lock(mutex_); + threads_.push_back(std::move(worker)); + } + } + + void ping_loop(std::weak_ptr weak_session, const std::string& key) + { + while (true) + { + { + std::unique_lock lock(mutex_); + if (cv_.wait_for(lock, interval_, [this]() { return stop_.load(); })) + break; + } + + if (stop_.load()) + break; + + auto session = weak_session.lock(); + if (!session) + break; + + try + { + session->send_ping(interval_); + } + catch (const std::exception&) + { + break; + } + } + + std::lock_guard lock(mutex_); + active_sessions_.erase(key); + } + + void stop() + { + stop_.store(true); + cv_.notify_all(); + + std::vector threads; + { + std::lock_guard lock(mutex_); + threads.swap(threads_); + } + + for (auto& t : threads) + if (t.joinable()) + t.join(); + } + + static std::string session_key(const std::shared_ptr& session) + { + if (!session) + return {}; + auto key = session->session_id(); + if (!key.empty()) + return key; + return "session@" + std::to_string(reinterpret_cast(session.get())); + } + + std::chrono::milliseconds interval_; + std::mutex mutex_; + std::condition_variable cv_; + std::unordered_set active_sessions_; + std::vector threads_; + std::atomic stop_{false}; +}; + +} // namespace fastmcpp::server diff --git a/include/fastmcpp/tools/tool.hpp b/include/fastmcpp/tools/tool.hpp index 89ede9b..c23fb6b 100644 --- a/include/fastmcpp/tools/tool.hpp +++ b/include/fastmcpp/tools/tool.hpp @@ -81,30 +81,30 @@ class Tool auto future = promise.get_future(); auto timeout = *timeout_; - std::thread worker([promise = std::move(promise), input, fn = fn_]() mutable - { - try - { - promise.set_value(fn(input)); - } - catch (...) - { - try - { - promise.set_exception(std::current_exception()); - } - catch (...) - { - } - } - }); + std::thread worker( + [promise = std::move(promise), input, fn = fn_]() mutable + { + try + { + promise.set_value(fn(input)); + } + catch (...) + { + try + { + promise.set_exception(std::current_exception()); + } + catch (...) + { + } + } + }); if (future.wait_for(timeout) == std::future_status::timeout) { if (worker.joinable()) worker.detach(); - throw fastmcpp::ToolTimeoutError("Tool '" + name_ + - "' execution timed out after " + + throw fastmcpp::ToolTimeoutError("Tool '" + name_ + "' execution timed out after " + format_timeout_seconds(timeout) + "s"); } diff --git a/src/cli/main.cpp b/src/cli/main.cpp index d54654f..f9ace13 100644 --- a/src/cli/main.cpp +++ b/src/cli/main.cpp @@ -52,7 +52,8 @@ static int tasks_usage(int exit_code = 1) std::cout << " --ws WebSocket URL (e.g. ws://127.0.0.1:8765)\n"; std::cout << " --stdio Spawn an MCP stdio server\n"; std::cout << " --stdio-arg Repeatable args for --stdio\n"; - std::cout << " --stdio-one-shot Spawn a fresh process per request (disables keep-alive)\n"; + std::cout << " --stdio-one-shot Spawn a fresh process per request (disables " + "keep-alive)\n"; std::cout << "\n"; std::cout << "Notes:\n"; std::cout << " - Python fastmcp's `tasks` CLI is for Docket (distributed workers/Redis).\n"; diff --git a/src/client/transports.cpp b/src/client/transports.cpp index e3094ed..f3c3762 100644 --- a/src/client/transports.cpp +++ b/src/client/transports.cpp @@ -1,12 +1,12 @@ #include "fastmcpp/client/transports.hpp" #include "fastmcpp/exceptions.hpp" -#include "fastmcpp/util/json.hpp" +#include "fastmcpp/util/json.hpp" #include #include #include -#include +#include #include #include #include @@ -19,8 +19,8 @@ #include #endif -namespace fastmcpp::client -{ +namespace fastmcpp::client +{ struct StdioTransport::State { @@ -38,9 +38,9 @@ struct StdioTransport::State #endif }; -namespace -{ -struct ParsedUrl +namespace +{ +struct ParsedUrl { std::string scheme; // "http" or "https" std::string host; @@ -634,10 +634,8 @@ fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp: { std::lock_guard lock(st->mutex); throw fastmcpp::TransportError( - "StdioTransport process exited with code: " + - std::to_string(exit_status) + - (st->stderr_data.empty() ? std::string("") - : ("; stderr: ") + st->stderr_data)); + "StdioTransport process exited with code: " + std::to_string(exit_status) + + (st->stderr_data.empty() ? std::string("") : ("; stderr: ") + st->stderr_data)); } std::unique_lock lock(st->mutex); diff --git a/src/mcp/handler.cpp b/src/mcp/handler.cpp index d504690..0b7ec0d 100644 --- a/src/mcp/handler.cpp +++ b/src/mcp/handler.cpp @@ -858,12 +858,10 @@ make_mcp_handler(const std::string& server_name, const std::string& version, fastmcpp::Json args = params.value("arguments", fastmcpp::Json::object()); if (name.empty()) return jsonrpc_error(id, -32602, "Missing tool name"); - auto span = - telemetry::server_span("tool " + name, "tools/call", server_name, "tool", - name, extract_request_meta(params), - session_id.empty() ? std::nullopt - : std::optional( - session_id)); + auto span = telemetry::server_span( + "tool " + name, "tools/call", server_name, "tool", name, + extract_request_meta(params), + session_id.empty() ? std::nullopt : std::optional(session_id)); try { const auto& tool = tools.get(name); @@ -1099,8 +1097,8 @@ std::function make_mcp_handler( { std::string prompt_name = params.value("name", ""); auto span = telemetry::server_span( - "prompt " + prompt_name, "prompts/get", server.name(), "prompt", - prompt_name, extract_request_meta(params), + "prompt " + prompt_name, "prompts/get", server.name(), "prompt", prompt_name, + extract_request_meta(params), session_id.empty() ? std::nullopt : std::optional(session_id)); try { @@ -1300,8 +1298,8 @@ make_mcp_handler(const std::string& server_name, const std::string& version, { std::string prompt_name = params.value("name", ""); auto span = telemetry::server_span( - "prompt " + prompt_name, "prompts/get", server.name(), "prompt", - prompt_name, extract_request_meta(params), + "prompt " + prompt_name, "prompts/get", server.name(), "prompt", prompt_name, + extract_request_meta(params), session_id.empty() ? std::nullopt : std::optional(session_id)); try { diff --git a/src/server/context.cpp b/src/server/context.cpp index d7d2bd3..5a18e23 100644 --- a/src/server/context.cpp +++ b/src/server/context.cpp @@ -17,8 +17,7 @@ Context::Context(const resources::ResourceManager& rm, const prompts::PromptMana Context::Context(const resources::ResourceManager& rm, const prompts::PromptManager& pm, std::optional request_meta, std::optional request_id, - std::optional session_id, - std::optional transport) + std::optional session_id, std::optional transport) : resource_mgr_(&rm), prompt_mgr_(&pm), request_meta_(std::move(request_meta)), request_id_(std::move(request_id)), session_id_(std::move(session_id)), transport_(std::move(transport)) diff --git a/src/telemetry.cpp b/src/telemetry.cpp index d946527..d4d7de9 100644 --- a/src/telemetry.cpp +++ b/src/telemetry.cpp @@ -292,9 +292,7 @@ SpanScope client_span(const std::string& name, const std::string& method, return span; span.span().set_attributes( - {{"rpc.system", "mcp"}, - {"rpc.method", method}, - {"fastmcp.component.key", component_key}}); + {{"rpc.system", "mcp"}, {"rpc.method", method}, {"fastmcp.component.key", component_key}}); if (session_id && !session_id->empty()) span.span().set_attribute("fastmcp.session.id", *session_id); return span; @@ -307,19 +305,18 @@ SpanScope server_span(const std::string& name, const std::string& method, const std::optional& session_id) { auto parent_ctx = extract_trace_context(request_meta); - auto span = get_tracer().start_span(name, SpanKind::Server, - parent_ctx.is_valid() ? std::optional(parent_ctx) - : std::nullopt); + auto span = get_tracer().start_span( + name, SpanKind::Server, + parent_ctx.is_valid() ? std::optional(parent_ctx) : std::nullopt); if (!span.active()) return span; - span.span().set_attributes( - {{"rpc.system", "mcp"}, - {"rpc.service", server_name}, - {"rpc.method", method}, - {"fastmcp.server.name", server_name}, - {"fastmcp.component.type", component_type}, - {"fastmcp.component.key", component_key}}); + span.span().set_attributes({{"rpc.system", "mcp"}, + {"rpc.service", server_name}, + {"rpc.method", method}, + {"fastmcp.server.name", server_name}, + {"fastmcp.component.type", component_type}, + {"fastmcp.component.key", component_key}}); if (session_id && !session_id->empty()) span.span().set_attribute("fastmcp.session.id", *session_id); return span; diff --git a/tests/server/test_middleware_pipeline.cpp b/tests/server/test_middleware_pipeline.cpp index 4255b92..2b6dd8d 100644 --- a/tests/server/test_middleware_pipeline.cpp +++ b/tests/server/test_middleware_pipeline.cpp @@ -3,8 +3,8 @@ #include "fastmcpp/server/middleware_pipeline.hpp" -#include #include +#include #include #include #include @@ -280,9 +280,8 @@ void test_ping_middleware() if (ServerSession::is_request(msg) && msg.value("method", "") == "ping") { ping_count.fetch_add(1); - Json response = {{"jsonrpc", "2.0"}, - {"id", msg.at("id")}, - {"result", Json::object()}}; + Json response = { + {"jsonrpc", "2.0"}, {"id", msg.at("id")}, {"result", Json::object()}}; session->handle_response(response); } }); diff --git a/tests/telemetry/tracing.cpp b/tests/telemetry/tracing.cpp index e81408c..b2c98ff 100644 --- a/tests/telemetry/tracing.cpp +++ b/tests/telemetry/tracing.cpp @@ -87,8 +87,7 @@ int main() FastMCP app("test-server"); tools::Tool tool( "echo", - Json{{"type", "object"}, - {"properties", Json{{"message", Json{{"type", "string"}}}}}}, + Json{{"type", "object"}, {"properties", Json{{"message", Json{{"type", "string"}}}}}}, Json::object(), [](const Json& input) { return Json{{"message", input.value("message", "")}}; }); app.tools().register_tool(tool); diff --git a/tests/tools/test_tool_timeout.cpp b/tests/tools/test_tool_timeout.cpp index 15e3708..bcfddde 100644 --- a/tests/tools/test_tool_timeout.cpp +++ b/tests/tools/test_tool_timeout.cpp @@ -17,15 +17,12 @@ void test_tool_timeout_triggers() { std::cout << " test_tool_timeout_triggers... " << std::flush; - Tool slow_tool( - "slow", - Json::object(), - Json::object(), - [](const Json&) -> Json - { - std::this_thread::sleep_for(50ms); - return Json{{"ok", true}}; - }); + Tool slow_tool("slow", Json::object(), Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(50ms); + return Json{{"ok", true}}; + }); slow_tool.set_timeout(10ms); @@ -47,15 +44,12 @@ void test_tool_timeout_disabled() { std::cout << " test_tool_timeout_disabled... " << std::flush; - Tool slow_tool( - "slow_no_timeout", - Json::object(), - Json::object(), - [](const Json&) -> Json - { - std::this_thread::sleep_for(30ms); - return Json{{"ok", true}}; - }); + Tool slow_tool("slow_no_timeout", Json::object(), Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(30ms); + return Json{{"ok", true}}; + }); slow_tool.set_timeout(5ms); @@ -68,15 +62,12 @@ void test_manager_timeout_toggle() { std::cout << " test_manager_timeout_toggle... " << std::flush; - Tool slow_tool( - "slow_manager", - Json::object(), - Json::object(), - [](const Json&) -> Json - { - std::this_thread::sleep_for(40ms); - return Json{{"ok", true}}; - }); + Tool slow_tool("slow_manager", Json::object(), Json::object(), + [](const Json&) -> Json + { + std::this_thread::sleep_for(40ms); + return Json{{"ok", true}}; + }); slow_tool.set_timeout(10ms); From 972e62ca90671ea7206d81a5ddfb07580d39a74b Mon Sep 17 00:00:00 2001 From: Elias Bachaalany Date: Sun, 18 Jan 2026 19:28:39 -0800 Subject: [PATCH 4/4] Stabilize timing-sensitive tests --- tests/server/test_middleware_pipeline.cpp | 12 +++++++++--- tests/tools/test_tool_timeout.cpp | 13 ++++++++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/tests/server/test_middleware_pipeline.cpp b/tests/server/test_middleware_pipeline.cpp index 2b6dd8d..b64dc26 100644 --- a/tests/server/test_middleware_pipeline.cpp +++ b/tests/server/test_middleware_pipeline.cpp @@ -6,7 +6,9 @@ #include #include #include +#include #include +#include #include #include @@ -272,6 +274,8 @@ void test_ping_middleware() std::shared_ptr session; std::atomic ping_count{0}; + std::condition_variable ping_cv; + std::mutex ping_mutex; session = std::make_shared( "session_ping", @@ -280,6 +284,7 @@ void test_ping_middleware() if (ServerSession::is_request(msg) && msg.value("method", "") == "ping") { ping_count.fetch_add(1); + ping_cv.notify_one(); Json response = { {"jsonrpc", "2.0"}, {"id", msg.at("id")}, {"result", Json::object()}}; session->handle_response(response); @@ -295,9 +300,10 @@ void test_ping_middleware() pipeline.execute(ctx, [](const MiddlewareContext&) { return Json{{"tools", Json::array()}}; }); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - assert(ping_count.load() > 0); + std::unique_lock lock(ping_mutex); + bool pinged = ping_cv.wait_for(lock, std::chrono::milliseconds(500), + [&]() { return ping_count.load() > 0; }); + assert(pinged); std::cout << "PASSED\n"; } diff --git a/tests/tools/test_tool_timeout.cpp b/tests/tools/test_tool_timeout.cpp index bcfddde..517892c 100644 --- a/tests/tools/test_tool_timeout.cpp +++ b/tests/tools/test_tool_timeout.cpp @@ -13,6 +13,13 @@ using namespace fastmcpp; using namespace fastmcpp::tools; using namespace std::chrono_literals; +static void sleep_for_at_least(std::chrono::milliseconds duration) +{ + auto deadline = std::chrono::steady_clock::now() + duration; + while (std::chrono::steady_clock::now() < deadline) + std::this_thread::sleep_for(1ms); +} + void test_tool_timeout_triggers() { std::cout << " test_tool_timeout_triggers... " << std::flush; @@ -20,7 +27,7 @@ void test_tool_timeout_triggers() Tool slow_tool("slow", Json::object(), Json::object(), [](const Json&) -> Json { - std::this_thread::sleep_for(50ms); + sleep_for_at_least(50ms); return Json{{"ok", true}}; }); @@ -47,7 +54,7 @@ void test_tool_timeout_disabled() Tool slow_tool("slow_no_timeout", Json::object(), Json::object(), [](const Json&) -> Json { - std::this_thread::sleep_for(30ms); + sleep_for_at_least(30ms); return Json{{"ok", true}}; }); @@ -65,7 +72,7 @@ void test_manager_timeout_toggle() Tool slow_tool("slow_manager", Json::object(), Json::object(), [](const Json&) -> Json { - std::this_thread::sleep_for(40ms); + sleep_for_at_least(40ms); return Json{{"ok", true}}; });