Skip to content

Commit

Permalink
Add rcl_publisher_wait_for_all_acked support (ros2#913)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Aug 25, 2021
1 parent 4296dd1 commit 6ec226a
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 0 deletions.
42 changes: 42 additions & 0 deletions rcl/include/rcl/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/visibility_control.h"
#include "rcl/time.h"

/// Internal rcl publisher implementation struct.
struct rcl_publisher_impl_t;
Expand Down Expand Up @@ -434,6 +435,47 @@ RCL_WARN_UNUSED
rcl_ret_t
rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher);

/// Wait until all published message data is acknowledged or until the specified timeout elapses.
/**
* This function waits until all published message data were acknowledged by peer node or timeout.
*
* The timeout unit is nanoseconds.
* If the timeout is negative then this function will block indefinitely until all published message
* data were acknowledged.
* If the timeout is 0 then this function will be non-blocking; checking all published message data
* were acknowledged (If acknowledged, return RCL_RET_OK. Otherwise, return RCL_RET_TIMEOUT), but
* not waiting.
* If the timeout is greater than 0 then this function will return after that period of time has
* elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return
* RCL_RET_OK).
*
* This function only waits for acknowledgments if the publisher's QOS profile is RELIABLE.
* Otherwise this function will immediately return RCL_RET_OK.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | Yes
* Uses Atomics | No
* Lock-Free | No
*
* \param[in] publisher handle to the publisher that needs to wait for all acked.
* \param[in] timeout the duration to wait for all published message data were acknowledged, in
* nanoseconds.
* \return #RCL_RET_OK if successful, or
* \return #RCL_RET_TIMEOUT if timed out, or
* \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or
* \return #RCL_RET_ERROR if an unspecified error occurs, or
* \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_publisher_wait_for_all_acked(
const rcl_publisher_t * publisher,
rcl_duration_value_t timeout);

/// Get the topic name for the publisher.
/**
* This function returns the publisher's internal topic name string.
Expand Down
37 changes: 37 additions & 0 deletions rcl/src/rcl/publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extern "C"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
#include "rcutils/macros.h"
#include "rcl/time.h"
#include "rmw/time.h"
#include "rmw/error_handling.h"
#include "tracetools/tracetools.h"

Expand Down Expand Up @@ -304,6 +306,41 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher)
return RCL_RET_OK;
}

rcl_ret_t
rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration_value_t timeout)
{
if (!rcl_publisher_is_valid(publisher)) {
return RCL_RET_PUBLISHER_INVALID; // error already set
}

rmw_time_t rmw_timeout;
if (timeout > 0) {
rmw_timeout.sec = RCL_NS_TO_S(timeout);
rmw_timeout.nsec = timeout % 1000000000;
} else if (timeout < 0) {
rmw_time_t infinite = RMW_DURATION_INFINITE;
rmw_timeout = infinite;
} else {
rmw_time_t zero = RMW_DURATION_UNSPECIFIED;
rmw_timeout = zero;
}

rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout);
if (ret != RMW_RET_OK) {
if (ret == RMW_RET_TIMEOUT) {
return RCL_RET_TIMEOUT;
}
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
if (ret == RMW_RET_UNSUPPORTED) {
return RCL_RET_UNSUPPORTED;
} else {
return RCL_RET_ERROR;
}
}

return RCL_RET_OK;
}

const char *
rcl_publisher_get_topic_name(const rcl_publisher_t * publisher)
{
Expand Down
9 changes: 9 additions & 0 deletions rcl/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ function(test_target_function)
AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs"
)

rcl_add_custom_gtest(test_publisher_wait_all_ack${target_suffix}
SRCS rcl/test_publisher_wait_all_ack.cpp rcl/wait_for_entity_helpers.cpp
ENV ${rmw_implementation_env_var}
APPEND_LIBRARY_DIRS ${extra_lib_dirs}
INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../src/rcl/
LIBRARIES ${PROJECT_NAME} mimick
AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs"
)

rcl_add_custom_gtest(test_service${target_suffix}
SRCS rcl/test_service.cpp rcl/wait_for_entity_helpers.cpp
ENV ${rmw_implementation_env_var}
Expand Down
69 changes: 69 additions & 0 deletions rcl/test/rcl/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish

EXPECT_EQ(RCL_RET_OK, rcl_publisher_assert_liveliness(&publisher));

EXPECT_EQ(RCL_RET_OK, rcl_publisher_wait_for_all_acked(&publisher, 0));

size_t count_size;
test_msgs__msg__BasicTypes msg;
rcl_serialized_message_t serialized_msg = rmw_get_zero_initialized_serialized_message();
Expand Down Expand Up @@ -429,6 +431,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -471,6 +475,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -502,6 +508,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -532,6 +540,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(nullptr));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(nullptr, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(nullptr, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -572,6 +582,65 @@ TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_assert
rcl_reset_error();
}

// Mocking rmw_publisher_wait_for_all_acked to make
// rcl_publisher_wait_for_all_acked fail
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, ==)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, !=)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, <)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, >)

TEST_F(
CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION),
test_mock_assert_wait_for_all_acked)
{
#define CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_RESULT, EXPECT_RET) do { \
rmw_publisher_wait_for_all_acked_return = RMW_RET_RESULT; \
ret = rcl_publisher_wait_for_all_acked(&publisher, 1000000); \
EXPECT_EQ(EXPECT_RET, ret); \
rcl_reset_error(); \
} while (0)

rcl_ret_t ret;
rmw_ret_t rmw_publisher_wait_for_all_acked_return;
auto mock = mocking_utils::patch_and_return(
"lib:rcl", rmw_publisher_wait_for_all_acked, rmw_publisher_wait_for_all_acked_return);

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_TIMEOUT
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_TIMEOUT, RCL_RET_TIMEOUT);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_UNSUPPORTED
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_UNSUPPORTED, RCL_RET_UNSUPPORTED);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_INVALID_ARGUMENT
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INVALID_ARGUMENT, RCL_RET_ERROR);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_INCORRECT_RMW_IMPLEMENTATION
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INCORRECT_RMW_IMPLEMENTATION, RCL_RET_ERROR);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_ERROR
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_ERROR, RCL_RET_ERROR);
}
}

// Mocking rmw_publish to make rcl_publish fail
TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_publish) {
auto mock = mocking_utils::patch_and_return("lib:rcl", rmw_publish, RMW_RET_ERROR);
Expand Down
Loading

0 comments on commit 6ec226a

Please sign in to comment.