Skip to content

Commit

Permalink
add ROS2 service support
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Jan 26, 2023
1 parent a6d3156 commit 289a774
Show file tree
Hide file tree
Showing 7 changed files with 469 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
ros2_foxglove_bridge/src/param_utils.cpp
ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
ros2_foxglove_bridge/src/parameter_interface.cpp
ros2_foxglove_bridge/src/service_utils.cpp
)
target_include_directories(foxglove_bridge_component
PUBLIC
Expand Down Expand Up @@ -203,7 +204,7 @@ elseif(ROS_BUILD_TYPE STREQUAL "ament_cmake")
target_link_libraries(serialization_test foxglove_bridge_base)

ament_add_gtest(smoke_test ros2_foxglove_bridge/tests/smoke_test.cpp)
ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs)
ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs std_srvs)
target_link_libraries(smoke_test foxglove_bridge_base)
endif()
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ constexpr char PARAM_CERTFILE[] = "certfile";
constexpr char PARAM_KEYFILE[] = "keyfile";
constexpr char PARAM_MAX_QOS_DEPTH[] = "max_qos_depth";
constexpr char PARAM_TOPIC_WHITELIST[] = "topic_whitelist";
constexpr char PARAM_SERVICE_WHITELIST[] = "service_whitelist";
constexpr char PARAM_PARAMETER_WHITELIST[] = "param_whitelist";

constexpr int64_t DEFAULT_PORT = 8765;
Expand Down
51 changes: 51 additions & 0 deletions ros2_foxglove_bridge/include/foxglove_bridge/service_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once

#include <future>

#include <rclcpp/client.hpp>
#include <rclcpp/serialized_message.hpp>
#include <rcpputils/shared_library.hpp>

