Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.16)
if(POLICY CMP0169)
cmake_policy(SET CMP0169 OLD)
endif()
project(fastmcpp VERSION 3.1.0 LANGUAGES CXX)
project(fastmcpp VERSION 3.1.1 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down Expand Up @@ -284,6 +284,10 @@ if(FASTMCPP_BUILD_TESTS)
target_link_libraries(fastmcpp_settings PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_settings COMMAND fastmcpp_settings)

add_executable(fastmcpp_util_metadata_parsing tests/util/metadata_parsing.cpp)
target_link_libraries(fastmcpp_util_metadata_parsing PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_util_metadata_parsing COMMAND fastmcpp_util_metadata_parsing)

add_executable(fastmcpp_stdio_server tests/transports/stdio_server.cpp)
target_link_libraries(fastmcpp_stdio_server PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_stdio_server COMMAND fastmcpp_stdio_server)
Expand Down Expand Up @@ -318,6 +322,12 @@ if(FASTMCPP_BUILD_TESTS)
target_link_libraries(fastmcpp_resources_templates PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_resources_templates COMMAND fastmcpp_resources_templates)

add_executable(fastmcpp_resources_template_query_params
tests/resources/template_query_params.cpp)
target_link_libraries(fastmcpp_resources_template_query_params PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_resources_template_query_params
COMMAND fastmcpp_resources_template_query_params)

add_executable(fastmcpp_server_basic tests/server/basic.cpp)
target_link_libraries(fastmcpp_server_basic PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_server_basic COMMAND fastmcpp_server_basic)
Expand Down Expand Up @@ -490,6 +500,15 @@ if(FASTMCPP_BUILD_TESTS)
set_tests_properties(fastmcpp_stdio_timeout PROPERTIES TIMEOUT 60)

# App mounting tests
add_executable(fastmcpp_app_mount_query_params tests/app/mount_query_params.cpp)
target_link_libraries(fastmcpp_app_mount_query_params PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_app_mount_query_params COMMAND fastmcpp_app_mount_query_params)

add_executable(fastmcpp_app_custom_route_forwarding
tests/app/custom_route_forwarding.cpp)
target_link_libraries(fastmcpp_app_custom_route_forwarding PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_app_custom_route_forwarding COMMAND fastmcpp_app_custom_route_forwarding)

add_executable(fastmcpp_app_mounting tests/app/mounting.cpp)
target_link_libraries(fastmcpp_app_mounting PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_app_mounting COMMAND fastmcpp_app_mounting)
Expand Down Expand Up @@ -528,6 +547,10 @@ if(FASTMCPP_BUILD_TESTS)
target_link_libraries(fastmcpp_provider_version_filter PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_provider_version_filter COMMAND fastmcpp_provider_version_filter)

add_executable(fastmcpp_provider_catalog_dedup tests/providers/catalog_dedup.cpp)
target_link_libraries(fastmcpp_provider_catalog_dedup PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_provider_catalog_dedup COMMAND fastmcpp_provider_catalog_dedup)

add_executable(fastmcpp_provider_catalog_search tests/providers/test_catalog_search_transforms.cpp)
target_link_libraries(fastmcpp_provider_catalog_search PRIVATE fastmcpp_core)
add_test(NAME fastmcpp_provider_catalog_search COMMAND fastmcpp_provider_catalog_search)
Expand Down
170 changes: 100 additions & 70 deletions examples/streaming_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <chrono>
#include <httplib.h>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
Expand All @@ -15,11 +16,19 @@ using fastmcpp::server::SseServerWrapper;

int main()
{
auto handler = [](const Json& request) -> Json { return request; };
// Bind to any available port and start wrapper
// Echo handler: returns a minimal JSON-RPC response carrying the posted value.
auto handler = [](const Json& request) -> Json
{
Json response = {{"jsonrpc", "2.0"},
{"id", request.value("id", Json(nullptr))},
{"result", request.value("params", Json::object())}};
return response;
};

// Choose port with fallback range
int port = -1;
std::unique_ptr<SseServerWrapper> server;
for (int candidate = 18111; candidate <= 18131; ++candidate)
for (int candidate = 18110; candidate <= 18130; ++candidate)
{
auto trial = std::make_unique<SseServerWrapper>(handler, "127.0.0.1", candidate, "/sse",
"/messages");
Expand All @@ -32,70 +41,85 @@ int main()
}
if (port < 0 || !server)
{
std::cerr << "Failed to start SSE server" << std::endl;
std::cerr << "Failed to start SSE server on candidates" << std::endl;
return 1;
}

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

// Skip strict probe; receiver will retry until connected
// Do not hard-fail on probe; the receiver thread retries connections

std::vector<int> seen;
std::mutex m;
// Start SSE receiver
std::atomic<bool> sse_connected{false};
std::string session_id;
std::atomic<bool> have_endpoint{false};
std::string message_endpoint;
std::vector<int> seen;
std::mutex seen_mutex;
std::mutex endpoint_mutex;

httplib::Client sse_client("127.0.0.1", port);
sse_client.set_connection_timeout(std::chrono::seconds(10));
sse_client.set_read_timeout(std::chrono::seconds(20));

// NOTE: httplib::Client must be created in the same thread that uses it on Linux
std::thread sse_thread(
[&, port]()
[&]()
{
// Create client inside thread - httplib::Client is not thread-safe across threads on
// Linux
httplib::Client cli("127.0.0.1", port);
cli.set_connection_timeout(std::chrono::seconds(10));
cli.set_read_timeout(std::chrono::seconds(20));

std::string buffer;
auto receiver = [&](const char* data, size_t len)
{
sse_connected = true;
std::string chunk(data, len);

// Parse SSE endpoint event to extract session_id
if (chunk.find("event: endpoint") != std::string::npos)
buffer.append(data, len);

// Process complete SSE blocks separated by a blank line.
// Each block can contain lines like:
// event: endpoint
// data: /messages?session_id=...
// or:
// data: {json}\n\n
while (true)
{
size_t data_pos = chunk.find("data: ");
if (data_pos != std::string::npos)
{
size_t start = data_pos + 6;
size_t end = chunk.find_first_of("\n\r", start);
std::string endpoint_url = chunk.substr(start, end - start);
size_t end = buffer.find("\n\n");
if (end == std::string::npos)
break;

size_t sid_pos = endpoint_url.find("session_id=");
if (sid_pos != std::string::npos)
std::string block = buffer.substr(0, end);
buffer.erase(0, end + 2);

// Extract endpoint path if present
if (block.find("event: endpoint") != std::string::npos)
{
size_t data_pos = block.find("data: ");
if (data_pos != std::string::npos)
{
size_t sid_start = sid_pos + 11;
size_t sid_end = endpoint_url.find_first_of("&\n\r", sid_start);
std::lock_guard<std::mutex> lock(m);
session_id = endpoint_url.substr(sid_start, sid_end - sid_start);
size_t value_start = data_pos + 6;
size_t value_end = block.find('\n', value_start);
std::string endpoint =
block.substr(value_start, value_end == std::string::npos
? std::string::npos
: value_end - value_start);
{
std::lock_guard<std::mutex> lock(endpoint_mutex);
message_endpoint = endpoint;
have_endpoint = !message_endpoint.empty();
}
}
continue;
}
}

if (chunk.find("data: ") == 0)
{
size_t start = 6;
size_t end = chunk.find("\n\n");
if (end != std::string::npos)
// Parse "data: {json}" events and collect result.n values.
if (block.rfind("data: ", 0) == 0)
{
std::string json_str = chunk.substr(start, end - start);
std::string json_str = block.substr(6);
try
{
Json j = Json::parse(json_str);
if (j.contains("n"))
if (j.contains("result") && j["result"].is_object() &&
j["result"].contains("n"))
{
std::lock_guard<std::mutex> lock(m);
seen.push_back(j["n"].get<int>());
std::lock_guard<std::mutex> lock(seen_mutex);
seen.push_back(j["result"]["n"].get<int>());
if (seen.size() >= 3)
return false;
return false; // stop after 3
}
}
catch (...)
Expand All @@ -105,9 +129,9 @@ int main()
}
return true;
};
for (int attempt = 0; attempt < 20 && !sse_connected; ++attempt)
for (int attempt = 0; attempt < 60 && !sse_connected; ++attempt)
{
auto res = cli.Get("/sse", receiver);
auto res = sse_client.Get("/sse", receiver);
if (!res)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
Expand All @@ -118,6 +142,7 @@ int main()
}
});

// Wait for connection
for (int i = 0; i < 500 && !sse_connected; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (!sse_connected)
Expand All @@ -129,36 +154,29 @@ int main()
return 1;
}

// Wait for session_id to be extracted
for (int i = 0; i < 100; ++i)
{
std::lock_guard<std::mutex> lock(m);
if (!session_id.empty())
break;
// Wait for server to tell us the message endpoint (includes required session_id).
for (int i = 0; i < 500 && !have_endpoint; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

std::string sid;
{
std::lock_guard<std::mutex> lock(m);
sid = session_id;
}

if (sid.empty())
if (!have_endpoint)
{
server->stop();
if (sse_thread.joinable())
sse_thread.join();
std::cerr << "Failed to extract session_id" << std::endl;
std::cerr << "Missing endpoint event" << std::endl;
return 1;
}

// Post three messages
httplib::Client post("127.0.0.1", port);
std::string post_path;
{
std::lock_guard<std::mutex> lock(endpoint_mutex);
post_path = message_endpoint;
}
for (int i = 1; i <= 3; ++i)
{
Json j = Json{{"n", i}};
std::string post_url = "/messages?session_id=" + sid;
auto res = post.Post(post_url, j.dump(), "application/json");
Json j = {{"jsonrpc", "2.0"}, {"id", i}, {"method", "echo"}, {"params", {{"n", i}}}};
auto res = post.Post(post_path, j.dump(), "application/json");
if (!res || res->status != 200)
{
server->stop();
Expand All @@ -169,23 +187,35 @@ int main()
}
}

// Wait briefly for all events
for (int i = 0; i < 200; ++i)
{
std::lock_guard<std::mutex> lock(m);
if (seen.size() >= 3)
break;
{
std::lock_guard<std::mutex> lock(seen_mutex);
if (seen.size() >= 3)
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

server->stop();
if (sse_thread.joinable())
sse_thread.join();

if (seen.size() != 3)
{
std::cerr << "expected 3 events, got " << seen.size() << "\n";
return 1;
std::lock_guard<std::mutex> lock(seen_mutex);
if (seen.size() != 3)
{
std::cerr << "expected 3 events, got " << seen.size() << "\n";
return 1;
}
if (seen[0] != 1 || seen[1] != 2 || seen[2] != 3)
{
std::cerr << "unexpected event sequence\n";
return 1;
}
}

std::cout << "ok\n";
return 0;
}
Loading
Loading