Skip to content
1 change: 1 addition & 0 deletions doc/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
* xref:index.adoc[Introduction]
* xref:requests_responses.adoc[]
* xref:cancellation.adoc[]
* xref:serialization.adoc[]
* xref:logging.adoc[]
* xref:benchmarks.adoc[]
Expand Down
57 changes: 57 additions & 0 deletions doc/modules/ROOT/pages/cancellation.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

= Cancellation

Requests may take a very long time. If the server is down, they may suspend forever,
waiting for the server to be up. Fortunately, requests can be cancelled after
a certain time using asio::cancel_after:

```
request req;
// ...

co_await conn.async_exec(req, resp, asio::cancel_after(10s));
```

If the request hasn't been responded after 10 seconds, it will
fail with `asio::error::operation_aborted`. With the coroutine
usage above, this means a `boost::system::system_error` exception
with the error code mentioned above.

== Retrying idempotent requests

By default, the library waits until the server is up,
and then sends the request. But what happens if there is a communication
error after sending the request, but before receiving a response?

In this situation, we don't know if the request was processed by the server or not.
And we have no way to know it. By default, the library mark these requests as
failed with `asio::error::operation_aborted`. (TODO: do we want another error code here?).

Some requests can be executed several times and result in the same outcome
as executing them only once. We say that these requests are idempotent.
The `SET` command is idempotent, while `INCR` is not.

If you know that a request contains only idempotent commands,
you can instruct Redis to retry the request on failure, even
if the library is unsure about whether the server received the request or not.
You can do so by setting request::config::cancel_if_unresponded to false:

```
request req;
req.push("SET", "my_key", 42); // idempotent
req.get_config().cancel_on_connection_lost = false; // TODO: we shouldn't need this
req.get_config().cancel_if_unresponded = false; // retry

// Makes sure that the key is set, even in the presence of network errors.
// Note that if the server is down, the current coroutine will remain suspended
// until the server is capable of serving requests again (e.g. until a process manager restarts the server).
// Use cancel_after as seen above if you need to limit this time.
co_await conn.async_exec(req, ignore);
```
1 change: 1 addition & 0 deletions doc/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>

Here is a list of topics that you might be interested in:

* xref:cancellation.adoc[Setting timeouts to requests and managing cancellation].
* xref:requests_responses.adoc[More on requests and responses].
* xref:serialization.adoc[Serializing and parsing into custom types].
* xref:logging.adoc[Configuring logging].
Expand Down
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ make_testable_example(cpp20_intro 20)
make_testable_example(cpp20_containers 20)
make_testable_example(cpp20_json 20)
make_testable_example(cpp20_unix_sockets 20)
make_testable_example(cpp20_timeouts 20)

make_example(cpp20_subscriber 20)
make_example(cpp20_streams 20)
Expand Down
49 changes: 49 additions & 0 deletions example/cpp20_timeouts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#include <boost/redis/connection.hpp>

#include <boost/asio/cancel_after.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>

#include <iostream>

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::connection;
using namespace std::chrono_literals;

// Called from the main function (see main.cpp)
auto co_main(config cfg) -> asio::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, asio::consign(asio::detached, conn));

// A request containing only a ping command.
request req;
req.push("PING", "Hello world");

// Response where the PONG response will be stored.
response<std::string> resp;

// Executes the request with a timeout. If the server is down,
// async_exec will wait until it's back again, so it,
// may suspend for a long time.
// For this reason, it's good practice to set a timeout to requests with cancel_after.
// If the request hasn't completed after 10 seconds, an exception will be thrown.
co_await conn->async_exec(req, resp, asio::cancel_after(10s));
conn->cancel();

std::cout << "PING: " << std::get<0>(resp).value() << std::endl;
}