namespace foxglove_bridge {

class GenericClient : public rclcpp::ClientBase {
public:
using SharedRequest = std::shared_ptr<rclcpp::SerializedMessage>;
using SharedResponse = std::shared_ptr<rclcpp::SerializedMessage>;
using Promise = std::promise<SharedResponse>;
using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;
using SharedPromise = std::shared_ptr<Promise>;
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
using CallbackType = std::function<void(SharedFuture)>;
using CallbackWithRequestType = std::function<void(SharedFutureWithRequest)>;

RCLCPP_SMART_PTR_DEFINITIONS(GenericClient)

GenericClient(rclcpp::node_interfaces::NodeBaseInterface* node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
std::string service_name, std::string service_type,
rcl_client_options_t& client_options);
virtual ~GenericClient() {}

std::shared_ptr<void> create_response() override;
std::shared_ptr<rmw_request_id_t> create_request_header() override;
void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) override;
SharedFuture async_send_request(SharedRequest request);
SharedFuture async_send_request(SharedRequest request, CallbackType&& cb);

private:
RCLCPP_DISABLE_COPY(GenericClient)

std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
std::mutex pending_requests_mutex_;
std::shared_ptr<rcpputils::SharedLibrary> srv_ts_lib_;
const rosidl_service_type_support_t* srv_ts_hdl_;
std::shared_ptr<rcpputils::SharedLibrary> req_ts_lib_;
std::shared_ptr<rcpputils::SharedLibrary> res_ts_lib_;
const rosidl_message_type_support_t* res_ts_hdl_;
};

} // namespace foxglove_bridge
9 changes: 9 additions & 0 deletions ros2_foxglove_bridge/src/param_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ void declareParameters(rclcpp::Node* node) {
node->declare_parameter(PARAM_TOPIC_WHITELIST, std::vector<std::string>({".*"}),
topicWhiteListDescription);

auto serviceWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{};
serviceWhiteListDescription.name = PARAM_SERVICE_WHITELIST;
serviceWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY;
serviceWhiteListDescription.description =
"List of regular expressions (ECMAScript) of whitelisted service names.";
serviceWhiteListDescription.read_only = true;
node->declare_parameter(PARAM_SERVICE_WHITELIST, std::vector<std::string>({".*"}),
serviceWhiteListDescription);

auto paramWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{};
paramWhiteListDescription.name = PARAM_PARAMETER_WHITELIST;
paramWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY;
Expand Down
154 changes: 154 additions & 0 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <foxglove_bridge/message_definition_cache.hpp>
#include <foxglove_bridge/param_utils.hpp>
#include <foxglove_bridge/parameter_interface.hpp>
#include <foxglove_bridge/service_utils.hpp>
#include <foxglove_bridge/websocket_server.hpp>

using namespace std::chrono_literals;
Expand Down Expand Up @@ -47,6 +48,8 @@ class FoxgloveBridge : public rclcpp::Node {
_maxQosDepth = static_cast<size_t>(this->get_parameter(PARAM_MAX_QOS_DEPTH).as_int());
const auto topicWhiteList = this->get_parameter(PARAM_TOPIC_WHITELIST).as_string_array();
_topicWhitelistPatterns = parseRegexStrings(this, topicWhiteList);
const auto serviceWhiteList = this->get_parameter(PARAM_SERVICE_WHITELIST).as_string_array();
_serviceWhitelistPatterns = parseRegexStrings(this, serviceWhiteList);
const auto paramWhiteList = this->get_parameter(PARAM_PARAMETER_WHITELIST).as_string_array();
const auto paramWhitelistPatterns = parseRegexStrings(this, paramWhiteList);
_useSimTime = this->get_parameter("use_sim_time").as_bool();
Expand All @@ -58,6 +61,7 @@ class FoxgloveBridge : public rclcpp::Node {
foxglove::CAPABILITY_CLIENT_PUBLISH,
foxglove::CAPABILITY_PARAMETERS,
foxglove::CAPABILITY_PARAMETERS_SUBSCRIBE,
foxglove::CAPABILITY_SERVICES,
};
if (_useSimTime) {
serverCapabilities.push_back(foxglove::CAPABILITY_TIME);
Expand Down Expand Up @@ -89,6 +93,8 @@ class FoxgloveBridge : public rclcpp::Node {
std::bind(&FoxgloveBridge::parameterChangeHandler, this, _1, _2, _3));
_server->setParameterSubscriptionHandler(
std::bind(&FoxgloveBridge::parameterSubscriptionHandler, this, _1, _2, _3));
_server->setServiceRequestHandler(
std::bind(&FoxgloveBridge::serviceRequestHandler, this, _1, _2));

_paramInterface->setParamUpdateCallback(std::bind(&FoxgloveBridge::parameterUpdates, this, _1));

Expand All @@ -109,6 +115,7 @@ class FoxgloveBridge : public rclcpp::Node {
_subscriptionCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
_clientPublishCallbackGroup =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
_servicesCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);

if (_useSimTime) {
_clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
Expand All @@ -132,6 +139,7 @@ class FoxgloveBridge : public rclcpp::Node {

void rosgraphPollThread() {
updateAdvertisedTopics();
updateAdvertisedServices();

auto graphEvent = this->get_graph_event();
while (rclcpp::ok()) {
Expand All @@ -140,6 +148,7 @@ class FoxgloveBridge : public rclcpp::Node {
if (triggered) {
RCLCPP_DEBUG(this->get_logger(), "rosgraph change detected");
updateAdvertisedTopics();
updateAdvertisedServices();
// Graph changes tend to come in batches, so wait a bit before checking again
std::this_thread::sleep_for(500ms);
}
Expand Down Expand Up @@ -265,6 +274,97 @@ class FoxgloveBridge : public rclcpp::Node {
}
}

void updateAdvertisedServices() {
if (!rclcpp::ok()) {
return;
}

// Get the current list of visible services and datatypes from the ROS graph
const auto serviceNamesAndTypes =
this->get_node_graph_interface()->get_service_names_and_types();

std::lock_guard<std::mutex> lock(_servicesMutex);

// Remove advertisements for services that have been removed
std::vector<foxglove::ServiceId> servicesToRemove;
for (const auto& service : _advertisedServices) {
const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(),
[service](const auto& serviceNameAndTypes) {
return serviceNameAndTypes.first == service.second.name;
});
if (it == serviceNamesAndTypes.end()) {
servicesToRemove.push_back(service.first);
}
}
for (auto serviceId : servicesToRemove) {
_advertisedServices.erase(serviceId);
}
_server->removeServices(servicesToRemove);

// Advertise new services
std::vector<foxglove::ServiceWithoutId> newServices;
for (const auto& serviceNamesAndType : serviceNamesAndTypes) {
const auto& serviceName = serviceNamesAndType.first;
const auto& datatypes = serviceNamesAndType.second;

// Ignore the service if it's already advertised
if (std::find_if(_advertisedServices.begin(), _advertisedServices.end(),
[serviceName](const auto& idWithService) {
return idWithService.second.name == serviceName;
}) != _advertisedServices.end()) {
continue;
}

// Ignore the service if it is not on the service whitelist
if (std::find_if(_serviceWhitelistPatterns.begin(), _serviceWhitelistPatterns.end(),
[&serviceName](const auto& regex) {
return std::regex_match(serviceName, regex);
}) == _serviceWhitelistPatterns.end()) {
continue;
}

foxglove::ServiceWithoutId service;
service.name = serviceName;
service.type = datatypes.front();

try {
auto [format, reqSchema] = _messageDefinitionCache.get_full_text(service.type + "_Request");
auto resSchema = _messageDefinitionCache.get_full_text(service.type + "_Response").second;
switch (format) {
case foxglove::MessageDefinitionFormat::MSG:
service.requestSchema = reqSchema;
service.responseSchema = resSchema;
break;
case foxglove::MessageDefinitionFormat::IDL:
RCLCPP_WARN(this->get_logger(),
"IDL message definition format cannot be communicated over ws-protocol. "
"Service \"%s\" (%s) may not decode correctly in clients",
service.name.c_str(), service.type.c_str());
service.requestSchema = reqSchema;
service.responseSchema = resSchema;
break;
}
} catch (const foxglove::DefinitionNotFoundError& err) {
RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s",
service.type.c_str(), err.what());
// We still advertise the service, but with an emtpy schema
service.requestSchema = "";
service.responseSchema = "";
} catch (const std::exception& err) {
RCLCPP_WARN(this->get_logger(), "Failed to add service \"%s\" (%s): %s",
service.name.c_str(), service.type.c_str(), err.what());
continue;
}

newServices.push_back(service);
}

const auto serviceIds = _server->addServices(newServices);
for (size_t i = 0; i < serviceIds.size(); ++i) {
_advertisedServices.emplace(serviceIds[i], newServices[i]);
}
}

private:
struct PairHash {
template <class T1, class T2>
Expand All @@ -276,15 +376,20 @@ class FoxgloveBridge : public rclcpp::Node {
std::unique_ptr<foxglove::ServerInterface> _server;
foxglove::MessageDefinitionCache _messageDefinitionCache;
std::vector<std::regex> _topicWhitelistPatterns;
std::vector<std::regex> _serviceWhitelistPatterns;
std::shared_ptr<ParameterInterface> _paramInterface;
std::unordered_map<TopicAndDatatype, foxglove::Channel, PairHash> _advertisedTopics;
std::unordered_map<foxglove::ServiceId, foxglove::ServiceWithoutId> _advertisedServices;
std::unordered_map<foxglove::ChannelId, TopicAndDatatype> _channelToTopicAndDatatype;
std::unordered_map<foxglove::ChannelId, SubscriptionsByClient> _subscriptions;
PublicationsByClient _clientAdvertisedTopics;
std::unordered_map<foxglove::ServiceId, GenericClient::SharedPtr> _serviceClients;
rclcpp::CallbackGroup::SharedPtr _subscriptionCallbackGroup;
rclcpp::CallbackGroup::SharedPtr _clientPublishCallbackGroup;
rclcpp::CallbackGroup::SharedPtr _servicesCallbackGroup;
std::mutex _subscriptionsMutex;
std::mutex _clientAdvertisementsMutex;
std::mutex _servicesMutex;
std::unique_ptr<std::thread> _rosgraphPollThread;
size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH;
std::shared_ptr<rclcpp::Subscription<rosgraph_msgs::msg::Clock>> _clockSubscription;
Expand Down Expand Up @@ -621,6 +726,55 @@ class FoxgloveBridge : public rclcpp::Node {
msg->get_rcl_serialized_message().buffer_length};
_server->sendMessage(clientHandle, channel.id, static_cast<uint64_t>(timestamp), payload);
}

void serviceRequestHandler(const foxglove::ServiceRequest& request,
foxglove::ConnHandle clientHandle) {
RCLCPP_DEBUG(this->get_logger(), "Received a request for service %d", request.serviceId);

std::lock_guard<std::mutex> lock(_servicesMutex);
const auto serviceIt = _advertisedServices.find(request.serviceId);
if (serviceIt == _advertisedServices.end()) {
RCLCPP_ERROR(this->get_logger(), "Service with id '%d' does not exist", request.serviceId);
return;
}

auto clientIt = _serviceClients.find(request.serviceId);
if (clientIt == _serviceClients.end()) {
auto clientOptions = rcl_client_get_default_options();
auto genClient = GenericClient::make_shared(
this->get_node_base_interface().get(), this->get_node_graph_interface(),
serviceIt->second.name, serviceIt->second.type, clientOptions);
clientIt = _serviceClients.emplace(request.serviceId, std::move(genClient)).first;
this->get_node_services_interface()->add_client(clientIt->second, _servicesCallbackGroup);
}

auto client = clientIt->second;
if (!client->wait_for_service(1s)) {
RCLCPP_ERROR(get_logger(), "Service %d (%s) is not available", request.serviceId,
serviceIt->second.name.c_str());
return;
}

auto reqMessage = std::make_shared<rclcpp::SerializedMessage>(request.data.size());
auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message();
std::memcpy(rclSerializedMsg.buffer, request.data.data(), request.data.size());
rclSerializedMsg.buffer_length = request.data.size();

auto response_received_callback = [this, request,
clientHandle](GenericClient::SharedFuture future) {
const auto serializedResponseMsg = future.get()->get_rcl_serialized_message();

foxglove::ServiceRequest response;
response.serviceId = request.serviceId;
response.callId = request.callId;
response.encoding = request.encoding;
response.data.resize(serializedResponseMsg.buffer_length);
std::memcpy(response.data.data(), serializedResponseMsg.buffer,
serializedResponseMsg.buffer_length);
_server->sendServiceResponse(clientHandle, response);
};
client->async_send_request(reqMessage, response_received_callback);
}
};

} // namespace foxglove_bridge
Expand Down
Loading

0 comments on commit 289a774

Please sign in to comment.