diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index be90d837..7c10e4ae 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -1,5 +1,6 @@ * xref:index.adoc[Introduction] * xref:requests_responses.adoc[] +* xref:cancellation.adoc[] * xref:serialization.adoc[] * xref:logging.adoc[] * xref:benchmarks.adoc[] diff --git a/doc/modules/ROOT/pages/cancellation.adoc b/doc/modules/ROOT/pages/cancellation.adoc new file mode 100644 index 00000000..ed224de3 --- /dev/null +++ b/doc/modules/ROOT/pages/cancellation.adoc @@ -0,0 +1,57 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + += Cancellation + +Requests may take a very long time. If the server is down, they may suspend forever, +waiting for the server to be up. Fortunately, requests can be cancelled after +a certain time using `asio::cancel_after`: + +``` +request req; +// ... + +co_await conn.async_exec(req, resp, asio::cancel_after(10s)); +``` + +If the request hasn't been responded to after 10 seconds, it will +fail with `asio::error::operation_aborted`. With the coroutine +usage above, this means a `boost::system::system_error` exception +with the error code mentioned above. + +== Retrying idempotent requests + +By default, the library waits until the server is up, +and then sends the request. But what happens if there is a communication +error after sending the request, but before receiving a response? + +In this situation, we don't know if the request was processed by the server or not. +And we have no way to know it. By default, the library marks these requests as +failed with `asio::error::operation_aborted`. (TODO: do we want another error code here?). 
+ +Some requests can be executed several times and result in the same outcome +as executing them only once. We say that these requests are idempotent. +The `SET` command is idempotent, while `INCR` is not. + +If you know that a request contains only idempotent commands, +you can instruct the library to retry the request on failure, even +if the library is unsure about whether the server received the request or not. +You can do so by setting `request::config::cancel_if_unresponded` to false: + +``` +request req; +req.push("SET", "my_key", 42); // idempotent +req.get_config().cancel_on_connection_lost = false; // TODO: we shouldn't need this +req.get_config().cancel_if_unresponded = false; // retry + +// Makes sure that the key is set, even in the presence of network errors. +// Note that if the server is down, the current coroutine will remain suspended +// until the server is capable of serving requests again (e.g. until a process manager restarts the server). +// Use cancel_after as seen above if you need to limit this time. +co_await conn.async_exec(req, ignore); +``` diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index 6d87beb7..fe9f04f1 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -139,6 +139,7 @@ receiver(std::shared_ptr conn) -> net::awaitable Here is a list of topics that you might be interested in: +* xref:cancellation.adoc[Setting timeouts to requests and managing cancellation]. * xref:requests_responses.adoc[More on requests and responses]. * xref:serialization.adoc[Serializing and parsing into custom types]. * xref:logging.adoc[Configuring logging]. 
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 8f8bdd04..c9b18235 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -27,6 +27,7 @@ make_testable_example(cpp20_intro 20) make_testable_example(cpp20_containers 20) make_testable_example(cpp20_json 20) make_testable_example(cpp20_unix_sockets 20) +make_testable_example(cpp20_timeouts 20) make_example(cpp20_subscriber 20) make_example(cpp20_streams 20) diff --git a/example/cpp20_timeouts.cpp b/example/cpp20_timeouts.cpp new file mode 100644 index 00000000..98168339 --- /dev/null +++ b/example/cpp20_timeouts.cpp @@ -0,0 +1,49 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include + +#include +#include +#include +#include + +#include + +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + +namespace asio = boost::asio; +using boost::redis::request; +using boost::redis::response; +using boost::redis::config; +using boost::redis::connection; +using namespace std::chrono_literals; + +// Called from the main function (see main.cpp) +auto co_main(config cfg) -> asio::awaitable +{ + auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->async_run(cfg, asio::consign(asio::detached, conn)); + + // A request containing only a ping command. + request req; + req.push("PING", "Hello world"); + + // Response where the PONG response will be stored. + response resp; + + // Executes the request with a timeout. If the server is down, + // async_exec will wait until it's back again, so it + // may suspend for a long time. + // For this reason, it's good practice to set a timeout on requests with cancel_after. + // If the request hasn't completed after 10 seconds, an exception will be thrown. 
+ co_await conn->async_exec(req, resp, asio::cancel_after(10s)); + conn->cancel(); + + std::cout << "PING: " << std::get<0>(resp).value() << std::endl; +} + +#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 5695166d..ad976557 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -789,26 +789,10 @@ class basic_connection { template > auto async_run(config const& cfg, CompletionToken&& token = {}) { - impl_->cfg_ = cfg; - impl_->mpx_.set_config(cfg); - - // If the token's slot has cancellation enabled, it should just emit - // the cancellation signal in our connection. This lets us unify the cancel() - // function and per-operation cancellation - auto slot = asio::get_associated_cancellation_slot(token); - if (slot.is_connected()) { - slot.template emplace>(*impl_); - } - - // Overwrite the token's cancellation slot: the composed operation - // should use the signal's slot so we can generate cancellations in cancel() - auto token_with_slot = asio::bind_cancellation_slot( - impl_->run_signal_.slot(), - std::forward(token)); - return asio::async_compose( - detail::run_op{impl_.get()}, - token_with_slot, - impl_->writer_timer_); + return asio::async_initiate( + run_initiation{impl_.get()}, + token, + &cfg); } /** @@ -1133,6 +1117,42 @@ class basic_connection { impl_->logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix)); } + // Initiation for async_run. This is required because we need access + // to the final handler (rather than the completion token) within the initiation, + // to modify the handler's cancellation slot. 
+ struct run_initiation { + detail::connection_impl* self; + + using executor_type = Executor; + executor_type get_executor() const noexcept { return self->get_executor(); } + + template + void operator()(Handler&& handler, config const* cfg) + { + self->cfg_ = *cfg; + self->mpx_.set_config(*cfg); + + // If the token's slot has cancellation enabled, it should just emit + // the cancellation signal in our connection. This lets us unify the cancel() + // function and per-operation cancellation + auto slot = asio::get_associated_cancellation_slot(handler); + if (slot.is_connected()) { + slot.template emplace>(*self); + } + + // Overwrite the token's cancellation slot: the composed operation + // should use the signal's slot so we can generate cancellations in cancel() + auto token_with_slot = asio::bind_cancellation_slot( + self->run_signal_.slot(), + std::forward(handler)); + + asio::async_compose( + detail::run_op{self}, + token_with_slot, + self->writer_timer_); + } + }; + friend class connection; std::unique_ptr> impl_; @@ -1224,11 +1244,8 @@ class connection { auto async_run(config const& cfg, CompletionToken&& token = {}) { return asio::async_initiate( - [](auto handler, connection* self, config const* cfg) { - self->async_run_impl(*cfg, std::move(handler)); - }, + initiation{this}, token, - this, &cfg); } @@ -1256,11 +1273,8 @@ class connection { auto async_run(config const& cfg, logger l, CompletionToken&& token = {}) { return asio::async_initiate( - [](auto handler, connection* self, config const* cfg, logger l) { - self->async_run_impl(*cfg, std::move(l), std::move(handler)); - }, + initiation{this}, token, - this, &cfg, std::move(l)); } @@ -1299,11 +1313,8 @@ class connection { auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {}) { return asio::async_initiate( - [](auto handler, connection* self, request const* req, any_adapter&& adapter) { - self->async_exec_impl(*req, std::move(adapter), std::move(handler)); - }, + 
initiation{this}, token, - this, &req, std::move(adapter)); } @@ -1358,6 +1369,34 @@ class connection { } private: + // Function object to initiate the async ops that use asio::any_completion_handler. + // Required for asio::cancel_after to work. + // Since all ops have different arguments, a single struct with different overloads is enough. + struct initiation { + connection* self; + + using executor_type = asio::any_io_executor; + executor_type get_executor() const noexcept { return self->get_executor(); } + + template + void operator()(Handler&& handler, config const* cfg, logger l) + { + self->async_run_impl(*cfg, std::move(l), std::forward(handler)); + } + + template + void operator()(Handler&& handler, config const* cfg) + { + self->async_run_impl(*cfg, std::forward(handler)); + } + + template + void operator()(Handler&& handler, request const* req, any_adapter&& adapter) + { + self->async_exec_impl(*req, std::move(adapter), std::forward(handler)); + } + }; + void async_run_impl( config const& cfg, logger&& l, diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d21279f3..01b49824 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -64,6 +64,7 @@ make_test(test_issue_50) make_test(test_conversions) make_test(test_conn_tls) make_test(test_unix_sockets) +make_test(test_conn_cancel_after) # Coverage set( diff --git a/test/test_conn_cancel_after.cpp b/test/test_conn_cancel_after.cpp new file mode 100644 index 00000000..dd25a4a5 --- /dev/null +++ b/test/test_conn_cancel_after.cpp @@ -0,0 +1,110 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common.hpp" + +using namespace std::chrono_literals; +namespace asio = boost::asio; +using boost::system::error_code; +using boost::redis::request; +using boost::redis::basic_connection; +using boost::redis::connection; +using boost::redis::ignore; +using boost::redis::generic_response; + +namespace { + +template +void test_run() +{ + // Setup + asio::io_context ioc; + Connection conn{ioc}; + bool run_finished = false; + + // Call the function with a very short timeout + conn.async_run(make_test_config(), asio::cancel_after(1ms, [&](error_code ec) { + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + run_finished = true; + })); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); +} + +template +void test_exec() +{ + // Setup + asio::io_context ioc; + Connection conn{ioc}; + bool exec_finished = false; + + request req; + req.push("PING", "cancel_after"); + + // Call the function with a very short timeout. + // The connection is not being run, so these can't succeed + conn.async_exec(req, ignore, asio::cancel_after(1ms, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + exec_finished = true; + })); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); +} + +template +void test_receive() +{ + // Setup + asio::io_context ioc; + Connection conn{ioc}; + bool receive_finished = false; + generic_response resp; + conn.set_receive_response(resp); + + // Call the function with a very short timeout. 
+ conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled); + receive_finished = true; + })); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); +} + +} // namespace + +int main() +{ + test_run>(); + test_run(); + + test_exec>(); + test_exec(); + + test_receive>(); + test_receive(); + + return boost::report_errors(); +} diff --git a/test/test_conn_run_cancel.cpp b/test/test_conn_run_cancel.cpp index 769709c4..053c58f5 100644 --- a/test/test_conn_run_cancel.cpp +++ b/test/test_conn_run_cancel.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include "common.hpp" @@ -27,13 +28,14 @@ using namespace boost::redis; namespace { // Terminal and partial cancellation work for async_run +template void test_per_operation_cancellation(std::string_view name, net::cancellation_type_t cancel_type) { std::cerr << "Running test case: " << name << std::endl; // Setup net::io_context ioc; - connection conn{ioc}; + Connection conn{ioc}; net::cancellation_signal sig; request req; @@ -66,8 +68,21 @@ void test_per_operation_cancellation(std::string_view name, net::cancellation_ty int main() { - test_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); - test_per_operation_cancellation("partial", net::cancellation_type_t::partial); + using basic_connection_t = basic_connection; + + test_per_operation_cancellation( + "basic_connection, terminal", + net::cancellation_type_t::terminal); + test_per_operation_cancellation( + "basic_connection, partial", + net::cancellation_type_t::partial); + + test_per_operation_cancellation( + "connection, terminal", + net::cancellation_type_t::terminal); + test_per_operation_cancellation( + "connection, partial", + net::cancellation_type_t::partial); return boost::report_errors(); }