Skip to content

Commit

Permalink
Implement fetchAsset spec
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Jul 6, 2023
1 parent 5f96e84 commit 8c02db3
Show file tree
Hide file tree
Showing 19 changed files with 299 additions and 9 deletions.
10 changes: 7 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ if("$ENV{ROS_VERSION}" STREQUAL "1")
message(STATUS "Building with catkin")
set(ROS_BUILD_TYPE "catkin")

find_package(catkin REQUIRED COMPONENTS nodelet ros_babel_fish rosgraph_msgs roslib roscpp)
find_package(catkin REQUIRED COMPONENTS nodelet resource_retriever ros_babel_fish rosgraph_msgs roslib roscpp)
find_package(Boost REQUIRED)

catkin_package(
INCLUDE_DIRS foxglove_bridge_base/include
LIBRARIES foxglove_bridge_base foxglove_bridge_nodelet
CATKIN_DEPENDS nodelet ros_babel_fish rosgraph_msgs roslib roscpp
CATKIN_DEPENDS nodelet resource_retriever ros_babel_fish rosgraph_msgs roslib roscpp
DEPENDS Boost
)

Expand Down Expand Up @@ -135,6 +135,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
find_package(rosgraph_msgs REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(resource_retriever REQUIRED)

add_library(foxglove_bridge_component SHARED
ros2_foxglove_bridge/src/message_definition_cache.cpp
Expand All @@ -149,7 +150,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/ros2_foxglove_bridge/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(foxglove_bridge_component rclcpp rclcpp_components rosgraph_msgs)
ament_target_dependencies(foxglove_bridge_component rclcpp rclcpp_components resource_retriever rosgraph_msgs)
target_link_libraries(foxglove_bridge_component foxglove_bridge_base)
rclcpp_components_register_nodes(foxglove_bridge_component "foxglove_bridge::FoxgloveBridge")
enable_strict_compiler_warnings(foxglove_bridge_component)
Expand Down Expand Up @@ -258,6 +259,9 @@ elseif(ROS_BUILD_TYPE STREQUAL "ament_cmake")
install(DIRECTORY ros2_foxglove_bridge/launch/
DESTINATION share/${PROJECT_NAME}/
)
install(FILES ros2_foxglove_bridge/include/foxglove_bridge/utils.hpp
DESTINATION include/${PROJECT_NAME}/
)
ament_export_libraries(foxglove_bridge_base foxglove_bridge_component)
ament_package()
endif()
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ Parameters are provided to configure the behavior of the bridge. These parameter
* __client_topic_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted client-published topic names. Defaults to `[".*"]`.
* __send_buffer_limit__: Connection send buffer limit in bytes. Messages will be dropped when a connection's send buffer reaches this limit to avoid a queue of outdated messages building up. Defaults to `10000000` (10 MB).
* __use_compression__: Use websocket compression (permessage-deflate). Suited for connections with smaller bandwith, at the cost of additional CPU load.
* __capabilities__: List of supported [server capabilities](https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md). Defaults to `[clientPublish,parameters,parametersSubscribe,services,connectionGraph]`.
* __capabilities__: List of supported [server capabilities](https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md). Defaults to `[clientPublish,parameters,parametersSubscribe,services,connectionGraph,assets]`.
* __asset_uri_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of allowed asset URIs. Defaults to `["package://.*"]`.
* (ROS 1) __max_update_ms__: The maximum number of milliseconds to wait in between polling `roscore` for new topics, services, or parameters. Defaults to `5000`.
* (ROS 2) __num_threads__: The number of threads to use for the ROS node executor. This controls the number of subscriptions that can be processed in parallel. 0 means one thread per CPU core. Defaults to `0`.
* (ROS 2) __min_qos_depth__: Minimum depth used for the QoS profile of subscriptions. Defaults to `1`. This is to set a lower limit for a subscriber's QoS depth which is computed by summing up depths of all publishers. See also [#208](https://github.com/foxglove/ros-foxglove-bridge/issues/208).
Expand Down
18 changes: 16 additions & 2 deletions foxglove_bridge_base/include/foxglove_bridge/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ constexpr char CAPABILITY_PARAMETERS[] = "parameters";
constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[] = "parametersSubscribe";
constexpr char CAPABILITY_SERVICES[] = "services";
constexpr char CAPABILITY_CONNECTION_GRAPH[] = "connectionGraph";
constexpr char CAPABILITY_ASSETS[] = "assets";

constexpr std::array<const char*, 5> DEFAULT_CAPABILITIES = {
constexpr std::array<const char*, 6> DEFAULT_CAPABILITIES = {
CAPABILITY_CLIENT_PUBLISH, CAPABILITY_CONNECTION_GRAPH, CAPABILITY_PARAMETERS_SUBSCRIBE,
CAPABILITY_PARAMETERS, CAPABILITY_SERVICES,
CAPABILITY_PARAMETERS, CAPABILITY_SERVICES, CAPABILITY_ASSETS,
};

using ChannelId = uint32_t;
Expand All @@ -31,6 +32,7 @@ enum class BinaryOpcode : uint8_t {
MESSAGE_DATA = 1,
TIME_DATA = 2,
SERVICE_CALL_RESPONSE = 3,
FETCH_ASSET_RESPONSE = 4,
};

enum class ClientBinaryOpcode : uint8_t {
Expand Down Expand Up @@ -145,4 +147,16 @@ struct ServiceResponse {

using ServiceRequest = ServiceResponse;

enum FetchAssetStatus {
Success = 0,
Error = 1,
};

struct FetchAssetResponse {
uint32_t requestId;
FetchAssetStatus status;
std::string errorMessage;
std::vector<uint8_t> data;
};

} // namespace foxglove
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct ServerHandlers {
parameterSubscriptionHandler;
std::function<void(const ServiceRequest&, ConnectionHandle)> serviceRequestHandler;
std::function<void(bool)> subscribeConnectionGraphHandler;
std::function<void(const std::string&, uint32_t, ConnectionHandle)> fetchAssetHandler;
};

template <typename ConnectionHandle>
Expand Down Expand Up @@ -101,6 +102,8 @@ class ServerInterface {
virtual void updateConnectionGraph(const MapOfSets& publishedTopics,
const MapOfSets& subscribedTopics,
const MapOfSets& advertisedServices) = 0;
virtual void sendFetchAssetResponse(ConnectionHandle clientHandle,
const FetchAssetResponse& response) = 0;

virtual uint16_t getPort() = 0;
virtual std::string remoteEndpointString(ConnectionHandle clientHandle) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ std::future<Service> waitForService(std::shared_ptr<ClientInterface> client,
std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
const std::string& topicName);

std::future<FetchAssetResponse> waitForFetchAssetResponse(std::shared_ptr<ClientInterface> client);

extern template class Client<websocketpp::config::asio_client>;

} // namespace foxglove
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ClientInterface {
const std::optional<std::string>& requestId) = 0;
virtual void subscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
virtual void unsubscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
virtual void fetchAsset(const std::string& name, uint32_t requestId) = 0;

virtual void setTextMessageHandler(TextMessageHandler handler) = 0;
virtual void setBinaryMessageHandler(BinaryMessageHandler handler) = 0;
Expand Down Expand Up @@ -221,6 +222,11 @@ class Client : public ClientInterface {
sendText(jsonPayload.dump());
}

void fetchAsset(const std::string& uri, uint32_t requestId) override {
nlohmann::json jsonPayload{{"op", "fetchAsset"}, {"uri", uri}, {"requestId", requestId}};
sendText(jsonPayload.dump());
}

void setTextMessageHandler(TextMessageHandler handler) override {
std::unique_lock<std::shared_mutex> lock(_mutex);
_textMessageHandler = std::move(handler);
Expand Down
51 changes: 51 additions & 0 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const std::unordered_map<std::string, std::string> CAPABILITY_BY_CLIENT_OPERATIO
{"unsubscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE},
{"subscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
{"unsubscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
{"fetchAsset", CAPABILITY_ASSETS},
};

/// Map of required capability by client operation (binary).
Expand Down Expand Up @@ -131,6 +132,7 @@ class Server final : public ServerInterface<ConnHandle> {
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
const MapOfSets& advertisedServices) override;
void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse& response) override;

uint16_t getPort() override;
std::string remoteEndpointString(ConnHandle clientHandle) override;
Expand Down Expand Up @@ -622,6 +624,7 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
constexpr auto UNSUBSCRIBE_PARAMETER_UPDATES = Integer("unsubscribeParameterUpdates");
constexpr auto SUBSCRIBE_CONNECTION_GRAPH = Integer("subscribeConnectionGraph");
constexpr auto UNSUBSCRIBE_CONNECTION_GRAPH = Integer("unsubscribeConnectionGraph");
constexpr auto FETCH_ASSET = Integer("fetchAsset");

switch (Integer(op)) {
case SUBSCRIBE: {
Expand Down Expand Up @@ -906,6 +909,22 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
"Client was not subscribed to connection graph updates");
}
} break;
case FETCH_ASSET: {
if (!_handlers.fetchAssetHandler) {
return;
}

const auto uri = payload.at("uri").get<std::string>();
const auto requestId = payload.at("requestId").get<uint32_t>();

try {
_handlers.fetchAssetHandler(uri, requestId, hdl);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
} break;
default: {
sendStatusAndLogMsg(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\"");
} break;
Expand Down Expand Up @@ -1371,4 +1390,36 @@ inline bool Server<ServerConfiguration>::hasCapability(const std::string& capabi
_options.capabilities.end();
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendFetchAssetResponse(
ConnHandle clientHandle, const FetchAssetResponse& response) {
std::error_code ec;
const auto con = _server.get_con_from_hdl(clientHandle, ec);
if (ec || !con) {
return;
}

const size_t dataSize = response.status == FetchAssetStatus::Success ? response.data.size() : 0ul;
const size_t messageSize = 1 + 4 + 1 + 4 + response.errorMessage.size() + dataSize;

auto message = con->get_message(OpCode::BINARY, messageSize);

const auto op = BinaryOpcode::FETCH_ASSET_RESPONSE;
message->append_payload(&op, 1);

std::array<uint8_t, 4> uint32Data;
foxglove::WriteUint32LE(uint32Data.data(), response.requestId);
message->append_payload(uint32Data.data(), uint32Data.size());

const uint8_t status = static_cast<uint8_t>(response.status);
message->append_payload(&status, 1);

foxglove::WriteUint32LE(uint32Data.data(), response.errorMessage.size());
message->append_payload(uint32Data.data(), uint32Data.size());
message->append_payload(response.errorMessage);

message->append_payload(response.data.data(), dataSize);
con->send(message);
}

} // namespace foxglove
28 changes: 28 additions & 0 deletions foxglove_bridge_base/src/test/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,35 @@ std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
}
}
});
return future;
}

std::future<FetchAssetResponse> waitForFetchAssetResponse(std::shared_ptr<ClientInterface> client) {
auto promise = std::make_shared<std::promise<FetchAssetResponse>>();
auto future = promise->get_future();

client->setBinaryMessageHandler(
[promise = std::move(promise)](const uint8_t* data, size_t dataLength) mutable {
if (static_cast<BinaryOpcode>(data[0]) != BinaryOpcode::FETCH_ASSET_RESPONSE) {
return;
}

foxglove::FetchAssetResponse response;
size_t offset = 1;
response.requestId = ReadUint32LE(data + offset);
offset += 4;
response.status = static_cast<foxglove::FetchAssetStatus>(data[offset]);
offset += 1;
const size_t errorMsgLength = static_cast<size_t>(ReadUint32LE(data + offset));
offset += 4;
response.errorMessage =
std::string(reinterpret_cast<const char*>(data + offset), errorMsgLength);
offset += errorMsgLength;
const auto payloadLength = dataLength - offset;
response.data.resize(payloadLength);
std::memcpy(response.data.data(), data + offset, payloadLength);
promise->set_value(response);
});
return future;
}

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<exec_depend>openssl</exec_depend>
<exec_depend>zlib</exec_depend>

<depend>resource_retriever</depend>
<depend>rosgraph_msgs</depend>

<!-- The export tag contains other, unspecified, tags -->
Expand Down
4 changes: 3 additions & 1 deletion ros1_foxglove_bridge/launch/foxglove_bridge.launch
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
<arg name="send_buffer_limit" default="10000000" />
<arg name="nodelet_manager" default="foxglove_nodelet_manager" />
<arg name="num_threads" default="0" />
<arg name="capabilities" default="[clientPublish,parameters,parametersSubscribe,services,connectionGraph]" />
<arg name="capabilities" default="[clientPublish,parameters,parametersSubscribe,services,connectionGraph,assets]" />
<arg name="asset_uri_whitelist" default="['package://.*']" />

<node pkg="nodelet" type="nodelet" name="foxglove_nodelet_manager" args="manager"
if="$(eval nodelet_manager == 'foxglove_nodelet_manager')">
Expand All @@ -34,5 +35,6 @@
<rosparam param="service_whitelist" subst_value="True">$(arg service_whitelist)</rosparam>
<rosparam param="client_topic_whitelist" subst_value="True">$(arg client_topic_whitelist)</rosparam>
<rosparam param="capabilities" subst_value="True">$(arg capabilities)</rosparam>
<rosparam param="asset_uri_whitelist" subst_value="True">$(arg asset_uri_whitelist)</rosparam>
</node>
</launch>
40 changes: 40 additions & 0 deletions ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <nodelet/nodelet.h>
#include <pluginlib/class_list_macros.h>
#include <resource_retriever/retriever.h>
#include <ros/message_event.h>
#include <ros/ros.h>
#include <ros/xmlrpc_manager.h>
Expand Down Expand Up @@ -100,6 +101,13 @@ class FoxgloveBridge : public nodelet::Nodelet {
ROS_ERROR("Failed to parse one or more service whitelist patterns");
}

const auto assetUriWhitelist =
nhp.param<std::vector<std::string>>("asset_uri_whitelist", {"package://.*", "file://.*"});
_assetUriWhitelistPatterns = parseRegexPatterns(assetUriWhitelist);
if (assetUriWhitelist.size() != _assetUriWhitelistPatterns.size()) {
ROS_ERROR("Failed to parse one or more asset URI whitelist patterns");
}

const char* rosDistro = std::getenv("ROS_DISTRO");
ROS_INFO("Starting foxglove_bridge (%s, %s@%s) with %s", rosDistro,
foxglove::FOXGLOVE_BRIDGE_VERSION, foxglove::FOXGLOVE_BRIDGE_GIT_HASH,
Expand Down Expand Up @@ -151,6 +159,13 @@ class FoxgloveBridge : public nodelet::Nodelet {
hdlrs.subscribeConnectionGraphHandler = [this](bool subscribe) {
_subscribeGraphUpdates = subscribe;
};

if (hasCapability(foxglove::CAPABILITY_ASSETS)) {
hdlrs.fetchAssetHandler =
std::bind(&FoxgloveBridge::fetchAsset, this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}

_server->setHandlers(std::move(hdlrs));

_server->start(address, static_cast<uint16_t>(port));
Expand Down Expand Up @@ -844,6 +859,29 @@ class FoxgloveBridge : public nodelet::Nodelet {
}
}

void fetchAsset(const std::string& uri, uint32_t requestId, ConnectionHandle clientHandle) {
foxglove::FetchAssetResponse response;
response.requestId = requestId;

try {
if (!isWhitelisted(uri, _assetUriWhitelistPatterns)) {
throw std::runtime_error("Asset URI not allowed: " + uri);
}

const resource_retriever::MemoryResource memoryResource = _resource_retriever.get(uri);
response.status = foxglove::FetchAssetStatus::Success;
response.errorMessage = "";
response.data.resize(memoryResource.size);
std::memcpy(response.data.data(), memoryResource.data.get(), memoryResource.size);
} catch (const resource_retriever::Exception& ex) {
ROS_WARN("Failed to retrieve asset '%s': %s", uri.c_str(), ex.what());
response.status = foxglove::FetchAssetStatus::Error;
response.errorMessage = "Failed to retrieve asset " + uri;
}

_server->sendFetchAssetResponse(clientHandle, response);
}

bool hasCapability(const std::string& capability) {
return std::find(_capabilities.begin(), _capabilities.end(), capability) != _capabilities.end();
}
Expand All @@ -853,6 +891,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
std::vector<std::regex> _topicWhitelistPatterns;
std::vector<std::regex> _paramWhitelistPatterns;
std::vector<std::regex> _serviceWhitelistPatterns;
std::vector<std::regex> _assetUriWhitelistPatterns;
ros::XMLRPCManager xmlrpcServer;
std::unordered_map<foxglove::ChannelId, foxglove::ChannelWithoutId> _advertisedTopics;
std::unordered_map<foxglove::ChannelId, SubscriptionsByClient> _subscriptions;
Expand All @@ -868,6 +907,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
bool _useSimTime = false;
std::vector<std::string> _capabilities;
std::atomic<bool> _subscribeGraphUpdates = false;
resource_retriever::Retriever _resource_retriever;
};

} // namespace foxglove_bridge
Expand Down
1 change: 1 addition & 0 deletions ros1_foxglove_bridge/tests/smoke.test
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<launch>
<node name="foxglove_bridge" pkg="foxglove_bridge" type="foxglove_bridge" output="screen">
<param name="port" value="9876" />
<rosparam param="asset_uri_whitelist" subst_value="True">['file://.*']</rosparam>
</node>

<test test-name="smoke_test" pkg="foxglove_bridge" type="smoke_test" />
Expand Down
Loading

0 comments on commit 8c02db3

Please sign in to comment.