diff --git a/CMakeLists.txt b/CMakeLists.txt index ffcf6ff0..5a80c58b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -139,6 +139,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2") ros2_foxglove_bridge/src/param_utils.cpp ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp ros2_foxglove_bridge/src/parameter_interface.cpp + ros2_foxglove_bridge/src/service_utils.cpp ) target_include_directories(foxglove_bridge_component PUBLIC @@ -203,7 +204,7 @@ elseif(ROS_BUILD_TYPE STREQUAL "ament_cmake") target_link_libraries(serialization_test foxglove_bridge_base) ament_add_gtest(smoke_test ros2_foxglove_bridge/tests/smoke_test.cpp) - ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs) + ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs std_srvs) target_link_libraries(smoke_test foxglove_bridge_base) endif() endif() diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp index aea155c0..cf8001e5 100644 --- a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp +++ b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp @@ -16,6 +16,7 @@ constexpr char PARAM_CERTFILE[] = "certfile"; constexpr char PARAM_KEYFILE[] = "keyfile"; constexpr char PARAM_MAX_QOS_DEPTH[] = "max_qos_depth"; constexpr char PARAM_TOPIC_WHITELIST[] = "topic_whitelist"; +constexpr char PARAM_SERVICE_WHITELIST[] = "service_whitelist"; constexpr char PARAM_PARAMETER_WHITELIST[] = "param_whitelist"; constexpr int64_t DEFAULT_PORT = 8765; diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/service_utils.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/service_utils.hpp new file mode 100644 index 00000000..e9de339e --- /dev/null +++ b/ros2_foxglove_bridge/include/foxglove_bridge/service_utils.hpp @@ -0,0 +1,51 @@ +#pragma once + +#include + +#include +#include +#include + +namespace foxglove_bridge { + +class GenericClient : public rclcpp::ClientBase { +public: + using SharedRequest = std::shared_ptr; + using SharedResponse = std::shared_ptr; + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + using SharedPromise = std::shared_ptr; + using SharedPromiseWithRequest = std::shared_ptr; + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + RCLCPP_SMART_PTR_DEFINITIONS(GenericClient) + + GenericClient(rclcpp::node_interfaces::NodeBaseInterface* node_base, + rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, + std::string service_name, std::string service_type, + rcl_client_options_t& client_options); + virtual ~GenericClient() {} + + std::shared_ptr create_response() override; + std::shared_ptr create_request_header() override; + void handle_response(std::shared_ptr request_header, + std::shared_ptr response) override; + SharedFuture async_send_request(SharedRequest request); + SharedFuture async_send_request(SharedRequest request, CallbackType&& cb); + +private: + RCLCPP_DISABLE_COPY(GenericClient) + + std::map> pending_requests_; + std::mutex pending_requests_mutex_; + std::shared_ptr srv_ts_lib_; + const rosidl_service_type_support_t* srv_ts_hdl_; + std::shared_ptr req_ts_lib_; + std::shared_ptr res_ts_lib_; + const rosidl_message_type_support_t* res_ts_hdl_; +}; + +} // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/src/param_utils.cpp b/ros2_foxglove_bridge/src/param_utils.cpp index 75b110f4..99e006d3 100644 --- a/ros2_foxglove_bridge/src/param_utils.cpp +++ b/ros2_foxglove_bridge/src/param_utils.cpp @@ -80,6 +80,15 @@ void declareParameters(rclcpp::Node* node) { node->declare_parameter(PARAM_TOPIC_WHITELIST, std::vector({".*"}), topicWhiteListDescription); + auto serviceWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{}; + serviceWhiteListDescription.name = PARAM_SERVICE_WHITELIST; + serviceWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; + serviceWhiteListDescription.description = + "List of regular expressions (ECMAScript) of whitelisted service names."; + serviceWhiteListDescription.read_only = true; + node->declare_parameter(PARAM_SERVICE_WHITELIST, std::vector({".*"}), + serviceWhiteListDescription); + auto paramWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{}; paramWhiteListDescription.name = PARAM_PARAMETER_WHITELIST; paramWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index d34e40d3..f2ee18cf 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include using namespace std::chrono_literals; @@ -47,6 +48,8 @@ class FoxgloveBridge : public rclcpp::Node { _maxQosDepth = static_cast(this->get_parameter(PARAM_MAX_QOS_DEPTH).as_int()); const auto topicWhiteList = this->get_parameter(PARAM_TOPIC_WHITELIST).as_string_array(); _topicWhitelistPatterns = parseRegexStrings(this, topicWhiteList); + const auto serviceWhiteList = this->get_parameter(PARAM_SERVICE_WHITELIST).as_string_array(); + _serviceWhitelistPatterns = parseRegexStrings(this, serviceWhiteList); const auto paramWhiteList = this->get_parameter(PARAM_PARAMETER_WHITELIST).as_string_array(); const auto paramWhitelistPatterns = parseRegexStrings(this, paramWhiteList); _useSimTime = this->get_parameter("use_sim_time").as_bool(); @@ -58,6 +61,7 @@ class FoxgloveBridge : public rclcpp::Node { foxglove::CAPABILITY_CLIENT_PUBLISH, foxglove::CAPABILITY_PARAMETERS, foxglove::CAPABILITY_PARAMETERS_SUBSCRIBE, + foxglove::CAPABILITY_SERVICES, }; if (_useSimTime) { serverCapabilities.push_back(foxglove::CAPABILITY_TIME); @@ -89,6 +93,8 @@ class FoxgloveBridge : public rclcpp::Node { std::bind(&FoxgloveBridge::parameterChangeHandler, this, _1, _2, _3)); _server->setParameterSubscriptionHandler( std::bind(&FoxgloveBridge::parameterSubscriptionHandler, this, _1, _2, _3)); + _server->setServiceRequestHandler( + std::bind(&FoxgloveBridge::serviceRequestHandler, this, _1, _2)); _paramInterface->setParamUpdateCallback(std::bind(&FoxgloveBridge::parameterUpdates, this, _1)); @@ -109,6 +115,7 @@ class FoxgloveBridge : public rclcpp::Node { _subscriptionCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant); _clientPublishCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + _servicesCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant); if (_useSimTime) { _clockSubscription = this->create_subscription( @@ -132,6 +139,7 @@ class FoxgloveBridge : public rclcpp::Node { void rosgraphPollThread() { updateAdvertisedTopics(); + updateAdvertisedServices(); auto graphEvent = this->get_graph_event(); while (rclcpp::ok()) { @@ -140,6 +148,7 @@ class FoxgloveBridge : public rclcpp::Node { if (triggered) { RCLCPP_DEBUG(this->get_logger(), "rosgraph change detected"); updateAdvertisedTopics(); + updateAdvertisedServices(); // Graph changes tend to come in batches, so wait a bit before checking again std::this_thread::sleep_for(500ms); } @@ -265,6 +274,97 @@ class FoxgloveBridge : public rclcpp::Node { } } + void updateAdvertisedServices() { + if (!rclcpp::ok()) { + return; + } + + // Get the current list of visible services and datatypes from the ROS graph + const auto serviceNamesAndTypes = + this->get_node_graph_interface()->get_service_names_and_types(); + + std::lock_guard lock(_servicesMutex); + + // Remove advertisements for services that have been removed + std::vector servicesToRemove; + for (const auto& service : _advertisedServices) { + const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(), + [service](const auto& serviceNameAndTypes) { + return serviceNameAndTypes.first == service.second.name; + }); + if (it == serviceNamesAndTypes.end()) { + servicesToRemove.push_back(service.first); + } + } + for (auto serviceId : servicesToRemove) { + _advertisedServices.erase(serviceId); + } + _server->removeServices(servicesToRemove); + + // Advertise new services + std::vector newServices; + for (const auto& serviceNamesAndType : serviceNamesAndTypes) { + const auto& serviceName = serviceNamesAndType.first; + const auto& datatypes = serviceNamesAndType.second; + + // Ignore the service if it's already advertised + if (std::find_if(_advertisedServices.begin(), _advertisedServices.end(), + [serviceName](const auto& idWithService) { + return idWithService.second.name == serviceName; + }) != _advertisedServices.end()) { + continue; + } + + // Ignore the service if it is not on the service whitelist + if (std::find_if(_serviceWhitelistPatterns.begin(), _serviceWhitelistPatterns.end(), + [&serviceName](const auto& regex) { + return std::regex_match(serviceName, regex); + }) == _serviceWhitelistPatterns.end()) { + continue; + } + + foxglove::ServiceWithoutId service; + service.name = serviceName; + service.type = datatypes.front(); + + try { + auto [format, reqSchema] = _messageDefinitionCache.get_full_text(service.type + "_Request"); + auto resSchema = _messageDefinitionCache.get_full_text(service.type + "_Response").second; + switch (format) { + case foxglove::MessageDefinitionFormat::MSG: + service.requestSchema = reqSchema; + service.responseSchema = resSchema; + break; + case foxglove::MessageDefinitionFormat::IDL: + RCLCPP_WARN(this->get_logger(), + "IDL message definition format cannot be communicated over ws-protocol. " + "Service \"%s\" (%s) may not decode correctly in clients", + service.name.c_str(), service.type.c_str()); + service.requestSchema = reqSchema; + service.responseSchema = resSchema; + break; + } + } catch (const foxglove::DefinitionNotFoundError& err) { + RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s", + service.type.c_str(), err.what()); + // We still advertise the service, but with an emtpy schema + service.requestSchema = ""; + service.responseSchema = ""; + } catch (const std::exception& err) { + RCLCPP_WARN(this->get_logger(), "Failed to add service \"%s\" (%s): %s", + service.name.c_str(), service.type.c_str(), err.what()); + continue; + } + + newServices.push_back(service); + } + + const auto serviceIds = _server->addServices(newServices); + for (size_t i = 0; i < serviceIds.size(); ++i) { + _advertisedServices.emplace(serviceIds[i], newServices[i]); + } + } + private: struct PairHash { template @@ -276,15 +376,20 @@ class FoxgloveBridge : public rclcpp::Node { std::unique_ptr _server; foxglove::MessageDefinitionCache _messageDefinitionCache; std::vector _topicWhitelistPatterns; + std::vector _serviceWhitelistPatterns; std::shared_ptr _paramInterface; std::unordered_map _advertisedTopics; + std::unordered_map _advertisedServices; std::unordered_map _channelToTopicAndDatatype; std::unordered_map _subscriptions; PublicationsByClient _clientAdvertisedTopics; + std::unordered_map _serviceClients; rclcpp::CallbackGroup::SharedPtr _subscriptionCallbackGroup; rclcpp::CallbackGroup::SharedPtr _clientPublishCallbackGroup; + rclcpp::CallbackGroup::SharedPtr _servicesCallbackGroup; std::mutex _subscriptionsMutex; std::mutex _clientAdvertisementsMutex; + std::mutex _servicesMutex; std::unique_ptr _rosgraphPollThread; size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH; std::shared_ptr> _clockSubscription; @@ -621,6 +726,55 @@ class FoxgloveBridge : public rclcpp::Node { msg->get_rcl_serialized_message().buffer_length}; _server->sendMessage(clientHandle, channel.id, static_cast(timestamp), payload); } + + void serviceRequestHandler(const foxglove::ServiceRequest& request, + foxglove::ConnHandle clientHandle) { + RCLCPP_DEBUG(this->get_logger(), "Received a request for service %d", request.serviceId); + + std::lock_guard lock(_servicesMutex); + const auto serviceIt = _advertisedServices.find(request.serviceId); + if (serviceIt == _advertisedServices.end()) { + RCLCPP_ERROR(this->get_logger(), "Service with id '%d' does not exist", request.serviceId); + return; + } + + auto clientIt = _serviceClients.find(request.serviceId); + if (clientIt == _serviceClients.end()) { + auto clientOptions = rcl_client_get_default_options(); + auto genClient = GenericClient::make_shared( + this->get_node_base_interface().get(), this->get_node_graph_interface(), + serviceIt->second.name, serviceIt->second.type, clientOptions); + clientIt = _serviceClients.emplace(request.serviceId, std::move(genClient)).first; + this->get_node_services_interface()->add_client(clientIt->second, _servicesCallbackGroup); + } + + auto client = clientIt->second; + if (!client->wait_for_service(1s)) { + RCLCPP_ERROR(get_logger(), "Service %d (%s) is not available", request.serviceId, + serviceIt->second.name.c_str()); + return; + } + + auto reqMessage = std::make_shared(request.data.size()); + auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message(); + std::memcpy(rclSerializedMsg.buffer, request.data.data(), request.data.size()); + rclSerializedMsg.buffer_length = request.data.size(); + + auto response_received_callback = [this, request, + clientHandle](GenericClient::SharedFuture future) { + const auto serializedResponseMsg = future.get()->get_rcl_serialized_message(); + + foxglove::ServiceRequest response; + response.serviceId = request.serviceId; + response.callId = request.callId; + response.encoding = request.encoding; + response.data.resize(serializedResponseMsg.buffer_length); + std::memcpy(response.data.data(), serializedResponseMsg.buffer, + serializedResponseMsg.buffer_length); + _server->sendServiceResponse(clientHandle, response); + }; + client->async_send_request(reqMessage, response_received_callback); + } }; } // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/src/service_utils.cpp b/ros2_foxglove_bridge/src/service_utils.cpp new file mode 100644 index 00000000..b3078fcc --- /dev/null +++ b/ros2_foxglove_bridge/src/service_utils.cpp @@ -0,0 +1,177 @@ +#include + +#include +#include +#include +#include +#include + +#include + +namespace { +static std::tuple extract_type_identifier( + const std::string& full_type) { + char type_separator = '/'; + auto sep_position_back = full_type.find_last_of(type_separator); + auto sep_position_front = full_type.find_first_of(type_separator); + if (sep_position_back == std::string::npos || sep_position_back == 0 || + sep_position_back == full_type.length() - 1) { + throw std::runtime_error( + "Message type is not of the form package/type and cannot be processed"); + } + + std::string package_name = full_type.substr(0, sep_position_front); + std::string middle_module = ""; + if (sep_position_back - sep_position_front > 0) { + middle_module = + full_type.substr(sep_position_front + 1, sep_position_back - sep_position_front - 1); + } + std::string type_name = full_type.substr(sep_position_back + 1); + + return std::make_tuple(package_name, middle_module, type_name); +} +} // namespace + +namespace foxglove_bridge { + +constexpr size_t MSG_MEM_BLOCK_SIZE = 1024; +constexpr char LIB_NAME[] = "rosidl_typesupport_introspection_cpp"; +using rosidl_typesupport_introspection_cpp::MessageMembers; +using rosidl_typesupport_introspection_cpp::ServiceMembers; + +std::string get_type_from_message_members(const MessageMembers* members) { + std::string s = members->message_namespace_; + s.replace(s.find("::"), sizeof("::") - 1, "/"); + return s + "/" + members->message_name_; +} + +std::shared_ptr allocate_message(const MessageMembers* members) { + std::string str; + size_t str_capacity = str.capacity(); + for (size_t i = 0; i < str_capacity + 1; i++) { + str += "a"; + } + + std::wstring wstr; + size_t wstr_capacity = wstr.capacity(); + for (size_t i = 0; i < wstr_capacity + 1; i++) { + wstr += L"a"; + } + + uint8_t* buf = static_cast(malloc(members->size_of_ + MSG_MEM_BLOCK_SIZE)); + memset(buf, 0, MSG_MEM_BLOCK_SIZE); + + for (uint32_t i = 0; i < members->member_count_; ++i) { + const auto member = members->members_ + i; + + if (member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING) { + memcpy(buf + member->offset_, new std::string(str), sizeof(std::string)); + } + if (member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) { + memcpy(buf + member->offset_, new std::wstring(wstr), sizeof(std::wstring)); + } + } + + return std::shared_ptr(buf); +} + +GenericClient::GenericClient(rclcpp::node_interfaces::NodeBaseInterface* node_base, + rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, + std::string service_name, std::string service_type, + rcl_client_options_t& client_options) + : rclcpp::ClientBase(node_base, node_graph) { + std::string package_name; + std::string middle_module; + std::string type_name; + std::tie(package_name, middle_module, type_name) = extract_type_identifier(service_type); + + srv_ts_lib_ = rclcpp::get_typesupport_library(service_type, LIB_NAME); + + const std::string symbol_name = + std::string(LIB_NAME) + "__get_service_type_support_handle__" + package_name + "__" + + (middle_module.empty() ? "srv" : middle_module) + "__" + type_name; + const rosidl_service_type_support_t* (*get_ts)() = nullptr; + // This will throw runtime_error if the symbol was not found. + srv_ts_hdl_ = (reinterpret_cast(srv_ts_lib_->get_symbol(symbol_name)))(); + + auto srv_members = static_cast(srv_ts_hdl_->data); + auto response_members = srv_members->response_members_; + auto response_type = get_type_from_message_members(response_members); + res_ts_lib_ = rclcpp::get_typesupport_library(response_type, LIB_NAME); + res_ts_hdl_ = rclcpp::get_typesupport_handle(response_type, LIB_NAME, *res_ts_lib_); + + rcl_ret_t ret = rcl_client_init(this->get_client_handle().get(), this->get_rcl_node_handle(), + srv_ts_hdl_, service_name.c_str(), &client_options); + if (ret != RCL_RET_OK) { + if (ret == RCL_RET_SERVICE_NAME_INVALID) { + auto rcl_node_handle = this->get_rcl_node_handle(); + // this will throw on any validation problem + rcl_reset_error(); + rclcpp::expand_topic_or_service_name(service_name, rcl_node_get_name(rcl_node_handle), + rcl_node_get_namespace(rcl_node_handle), true); + } + rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client"); + } +} + +std::shared_ptr GenericClient::create_response() { + auto srv_members = static_cast(srv_ts_hdl_->data); + return allocate_message(srv_members->response_members_); +} + +std::shared_ptr GenericClient::create_request_header() { + return std::shared_ptr(new rmw_request_id_t); +} + +void GenericClient::handle_response(std::shared_ptr request_header, + std::shared_ptr response) { + std::unique_lock lock(pending_requests_mutex_); + int64_t sequence_number = request_header->sequence_number; + + auto ser_response = std::make_shared(); + rmw_ret_t r = + rmw_serialize(response.get(), res_ts_hdl_, &ser_response->get_rcl_serialized_message()); + if (r != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("foxglove_bridge", "Failed to serialize service response. Ignoring..."); + return; + } + + // TODO(esteve) this should throw instead since it is not expected to happen in the first place + if (this->pending_requests_.count(sequence_number) == 0) { + RCUTILS_LOG_ERROR_NAMED("foxglove_bridge", "Received invalid sequence number. Ignoring..."); + return; + } + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); + this->pending_requests_.erase(sequence_number); + // Unlock here to allow the service to be called recursively from one of its callbacks. + lock.unlock(); + + call_promise->set_value(ser_response); + callback(future); +} + +GenericClient::SharedFuture GenericClient::async_send_request(SharedRequest request) { + return async_send_request(request, [](SharedFuture) {}); +} + +GenericClient::SharedFuture GenericClient::async_send_request(SharedRequest request, + CallbackType&& cb) { + std::lock_guard lock(pending_requests_mutex_); + int64_t sequence_number; + rcl_ret_t ret = rcl_send_request(get_client_handle().get(), + request->get_rcl_serialized_message().buffer, &sequence_number); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); + } + + SharedPromise call_promise = std::make_shared(); + SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = + std::make_tuple(call_promise, std::forward(cb), f); + return f; +} + +} // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/tests/smoke_test.cpp b/ros2_foxglove_bridge/tests/smoke_test.cpp index 3deefb3c..d90cb6a9 100644 --- a/ros2_foxglove_bridge/tests/smoke_test.cpp +++ b/ros2_foxglove_bridge/tests/smoke_test.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -71,6 +72,42 @@ class ParameterTest : public ::testing::Test { std::shared_ptr> _wsClient; }; +class ServiceTest : public ::testing::Test { +public: + inline static const std::string SERVICE_NAME = "/foo_service"; + inline static const std::vector SERIALIZED_RESPONSE = { + 0, 1, 0, 0, 1, 0, 0, 0, 6, 0, 0, 0, 104, 101, 108, 108, 111, 0}; + +protected: + void SetUp() override { + _node = rclcpp::Node::make_shared("node"); + _service = _node->create_service( + SERVICE_NAME, [&](const std::shared_ptr req, + std::shared_ptr res) { + res->message = "hello"; + res->success = true; + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "sending back response. was called with %d", + req->data); + }); + + _executor.add_node(_node); + _executorThread = std::thread([this]() { + _executor.spin(); + }); + } + + void TearDown() override { + _executor.cancel(); + _executorThread.join(); + } + + rclcpp::executors::SingleThreadedExecutor _executor; + rclcpp::Node::SharedPtr _node; + rclcpp::ServiceBase::SharedPtr _service; + std::thread _executorThread; + std::shared_ptr> _wsClient; +}; + TEST(SmokeTest, testConnection) { foxglove::Client wsClient; EXPECT_EQ(std::future_status::ready, wsClient.connect(URI).wait_for(DEFAULT_TIMEOUT)); @@ -301,6 +338,44 @@ TEST_F(ParameterTest, testGetParametersParallel) { } } +TEST_F(ServiceTest, testCallServiceParallel) { + // Connect a few clients (in parallel) and make sure that they can all call the service + auto clients = { + std::make_shared>(), + std::make_shared>(), + std::make_shared>(), + }; + + auto serviceFuture = foxglove::waitForService(*clients.begin(), SERVICE_NAME); + for (auto client : clients) { + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(std::chrono::seconds(5))); + } + ASSERT_EQ(std::future_status::ready, serviceFuture.wait_for(std::chrono::seconds(5))); + const foxglove::Service service = serviceFuture.get(); + + foxglove::ServiceRequest request; + request.serviceId = service.id; + request.callId = 123lu; + request.encoding = "cdr"; + request.data = {1}; + + std::vector> futures; + for (auto client : clients) { + futures.push_back(foxglove::waitForServiceResponse(client)); + client->sendServiceRequest(request); + } + + for (auto& future : futures) { + ASSERT_EQ(std::future_status::ready, future.wait_for(std::chrono::seconds(5))); + foxglove::ServiceResponse response; + EXPECT_NO_THROW(response = future.get()); + EXPECT_EQ(response.serviceId, request.serviceId); + EXPECT_EQ(response.callId, request.callId); + EXPECT_EQ(response.encoding, request.encoding); + EXPECT_EQ(response.data, SERIALIZED_RESPONSE); + } +} + // Run all the tests that were declared with TEST() int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv);