Skip to content

Commit

Permalink
make serialized message a class
Browse files Browse the repository at this point in the history
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
Karsten1987 committed Apr 18, 2020
1 parent 1cd811a commit d6bf983
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 125 deletions.
11 changes: 11 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/publisher_base.cpp
src/rclcpp/qos.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
Expand Down Expand Up @@ -405,6 +407,15 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message test/test_serialized_message.cpp)
if(TARGET test_serialized_message)
ament_target_dependencies(test_serialized_message
test_msgs
)
target_link_libraries(test_serialized_message
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
ament_target_dependencies(test_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
msg_info.from_intra_process = true;

ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg =
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()), serialized_msg.get());

if (nullptr == serialized_msg) {
if (0u == serialized_msg->buffer_length) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}

if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
any_callback_.dispatch_intra_process(std::static_pointer_cast<rcl_serialized_message_t>(serialized_msg), msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
Expand Down Expand Up @@ -243,8 +243,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}

ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
if (nullptr == serialized_container) {
ConstMessageSharedPtr serialized_message = buffer_->consume_shared();
if (nullptr == serialized_message) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}

Expand All @@ -254,13 +254,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
Expand Down
77 changes: 11 additions & 66 deletions rclcpp/include/rclcpp/serialization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
#ifndef RCLCPP__SERIALIZATION_HPP_
#define RCLCPP__SERIALIZATION_HPP_

#include <rmw/rmw.h>

#include <memory>
#include <string>

#include "rcl/error_handling.h"
#include "rosidl_runtime_c/message_type_support_struct.h"

namespace rclcpp
{

class SerializedMessage;

/// Interface to (de)serialize a message
class SerializationBase
{
Expand All @@ -35,87 +35,32 @@ class SerializationBase
/**
* \param[in] message The ROS2 message which is read and serialized by rmw.
*/
virtual std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) = 0;
virtual void serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const = 0;

/// Deserialize a serialized stream to a ROS message
/**
* \param[in] serialized_message The serialized message to be converted to ROS2 by rmw.
* \param[out] message The deserialized ROS2 message.
*/
virtual void deserialize_message(
const rcl_serialized_message_t * serialized_message,
void * msg) = 0;
const SerializedMessage * serialized_message, void * ros_message) const = 0;
};

/// Default implementation to (de)serialize a message by using rmw_(de)serialize
class Serialization : public SerializationBase
{
public:
Serialization(
const rosidl_message_type_support_t & type_support,
const rcutils_allocator_t allocator = rcutils_get_default_allocator())
: type_support_(type_support), rcutils_allocator_(allocator)
{}

std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) override
{
auto serialized_message = new rcl_serialized_message_t;
*serialized_message = rmw_get_zero_initialized_serialized_message();
const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error(
"Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string().str));
}

if (nullptr == message) {
delete serialized_message;
throw std::runtime_error("Message is nullpointer while serialization.");
}

const auto error = rmw_serialize(
message,
&type_support_,
serialized_message);
if (error != RCL_RET_OK) {
delete serialized_message;
throw std::runtime_error("Failed to serialize.");
}

auto shared_serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
serialized_message,
[](rcl_serialized_message_t * msg) {
auto fini_ret = rmw_serialized_message_fini(msg);
delete msg;
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
}
});

return shared_serialized_msg;
}
Serialization(const rosidl_message_type_support_t & type_support);

void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override
{
if (nullptr == serialized_message ||
serialized_message->buffer_capacity == 0 ||
serialized_message->buffer_length == 0 ||
!serialized_message->buffer)
{
throw std::runtime_error("Failed to deserialize nullptr serialized message.");
}
void serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const override;

const auto ret = rmw_deserialize(serialized_message, &type_support_, msg);
if (ret != RMW_RET_OK) {
throw std::runtime_error("Failed to deserialize serialized message.");
}
}
void deserialize_message(
const SerializedMessage * serialized_message, void * ros_message) const override;

private:
rosidl_message_type_support_t type_support_;
rcutils_allocator_t rcutils_allocator_;
};

} // namespace rclcpp
Expand Down
78 changes: 33 additions & 45 deletions rclcpp/include/rclcpp/serialized_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@
#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_
#define RCLCPP__SERIALIZED_MESSAGE_HPP_

#include <rclcpp/exceptions.hpp>

#include <cstring>

#include "rcutils/logging_macros.h"

#include "rmw/serialized_message.h"
#include "rcl/allocator.h"
#include "rcl/types.h"

namespace rclcpp
{
Expand All @@ -30,49 +25,42 @@ namespace rclcpp
class SerializedMessage : public rcl_serialized_message_t
{
public:
SerializedMessage()
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{}
/// Default constructor for a SerializedMessage
/**
* Default constructs a serialized message and initalizes its
* capacity with 0.
*
* \param[in] allocator The allocator to be used for the initialzation.
*/
explicit SerializedMessage(
const rcl_allocator_t & allocator = rcl_get_default_allocator());

/// Default constructor for a SerializedMessage
/**
* Default constructs a serialized message and initalizes its
* capacity with 0.
*
* \param[in] initial_capacity The amount of memory to be allocated.
* \param[in] allocator The allocator to be used for the initialzation.
*/
explicit SerializedMessage(
size_t initial_capacity,
const rcl_allocator_t & allocator = rcl_get_default_allocator());

explicit SerializedMessage(const SerializedMessage & serialized_message)
: SerializedMessage(static_cast<const rcl_serialized_message_t>(serialized_message))
{}
/// Copy Constructor for a SerializedMessage
explicit SerializedMessage(const SerializedMessage & serialized_message);

explicit SerializedMessage(const rcl_serialized_message_t & serialized_message)
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{
const auto ret = rmw_serialized_message_init(
this, serialized_message.buffer_length,
&serialized_message.allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
/// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t
explicit SerializedMessage(const rcl_serialized_message_t & serialized_message);

// do not call memcpy if the pointer is "static"
if (buffer != serialized_message.buffer) {
std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length);
}
buffer_length = serialized_message.buffer_length;
}
/// Move Constructor for a SerializedMessage
explicit SerializedMessage(SerializedMessage && serialized_message);

explicit SerializedMessage(rcl_serialized_message_t && msg)
: rcl_serialized_message_t(msg)
{
// reset buffer to prevent double free
msg = rmw_get_zero_initialized_serialized_message();
}
/// Move Constructor for a SerializedMessage from a rcl_serialized_message_t
explicit SerializedMessage(rcl_serialized_message_t && serialized_message);

~SerializedMessage()
{
if (nullptr != buffer) {
const auto fini_ret = rmw_serialized_message_fini(this);
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
}
}
}
/// Destructor for a SerializedMessage
~SerializedMessage();
};

} // namespace rclcpp
Expand Down
8 changes: 2 additions & 6 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ class Subscription : public SubscriptionBase
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(
type_support_handle,
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
std::make_shared<rclcpp::Serialization>(type_support_handle)
);
TRACEPOINT(
rclcpp_subscription_init,
Expand Down Expand Up @@ -216,9 +214,7 @@ class Subscription : public SubscriptionBase
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(
type_support_handle,
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
std::make_shared<rclcpp::Serialization>(type_support_handle)
);
TRACEPOINT(
rclcpp_subscription_init,
Expand Down
68 changes: 68 additions & 0 deletions rclcpp/src/rclcpp/serialization.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/serialization.hpp"

#include <memory>
#include <string>

#include "rclcpp/exceptions.hpp"
#include "rclcpp/serialized_message.hpp"

#include "rcpputils/asserts.hpp"

#include "rmw/rmw.h"

namespace rclcpp
{

Serialization::Serialization(const rosidl_message_type_support_t & type_support)
: type_support_(type_support)
{}

void Serialization::serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const
{
rcpputils::check_true(ros_message != nullptr, "ROS message is nullpointer.");
rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer.");

const auto ret = rmw_serialize(
ros_message,
&type_support_,
serialized_message);
if (ret != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to serialize ROS message.");
}
}

void
Serialization::deserialize_message(
const SerializedMessage * serialized_message, void * ros_message) const
{
rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer.");
rcpputils::check_true(
serialized_message->buffer_capacity != 0 &&
serialized_message->buffer_length != 0 &&
!serialized_message->buffer, "Serialized message is wrongly initialized.");
{
throw std::runtime_error("Failed to deserialize nullptr serialized message.");
}

const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message);
if (ret != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to deserialize ROS message.");
}
}

} // namespace rclcpp
Loading

0 comments on commit d6bf983

Please sign in to comment.