#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
103 changes: 71 additions & 32 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,26 +789,10 @@ class basic_connection {
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_run(config const& cfg, CompletionToken&& token = {})
{
impl_->cfg_ = cfg;
impl_->mpx_.set_config(cfg);

// If the token's slot has cancellation enabled, it should just emit
// the cancellation signal in our connection. This lets us unify the cancel()
// function and per-operation cancellation
auto slot = asio::get_associated_cancellation_slot(token);
if (slot.is_connected()) {
slot.template emplace<detail::run_cancel_handler<Executor>>(*impl_);
}

// Overwrite the token's cancellation slot: the composed operation
// should use the signal's slot so we can generate cancellations in cancel()
auto token_with_slot = asio::bind_cancellation_slot(
impl_->run_signal_.slot(),
std::forward<CompletionToken>(token));
return asio::async_compose<decltype(token_with_slot), void(system::error_code)>(
detail::run_op<Executor>{impl_.get()},
token_with_slot,
impl_->writer_timer_);
return asio::async_initiate<CompletionToken, void(system::error_code)>(
run_initiation{impl_.get()},
token,
&cfg);
}

/**
Expand Down Expand Up @@ -1133,6 +1117,42 @@ class basic_connection {
impl_->logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
}

// Initiation for async_run. This is required because we need access
// to the final handler (rather than the completion token) within the initiation,
// to modify the handler's cancellation slot.
struct run_initiation {
detail::connection_impl<Executor>* self;

using executor_type = Executor;
executor_type get_executor() const noexcept { return self->get_executor(); }

template <class Handler>
void operator()(Handler&& handler, config const* cfg)
{
self->cfg_ = *cfg;
self->mpx_.set_config(*cfg);

// If the token's slot has cancellation enabled, it should just emit
// the cancellation signal in our connection. This lets us unify the cancel()
// function and per-operation cancellation
auto slot = asio::get_associated_cancellation_slot(handler);
if (slot.is_connected()) {
slot.template emplace<detail::run_cancel_handler<Executor>>(*self);
}

// Overwrite the token's cancellation slot: the composed operation
// should use the signal's slot so we can generate cancellations in cancel()
auto token_with_slot = asio::bind_cancellation_slot(
self->run_signal_.slot(),
std::forward<Handler>(handler));

asio::async_compose<decltype(token_with_slot), void(system::error_code)>(
detail::run_op<Executor>{self},
token_with_slot,
self->writer_timer_);
}
};

friend class connection;

std::unique_ptr<detail::connection_impl<Executor>> impl_;
Expand Down Expand Up @@ -1224,11 +1244,8 @@ class connection {
auto async_run(config const& cfg, CompletionToken&& token = {})
{
return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
[](auto handler, connection* self, config const* cfg) {
self->async_run_impl(*cfg, std::move(handler));
},
initiation{this},
token,
this,
&cfg);
}

Expand Down Expand Up @@ -1256,11 +1273,8 @@ class connection {
auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
{
return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
[](auto handler, connection* self, config const* cfg, logger l) {
self->async_run_impl(*cfg, std::move(l), std::move(handler));
},
initiation{this},
token,
this,
&cfg,
std::move(l));
}
Expand Down Expand Up @@ -1299,11 +1313,8 @@ class connection {
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
{
return asio::async_initiate<CompletionToken, void(boost::system::error_code, std::size_t)>(
[](auto handler, connection* self, request const* req, any_adapter&& adapter) {
self->async_exec_impl(*req, std::move(adapter), std::move(handler));
},
initiation{this},
token,
this,
&req,
std::move(adapter));
}
Expand Down Expand Up @@ -1358,6 +1369,34 @@ class connection {
}

private:
// Function object to initiate the async ops that use asio::any_completion_handler.
// Required for asio::cancel_after to work.
// Since all ops have different arguments, a single struct with different overloads is enough.
struct initiation {
connection* self;

using executor_type = asio::any_io_executor;
executor_type get_executor() const noexcept { return self->get_executor(); }

template <class Handler>
void operator()(Handler&& handler, config const* cfg, logger l)
{
self->async_run_impl(*cfg, std::move(l), std::forward<Handler>(handler));
}

template <class Handler>
void operator()(Handler&& handler, config const* cfg)
{
self->async_run_impl(*cfg, std::forward<Handler>(handler));
}

template <class Handler>
void operator()(Handler&& handler, request const* req, any_adapter&& adapter)
{
self->async_exec_impl(*req, std::move(adapter), std::forward<Handler>(handler));
}
};

void async_run_impl(
config const& cfg,
logger&& l,
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ make_test(test_issue_50)
make_test(test_conversions)
make_test(test_conn_tls)
make_test(test_unix_sockets)
make_test(test_conn_cancel_after)

# Coverage
set(
Expand Down
110 changes: 110 additions & 0 deletions test/test_conn_cancel_after.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/redis/connection.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>

#include <boost/asio/cancel_after.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/core/lightweight_test.hpp>

#include "common.hpp"

using namespace std::chrono_literals;
namespace asio = boost::asio;
using boost::system::error_code;
using boost::redis::request;
using boost::redis::basic_connection;
using boost::redis::connection;
using boost::redis::ignore;
using boost::redis::generic_response;

namespace {

template <class Connection>
void test_run()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool run_finished = false;

// Call the function with a very short timeout
conn.async_run(make_test_config(), asio::cancel_after(1ms, [&](error_code ec) {
BOOST_TEST_EQ(ec, asio::error::operation_aborted);
run_finished = true;
}));

ioc.run_for(test_timeout);

BOOST_TEST(run_finished);
}

template <class Connection>
void test_exec()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool exec_finished = false;

request req;
req.push("PING", "cancel_after");

// Call the function with a very short timeout.
// The connection is not being run, so these can't succeed
conn.async_exec(req, ignore, asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, asio::error::operation_aborted);
exec_finished = true;
}));

ioc.run_for(test_timeout);

BOOST_TEST(exec_finished);
}

template <class Connection>
void test_receive()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool receive_finished = false;
generic_response resp;
conn.set_receive_response(resp);

// Call the function with a very short timeout.
conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
receive_finished = true;
}));

ioc.run_for(test_timeout);

BOOST_TEST(receive_finished);
}

} // namespace

int main()
{
test_run<basic_connection<asio::io_context::executor_type>>();
test_run<connection>();

test_exec<basic_connection<asio::io_context::executor_type>>();
test_exec<connection>();

test_receive<basic_connection<asio::io_context::executor_type>>();
test_receive<connection>();

return boost::report_errors();
}
Loading