diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index cd804481..3d38b255 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -28,10 +28,14 @@ #include #include +#include #include +#include #include #include #include +#include +#include #include #include #include @@ -45,6 +49,7 @@ #include #include #include +#include #include #include @@ -80,6 +85,7 @@ struct connection_impl { timer_type reconnect_timer_; // to wait the reconnection period timer_type ping_timer_; // to wait between pings receive_channel_type receive_channel_; + asio::cancellation_signal run_signal_; config cfg_; multiplexer mpx_; @@ -142,21 +148,21 @@ struct connection_impl { void cancel(operation op) { switch (op) { - case operation::resolve: stream_.cancel_resolve(); break; case operation::exec: mpx_.cancel_waiting(); break; + case operation::receive: receive_channel_.cancel(); break; case operation::reconnection: cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); break; - case operation::run: cancel_run(); break; - case operation::receive: receive_channel_.cancel(); break; - case operation::health_check: ping_timer_.cancel(); break; + case operation::run: + case operation::resolve: + case operation::connect: + case operation::ssl_handshake: + case operation::health_check: cancel_run(); break; case operation::all: - stream_.cancel_resolve(); - cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); - ping_timer_.cancel(); - cancel_run(); // run - receive_channel_.cancel(); // receive - mpx_.cancel_waiting(); // exec + mpx_.cancel_waiting(); // exec + receive_channel_.cancel(); // receive + cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect + cancel_run(); // run break; default: /* ignore */; } @@ -164,8 +170,16 @@ struct connection_impl { void cancel_run() { - stream_.close(); - writer_timer_.cancel(); + // Individual operations should see a terminal cancellation, regardless + // of 
what we got requested. We take enough actions to ensure that this + // doesn't prevent the object from being re-used (e.g. we reset the TLS stream). + run_signal_.emit(asio::cancellation_type_t::terminal); + + // Name resolution doesn't support per-operation cancellation + stream_.cancel_resolve(); + + // Receive is technically not part of run, but we also cancel it for + // backwards compatibility. receive_channel_.cancel(); } @@ -218,31 +232,28 @@ struct writer_op { if (ec) { conn_->logger_.trace("writer_op (1)", ec); - conn_->cancel(operation::run); self.complete(ec); return; } conn_->mpx_.commit_write(); - // A socket.close() may have been called while a - // successful write might had already been queued, so we - // have to check here before proceeding. - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (2): connection is closed."); - self.complete({}); + // Check for cancellations + if (is_cancelled(self)) { + conn_->logger_.trace("writer_op (2): cancelled"); + self.complete(asio::error::operation_aborted); return; } } + // Wait for data to be available BOOST_ASIO_CORO_YIELD conn_->writer_timer_.async_wait(std::move(self)); - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (3): connection is closed."); - // Notice this is not an error of the op, stoping was - // requested from the outside, so we complete with - // success. 
- self.complete({}); + + // Check for cancellations + if (is_cancelled(self)) { + conn_->logger_.trace("writer_op (3): cancelled"); + self.complete(asio::error::operation_aborted); return; } } @@ -269,9 +280,6 @@ struct reader_op { conn_->logger_.on_fsm_resume(act); switch (act.type_) { - case reader_fsm::action::type::setup_cancellation: - self.reset_cancellation_state(asio::enable_terminal_cancellation()); - continue; case reader_fsm::action::type::needs_more: case reader_fsm::action::type::read_some: { @@ -287,8 +295,7 @@ struct reader_op { return; } return; - case reader_fsm::action::type::cancel_run: conn_->cancel(operation::run); continue; - case reader_fsm::action::type::done: self.complete(act.ec_); return; + case reader_fsm::action::type::done: self.complete(act.ec_); return; } } } @@ -299,12 +306,18 @@ struct health_checker_op { connection_impl* conn_; asio::coroutine coro_{}; - system::error_code check_errors(system::error_code io_ec) + system::error_code check_errors(system::error_code io_ec, asio::cancellation_type_t cancel_state) { - // Did we have a timeout? + // Did we have a cancellation? We might not have an error code here + if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { + conn_->logger_.log(logger::level::info, "Health checker: cancelled"); + return asio::error::operation_aborted; + } + + // operation_aborted and no cancel state means that asio::cancel_after timed out if (io_ec == asio::error::operation_aborted) { conn_->logger_.log(logger::level::info, "Health checker: ping timed out"); - return asio::error::operation_aborted; + return error::pong_timeout; } // Did we have other unknown error? 
@@ -339,8 +352,11 @@ struct health_checker_op { { if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) { conn_->logger_.trace("ping_op (1): timeout disabled."); - BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self)); - self.complete(system::error_code{}); + + // Wait until we're cancelled. This simplifies parallel group handling a lot + conn_->ping_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); + BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); + self.complete(asio::error::operation_aborted); return; } @@ -359,15 +375,8 @@ struct health_checker_op { asio::cancel_after(conn->ping_timer_, timeout, std::move(self))); } - // Check for cancellations - if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (2): cancelled"); - self.complete(asio::error::operation_aborted); - return; - } - - // Check for errors in PING - ec = check_errors(ec); + // Check for cancellations and errors in PING + ec = check_errors(ec, self.get_cancellation_state().cancelled()); if (ec) { self.complete(ec); return; @@ -378,14 +387,9 @@ struct health_checker_op { BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); - if (ec) { - conn_->logger_.trace("ping_op (3)", ec); - self.complete(ec); - return; - } if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (4): cancelled"); + conn_->logger_.trace("ping_op (2): cancelled"); self.complete(asio::error::operation_aborted); return; } @@ -412,6 +416,13 @@ inline system::error_code check_config(const config& cfg) return system::error_code{}; } +system::error_code translate_parallel_group_errors( + std::array order, + system::error_code setup_ec, + system::error_code health_check_ec, + system::error_code reader_ec, + system::error_code writer_ec); + template class run_op { private: @@ -419,17 +430,12 @@ class run_op { asio::coroutine coro_{}; system::error_code stored_ec_; - using order_t = std::array; - static system::error_code 
on_setup_finished( connection_impl& conn, system::error_code ec) { ec = check_setup_response(ec, conn.setup_resp_); conn.logger_.on_setup(ec, conn.setup_resp_); - if (ec) { - conn.cancel(operation::run); - } return ec; } @@ -490,29 +496,17 @@ class run_op { template void operator()( Self& self, - order_t order, + std::array order, system::error_code setup_ec, system::error_code health_check_ec, system::error_code reader_ec, - system::error_code /* writer_ec */) + system::error_code writer_ec) { - system::error_code final_ec; - - if (order[0] == 0 && !!setup_ec) { - // The setup op finished first and with an error - final_ec = setup_ec; - } else if (order[0] == 1 && health_check_ec == error::pong_timeout) { - // The check ping timeout finished first. Use the ping error code - final_ec = health_check_ec; - } else { - // Use the reader error code - final_ec = reader_ec; - } - - (*this)(self, final_ec); + (*this)( + self, + translate_parallel_group_errors(order, setup_ec, health_check_ec, reader_ec, writer_ec)); } - // TODO: this op doesn't handle per-operation cancellation correctly template void operator()(Self& self, system::error_code ec = {}) { @@ -539,6 +533,12 @@ class run_op { BOOST_ASIO_CORO_YIELD conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self)); + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + // If we were successful, run all the connection tasks if (!ec) { conn_->mpx_.reset(); @@ -574,7 +574,7 @@ class run_op { // The receive operation must be cancelled because channel // subscription does not survive a reconnection but requires // re-subscription. 
- conn_->cancel(operation::receive); + conn_->receive_channel_.cancel(); } // If we are not going to try again, we're done @@ -583,14 +583,20 @@ class run_op { return; } + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + // Wait for the reconnection interval conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval); BOOST_ASIO_CORO_YIELD conn_->reconnect_timer_.async_wait(std::move(self)); - // If the timer was cancelled, exit - if (ec) { - self.complete(ec); + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); return; } @@ -606,6 +612,27 @@ class run_op { logger make_stderr_logger(logger::level lvl, std::string prefix); +template +class run_cancel_handler { + connection_impl* conn_; + +public: + explicit run_cancel_handler(connection_impl& conn) noexcept + : conn_(&conn) + { } + + void operator()(asio::cancellation_type_t cancel_type) const + { + // We support terminal and partial cancellation + constexpr auto mask = asio::cancellation_type_t::terminal | + asio::cancellation_type_t::partial; + + if ((cancel_type & mask) != asio::cancellation_type_t::none) { + conn_->cancel(operation::run); + } + } +}; + } // namespace detail /** @brief A SSL connection to the Redis server. @@ -730,9 +757,8 @@ class basic_connection { * before `async_run` is called will be written to the server immediately. * * When a connection is lost for any reason, a new one is - * established automatically. To disable reconnection call - * `boost::redis::connection::cancel(operation::reconnection)` - * or set @ref boost::redis::config::reconnect_wait_interval to zero. + * established automatically. To disable reconnection + * set @ref boost::redis::config::reconnect_wait_interval to zero. 
* * The completion token must have the following signature * @@ -740,11 +766,25 @@ class basic_connection { * void f(system::error_code); * @endcode * - * For example on how to call this function refer to - * cpp20_intro.cpp or any other example. + * @par Per-operation cancellation + * This operation supports the following cancellation types: * - * @param cfg Configuration parameters. - * @param token Completion token. + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. + * + * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel + * passing @ref operation::run as argument. + * + * After the operation completes, the token's associated cancellation slot + * may still have a cancellation handler associated to this connection. + * You should make sure to not invoke it after the connection has been destroyed. + * This is consistent with what other Asio I/O objects do. + * + * For example on how to call this function refer to + * cpp20_intro.cpp or any other example. + * + * @param cfg Configuration parameters. + * @param token Completion token. */ template > auto async_run(config const& cfg, CompletionToken&& token = {}) @@ -752,9 +792,22 @@ class basic_connection { impl_->cfg_ = cfg; impl_->mpx_.set_config(cfg); - return asio::async_compose( + // If the token's slot has cancellation enabled, it should just emit + // the cancellation signal in our connection. 
This lets us unify the cancel() + // function and per-operation cancellation + auto slot = asio::get_associated_cancellation_slot(token); + if (slot.is_connected()) { + slot.template emplace>(*impl_); + } + + // Overwrite the token's cancellation slot: the composed operation + // should use the signal's slot so we can generate cancellations in cancel() + auto token_with_slot = asio::bind_cancellation_slot( + impl_->run_signal_.slot(), + std::forward(token)); + return asio::async_compose( detail::run_op{impl_.get()}, - token, + token_with_slot, impl_->writer_timer_); } @@ -811,25 +864,32 @@ class basic_connection { /** @brief Receives server side pushes asynchronously. * - * When pushes arrive and there is no `async_receive` operation in - * progress, pushed data, requests, and responses will be paused - * until `async_receive` is called again. Apps will usually want - * to call `async_receive` in a loop. + * When pushes arrive and there is no `async_receive` operation in + * progress, pushed data, requests, and responses will be paused + * until `async_receive` is called again. Apps will usually want + * to call `async_receive` in a loop. * - * To cancel an ongoing receive operation apps should call - * `basic_connection::cancel(operation::receive)`. + * For an example see cpp20_subscriber.cpp. The completion token must + * have the following signature * - * For an example see cpp20_subscriber.cpp. The completion token must - * have the following signature + * @code + * void f(system::error_code, std::size_t); + * @endcode * - * @code - * void f(system::error_code, std::size_t); - * @endcode + * Where the second parameter is the size of the push received in + * bytes. + * + * @par Per-operation cancellation + * This operation supports the following cancellation types: * - * Where the second parameter is the size of the push received in - * bytes. + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. 
+ * @li `asio::cancellation_type_t::total`. + * + * Calling `basic_connection::cancel(operation::receive)` will + * also cancel any ongoing receive operations. * - * @param token Completion token. + * @param token Completion token. */ template > auto async_receive(CompletionToken&& token = {}) @@ -837,7 +897,7 @@ class basic_connection { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Receives server> pushes synchronously without blocking. + /** @brief Receives server pushes synchronously without blocking. * * Receives a server push synchronously by calling `try_receive` on * the underlying channel. If the operation fails because diff --git a/include/boost/redis/detail/helper.hpp b/include/boost/redis/detail/helper.hpp index d6c2481a..58a7fe97 100644 --- a/include/boost/redis/detail/helper.hpp +++ b/include/boost/redis/detail/helper.hpp @@ -17,18 +17,6 @@ auto is_cancelled(T const& self) return self.get_cancellation_state().cancelled() != asio::cancellation_type_t::none; } -#define BOOST_REDIS_CHECK_OP0(X) \ - if (ec || redis::detail::is_cancelled(self)) { \ - X self.complete(!!ec ? ec : asio::error::operation_aborted); \ - return; \ - } - -#define BOOST_REDIS_CHECK_OP1(X) \ - if (ec || redis::detail::is_cancelled(self)) { \ - X self.complete(!!ec ? 
ec : asio::error::operation_aborted, {}); \ - return; \ - } - } // namespace boost::redis::detail #endif // BOOST_REDIS_HELPER_HPP diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 57091ce5..ed6b6ece 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -22,15 +22,13 @@ class reader_fsm { struct action { enum class type { - setup_cancellation, read_some, needs_more, notify_push_receiver, - cancel_run, done, }; - type type_ = type::setup_cancellation; + type type_ = type::done; std::size_t push_size_ = 0u; system::error_code ec_ = {}; }; @@ -40,11 +38,10 @@ class reader_fsm { action resume( std::size_t bytes_read, system::error_code ec, - asio::cancellation_type_t /*cancel_state*/); + asio::cancellation_type_t cancel_state); private: int resume_point_{0}; - action action_after_resume_; action::type next_read_type_ = action::type::read_some; multiplexer* mpx_ = nullptr; std::pair res_{consume_result::needs_more, 0u}; diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index e8590c70..670fe114 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -99,6 +99,9 @@ class redis_stream { if (obj.transport_ == transport_type::unix_socket) { #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS + // Discard any existing state + obj.unix_socket_.close(ec); + // Directly connect to the socket BOOST_ASIO_CORO_YIELD obj.unix_socket_.async_connect( @@ -119,7 +122,9 @@ class redis_stream { } else { // ssl::stream doesn't support being re-used. If we're to use // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream + // Must be done before anything else is done on the stream. + // Note that we don't need to close the socket here because + // range connect does it for us. 
if (cfg->use_ssl && obj.ssl_stream_used_) obj.reset_stream(); @@ -269,19 +274,11 @@ class redis_stream { } } - // Cleanup + // Cancels resolve operations. Resolve operations don't support per-operation + // cancellation, but resolvers have a cancel() function. Resolve operations are + // in general blocking and run in a separate thread. cancel() has effect only + // if the operation hasn't started yet. Still, trying is better than nothing void cancel_resolve() { resolv_.cancel(); } - - void close() - { - system::error_code ec; - if (stream_.next_layer().is_open()) - stream_.next_layer().close(ec); -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - if (unix_socket_.is_open()) - unix_socket_.close(ec); -#endif - } }; } // namespace detail diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 088b1ac3..b23cc8b4 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -21,6 +21,33 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix) }); } +system::error_code detail::translate_parallel_group_errors( + std::array order, + system::error_code setup_ec, + system::error_code health_check_ec, + system::error_code reader_ec, + system::error_code writer_ec) +{ + // The setup request is special: it might complete successfully, + // without causing the other tasks to exit. + // The other tasks will always complete with an error. + + // If the setup task errored and was the first to exit, use its code + if (order[0] == 0u && setup_ec) { + return setup_ec; + } + + // Use the code corresponding to the task that finished first, + // excluding the setup task + std::size_t task_number = order[0] == 0u ? 
order[1] : order[0]; + switch (task_number) { + case 1u: return health_check_ec; + case 2u: return reader_ec; + case 3u: return writer_ec; + default: BOOST_ASSERT(false); return system::error_code(); + } +} + connection::connection(executor_type ex, asio::ssl::context ctx, logger lgr) : impl_{std::move(ex), std::move(ctx), std::move(lgr)} { } diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index c5b32677..bdb72b6b 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -25,11 +25,9 @@ namespace boost::redis::detail { auto to_string(reader_fsm::action::type t) noexcept -> char const* { switch (t) { - BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation); BOOST_REDIS_READER_SWITCH_CASE(read_some); BOOST_REDIS_READER_SWITCH_CASE(needs_more); BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver); - BOOST_REDIS_READER_SWITCH_CASE(cancel_run); BOOST_REDIS_READER_SWITCH_CASE(done); default: return "action::type::"; } diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 7a2475fb..aca75069 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -8,8 +8,16 @@ #include #include +#include +#include + namespace boost::redis::detail { +inline bool is_terminal_cancel(asio::cancellation_type_t value) +{ + return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; +} + reader_fsm::reader_fsm(multiplexer& mpx) noexcept : mpx_{&mpx} { } @@ -17,40 +25,46 @@ reader_fsm::reader_fsm(multiplexer& mpx) noexcept reader_fsm::action reader_fsm::resume( std::size_t bytes_read, system::error_code ec, - asio::cancellation_type_t /*cancel_state*/) + asio::cancellation_type_t cancel_state) { switch (resume_point_) { BOOST_REDIS_CORO_INITIAL - BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { + // Prepare the buffer for the read 
operation ec = mpx_->prepare_read(); if (ec) { - action_after_resume_ = {action::type::done, 0, ec}; - BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, 0, ec}; } - BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) + // Read + BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_) + + // Process the bytes read, even if there was an error mpx_->commit_read(bytes_read); + + // Check for read errors if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. - action_after_resume_ = {action::type::done, bytes_read, ec}; - BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, bytes_read, ec}; + } + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + return {action::type::done, 0u, asio::error::operation_aborted}; } + // Process the data that we've read next_read_type_ = action::type::read_some; while (mpx_->get_read_buffer_size() != 0) { res_ = mpx_->consume(ec); + if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. 
- action_after_resume_ = {action::type::done, res_.second, ec}; - BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, res_.second, ec}; } if (res_.first == consume_result::needs_more) { @@ -59,11 +73,12 @@ reader_fsm::action reader_fsm::resume( } if (res_.first == consume_result::got_push) { - BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) + BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second) if (ec) { - action_after_resume_ = {action::type::done, 0u, ec}; - BOOST_REDIS_YIELD(resume_point_, 7, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, 0u, ec}; + } + if (is_terminal_cancel(cancel_state)) { + return {action::type::done, 0u, asio::error::operation_aborted}; } } else { // TODO: Here we should notify the exec operation that diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index 624d34b4..da087cc0 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -16,22 +16,68 @@ namespace boost::redis { */ enum class operation { - /// Resolve operation. + /** + * @brief (Deprecated) Resolve operation. + * + * Cancelling a single resolve operation is probably not what you + * want, since there is no way to detect when a connection is performing name resolution. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes name resolution. + */ resolve, - /// Connect operation. + + /** + * @brief (Deprecated) Connect operation. + * + * Cancelling a single connect operation is probably not what you + * want, since there is no way to detect when a connection is performing a connect operation. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes connection establishment. + */ connect, - /// SSL handshake operation. 
+ + /** + * @brief (Deprecated) SSL handshake operation. + * + * Cancelling a single SSL handshake operation is probably not what you + * want, since there is no way to detect when a connection is performing an SSL handshake. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes the SSL handshake. + */ ssl_handshake, + /// Refers to `connection::async_exec` operations. exec, + /// Refers to `connection::async_run` operations. run, + /// Refers to `connection::async_receive` operations. receive, - /// Cancels reconnection. + + /** + * @brief (Deprecated) Cancels reconnection. + * + * Cancelling reconnection doesn't really cancel anything. + * It will only prevent further connection attempts from being + * made once the current connection encounters an error. + * + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes reconnection. If you want to disable reconnection completely, + * set @ref config::reconnect_wait_interval to zero before calling `async_run`. + */ reconnection, - /// Health check operation. + + /** + * @brief (Deprecated) Health check operation. + * + * Cancelling the health checker only is probably not what you want. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes the health checker. If you want to disable health checks completely, + * set @ref config::health_check_interval to zero before calling `async_run`. + */ health_check, + /// Refers to all operations. 
all, }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9ef1791d..e5d4e24c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,6 +49,7 @@ make_test(test_conn_quit) make_test(test_conn_exec_retry) make_test(test_conn_exec_error) make_test(test_run) +make_test(test_conn_run_cancel) make_test(test_conn_check_health) make_test(test_conn_exec) make_test(test_conn_push) diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 8be5ecf2..7314c53a 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -25,7 +25,8 @@ using namespace std::chrono_literals; namespace { -void test_check_health() +// The health checker detects dead connections and triggers reconnection +void test_reconnection() { // Setup net::io_context ioc; @@ -75,11 +76,91 @@ void test_check_health() BOOST_TEST(exec2_finished); } +// We use the correct error code when a ping times out +void test_error_code() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + + // This request will block forever, causing the connection to become unresponsive + request req; + req.push("BLPOP", "any", 0); + + // Make the test run faster + auto cfg = make_test_config(); + cfg.health_check_interval = 200ms; + cfg.reconnect_wait_interval = 0s; + + bool run_finished = false, exec_finished = false; + + conn.async_run(cfg, [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, boost::redis::error::pong_timeout); + }); + + // This request will complete after the health checker deems the connection + // as unresponsive and triggers a reconnection (it's configured to be cancelled + // on connection lost). 
+ conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); + BOOST_TEST(exec_finished); +} + +// A ping interval of zero disables timeouts (and doesn't cause trouble) +void test_disabled() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + + // Run a couple of requests to verify that the connection works fine + request req1; + req1.push("PING", "health_check_disabled_1"); + + request req2; + req2.push("PING", "health_check_disabled_2"); + + auto cfg = make_test_config(); + cfg.health_check_interval = 0s; + + bool run_finished = false, exec1_finished = false, exec2_finished = false; + + conn.async_run(cfg, [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { + exec1_finished = true; + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) { + exec2_finished = true; + BOOST_TEST_EQ(ec2, error_code()); + conn.cancel(); + }); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); +} + } // namespace int main() { - test_check_health(); + test_reconnection(); + test_error_code(); + test_disabled(); return boost::report_errors(); } \ No newline at end of file diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index a01c63dd..e9d941c5 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -30,7 +30,7 @@ using namespace std::chrono_literals; namespace { -BOOST_AUTO_TEST_CASE(request_retry_false) +BOOST_AUTO_TEST_CASE(request_cancel_if_unresponded_true) { request req0; req0.get_config().cancel_on_connection_lost = true; @@ -105,8 +105,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) BOOST_TEST(run_finished); } 
-BOOST_AUTO_TEST_CASE(request_retry_true) +BOOST_AUTO_TEST_CASE(request_cancel_if_unresponded_false) { + // The BLPOP request will block forever, causing the health checker + // to trigger a reconnection. Although req2 has been written, + // it has cancel_if_unresponded=false, so it will be retried + // after reconnection request req0; req0.get_config().cancel_on_connection_lost = true; req0.push("HELLO", 3); @@ -126,23 +130,10 @@ BOOST_AUTO_TEST_CASE(request_retry_true) req3.push("QUIT"); net::io_context ioc; - auto conn = std::make_shared(ioc); - - net::steady_timer st{ioc}; - - bool timer_finished = false, c0_called = false, c1_called = false, c2_called = false, - c3_called = false, run_finished = false; + auto conn = std::make_shared(ioc, logger::level::debug); - st.expires_after(std::chrono::seconds{1}); - st.async_wait([&](error_code ec) { - // Cancels the request before receiving the response. This - // should cause the third request to not complete with error - // since it has cancel_if_unresponded = true and cancellation - // comes after it was written. 
- timer_finished = true; - BOOST_TEST(ec == error_code()); - conn->cancel(operation::run); - }); + bool c0_called = false, c1_called = false, c2_called = false, c3_called = false, + run_finished = false; auto c3 = [&](error_code ec, std::size_t) { c3_called = true; @@ -172,8 +163,8 @@ BOOST_AUTO_TEST_CASE(request_retry_true) conn->async_exec(req0, ignore, c0); auto cfg = make_test_config(); - cfg.health_check_interval = 5s; - conn->async_run(cfg, {}, [&](error_code ec) { + cfg.health_check_interval = 200ms; + conn->async_run(cfg, [&](error_code ec) { run_finished = true; std::cout << ec.message() << std::endl; BOOST_TEST(ec != error_code()); @@ -181,7 +172,6 @@ BOOST_AUTO_TEST_CASE(request_retry_true) ioc.run_for(test_timeout); - BOOST_TEST(timer_finished); BOOST_TEST(c0_called); BOOST_TEST(c1_called); BOOST_TEST(c2_called); diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 3708cfd2..1049c615 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -210,7 +210,6 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) conn->async_receive([&, conn](error_code ec, std::size_t) { BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); - conn->cancel(operation::reconnection); push_received = true; }); @@ -220,7 +219,8 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) }); auto cfg = make_test_config(); - conn->async_run(cfg, {}, [&run_finished](error_code ec) { + cfg.reconnect_wait_interval = 0s; + conn->async_run(cfg, [&run_finished](error_code ec) { BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); run_finished = true; }); diff --git a/test/test_conn_run_cancel.cpp b/test/test_conn_run_cancel.cpp new file mode 100644 index 00000000..769709c4 --- /dev/null +++ b/test/test_conn_run_cancel.cpp @@ -0,0 +1,73 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include +#include + +#include "common.hpp" + +#include +#include +#include + +using boost::system::error_code; +namespace net = boost::asio; +using namespace boost::redis; + +namespace { + +// Terminal and partial cancellation work for async_run +void test_per_operation_cancellation(std::string_view name, net::cancellation_type_t cancel_type) +{ + std::cerr << "Running test case: " << name << std::endl; + + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + + request req; + req.push("PING", "something"); + + bool run_finished = false, exec_finished = false; + + // Run the connection + auto run_cb = [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }; + conn.async_run(make_test_config(), net::bind_cancellation_slot(sig.slot(), run_cb)); + + // Launch a PING + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST_EQ(ec, error_code()); + sig.emit(cancel_type); + }); + + ioc.run_for(test_timeout); + + // Check + BOOST_TEST(run_finished); + BOOST_TEST(exec_finished); +} + +} // namespace + +int main() +{ + test_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); + test_per_operation_cancellation("partial", net::cancellation_type_t::partial); + + return boost::report_errors(); +} diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index ef49e333..6832a5e6 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -152,6 +152,8 @@ BOOST_AUTO_TEST_CASE(reconnection) request ping_request; ping_request.push("PING", "some_value"); + ping_request.get_config().cancel_if_unresponded = false; + ping_request.get_config().cancel_on_connection_lost = false; request quit_request; quit_request.push("QUIT"); @@ -173,12 +175,6 @@ BOOST_AUTO_TEST_CASE(reconnection) auto 
quit_callback = [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); - - // If a request is issued immediately after QUIT, the request sometimes - // fails, probably due to a race condition. This dispatches any pending - // handlers, triggering the reconnection process. - // TODO: this should not be required. - ioc.poll(); conn.async_exec(ping_request, ignore, ping_callback); }; diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 27f4135f..af41a0a9 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -12,7 +12,7 @@ #include #include -#include "common.hpp" +#include namespace net = boost::asio; namespace redis = boost::redis; @@ -25,6 +25,7 @@ using redis::any_adapter; using redis::config; using action = redis::detail::reader_fsm::action; +// Operators namespace boost::redis::detail { extern auto to_string(reader_fsm::action::type t) noexcept -> char const*; @@ -35,6 +36,10 @@ std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t) return os; } +} // namespace boost::redis::detail + +namespace { + // Copy data into the multiplexer with the following steps // // 1. get_read_buffer @@ -48,11 +53,6 @@ void copy_to(multiplexer& mpx, std::string_view data) std::copy(data.cbegin(), data.cend(), buffer.begin()); } -} // namespace boost::redis::detail - -// Operators -namespace { - void test_push() { multiplexer mpx; @@ -64,8 +64,6 @@ void test_push() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. 
@@ -111,8 +109,6 @@ void test_read_needs_more() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Split the incoming message in three random parts and deliver @@ -156,8 +152,6 @@ void test_read_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -166,11 +160,6 @@ void test_read_error() // Deliver the data act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - BOOST_TEST_EQ(act.ec_, error_code()); - - // Finish - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -186,8 +175,6 @@ void test_parse_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. 
@@ -196,11 +183,6 @@ void test_parse_error() // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - BOOST_TEST_EQ(act.ec_, error_code()); - - // Finish - act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); } @@ -216,8 +198,6 @@ void test_push_deliver_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -231,10 +211,6 @@ void test_push_deliver_error() // Resumes from notifying a push with an error. act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - - // Finish - act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -255,32 +231,88 @@ void test_max_read_buffer_size() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Passes the first part to the fsm. 
std::string const part1 = ">3\r\n"; copy_to(mpx, part1); act = fsm.resume(part1.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); +} + +// Cancellations +void test_cancel_after_read() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + reader_fsm fsm{mpx}; + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // Deliver a push, and notify a cancellation. + // This can happen if the cancellation signal arrives before the read handler runs + constexpr std::string_view payload = ">1\r\n+msg1\r\n"; + copy_to(mpx, payload); + act = fsm.resume(payload.size(), ec, cancellation_type_t::terminal); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.push_size_, 0u); + BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); +} + +void test_cancel_after_push_delivery() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + reader_fsm fsm{mpx}; + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // The fsm is asking for data. + constexpr std::string_view payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n"; + + copy_to(mpx, payload); + + // Deliver the 1st push + act = fsm.resume(payload.size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); + BOOST_TEST_EQ(act.push_size_, 11u); BOOST_TEST_EQ(act.ec_, error_code()); - act = fsm.resume({}, {}, cancellation_type_t::none); + // We got a cancellation after delivering it. 
+ // This can happen if the cancellation signal arrives before the channel send handler runs + act = fsm.resume(0, ec, cancellation_type_t::terminal); BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); + BOOST_TEST_EQ(act.push_size_, 0u); + BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); } } // namespace int main() { - test_max_read_buffer_size(); - test_push_deliver_error(); - test_read_needs_more(); test_push(); + test_read_needs_more(); + test_read_error(); test_parse_error(); + test_push_deliver_error(); + test_max_read_buffer_size(); + + test_cancel_after_read(); + test_cancel_after_push_delivery(); return boost::report_errors(); } diff --git a/test/test_unix_sockets.cpp b/test/test_unix_sockets.cpp index abfcb278..75198157 100644 --- a/test/test_unix_sockets.cpp +++ b/test/test_unix_sockets.cpp @@ -81,6 +81,9 @@ void test_reconnection() cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; + ping_request.get_config().cancel_if_not_connected = false; + ping_request.get_config().cancel_if_unresponded = false; + ping_request.get_config().cancel_on_connection_lost = false; ping_request.push("PING", "some_value"); request quit_request; @@ -103,12 +106,6 @@ void test_reconnection() auto quit_callback = [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); - - // If a request is issued immediately after QUIT, the request sometimes - // fails, probably due to a race condition. This dispatches any pending - // handlers, triggering the reconnection process. - // TODO: this should not be required. - ioc.poll(); conn.async_exec(ping_request, ignore, ping_callback); };