From cff2c97afcdbbfda5697695c3a39adbcbed8f53f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 27 Sep 2025 19:52:00 +0200 Subject: [PATCH 01/30] Initial impl --- include/boost/redis/detail/connect_fsm.hpp | 250 +++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 include/boost/redis/detail/connect_fsm.hpp diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp new file mode 100644 index 00000000..6b6504ad --- /dev/null +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -0,0 +1,250 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_EXEC_FSM_HPP +#define BOOST_REDIS_EXEC_FSM_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// Sans-io algorithm for redis_stream::async_connect, as a finite state machine + +namespace boost::redis::detail { + +// TODO: this is duplicated +inline bool is_terminal_cancellation(asio::cancellation_type_t value) +{ + return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; +} + +// What transport is redis_stream using? +enum class transport_type +{ + tcp, // plaintext TCP + tcp_tls, // TLS over TCP + unix_socket, // UNIX domain sockets +}; + +struct redis_stream_state { + transport_type type{transport_type::tcp}; + bool ssl_stream_used{false}; + const config* cfg{nullptr}; + connection_logger* lgr{nullptr}; +}; + +// What should we do next? +enum class connect_action_type +{ + unix_socket_connect, + tcp_resolve, + tcp_connect, + ssl_stream_reset, + ssl_handshake, + done, +}; + +class connect_action { + connect_action_type type_; + system::error_code ec_; + +public: + connect_action(connect_action_type type) noexcept + : type_{type} + { } + + connect_action(system::error_code ec) noexcept + : type_{connect_action_type::done} + , ec_{ec} + { } + + connect_action_type type() const { return type_; } + system::error_code error() const { return ec_; } +}; + +inline transport_type transport_from_config(const config& cfg) +{ + if (cfg.unix_socket.empty()) { + if (cfg.use_ssl) { + return transport_type::tcp_tls; + } else { + return transport_type::tcp; + } + } else { + BOOST_ASSERT(!cfg.use_ssl); + return transport_type::unix_socket; + } +} + +class connect_fsm { + int resume_point_{0}; + +public: + connect_fsm() = default; + + connect_action resume( + redis_stream_state& st, + system::error_code ec, + const asio::ip::tcp::resolver::results_type& resolver_results, + asio::cancellation_type_t cancel_state) + { + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::resolve_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + st.lgr->on_resolve(ec, resolver_results); + + // Delegate to the regular resume function + return resume(st, ec, cancel_state); + } + + connect_action resume( + redis_stream_state& st, + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint, + asio::cancellation_type_t cancel_state) + { + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::connect_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + st.lgr->on_connect(ec, selected_endpoint); + + // Delegate to the regular resume function + return resume(st, ec, cancel_state); + } + + connect_action resume( + redis_stream_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state) + { + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Record the transport that we will be using + st.type = transport_from_config(*st.cfg); + + if (st.type == transport_type::unix_socket) { + // Directly connect to the socket + BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_connect) + + // Fix error codes. If we were cancelled and the code is operation_aborted, + // it is because per-operation cancellation was activated. If we were not cancelled + // but the operation failed with operation_aborted, it's a timeout. + // Also check for cancellations that didn't cause a failure + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::connect_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + st.lgr->on_connect(ec, st.cfg->unix_socket); + + // If this failed, we can't continue + if (ec) { + return ec; + } + + // Done + return system::error_code(); + } else { + // ssl::stream doesn't support being re-used. If we're to use + // TLS and the stream has been used, re-create it. + // Must be done before anything else is done on the stream + if (st.cfg->use_ssl && st.ssl_stream_used) { + BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::ssl_stream_reset) + } + + // Resolve names. The continuation needs access to the returned + // endpoints, and is a specialized resume() that will call this function + BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::tcp_resolve) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + // Now connect to the endpoints returned by the resolver. + // This has a specialized resume(), too + BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_connect) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + if (st.cfg->use_ssl) { + // Mark the SSL stream as used + st.ssl_stream_used = true; + + // Perform the TLS handshake + BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::ssl_handshake) + + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::ssl_handshake_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + st.lgr->on_ssl_handshake(ec); + + // If this failed, we can't continue + if (ec) { + return ec; + } + } + + // Done + return system::error_code(); + } + } + + BOOST_ASSERT(false); + return system::error_code(); + } + +}; // namespace boost::redis::detail + +} // namespace boost::redis::detail + +#endif From ee255e707c8c60fb17d0a7fbc73920a9d1830fb2 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 27 Sep 2025 20:29:16 +0200 Subject: [PATCH 02/30] Bridge with I/O --- include/boost/redis/connection.hpp | 2 +- include/boost/redis/detail/connect_fsm.hpp | 39 ++-- include/boost/redis/detail/redis_stream.hpp | 194 ++++++++------------ 3 files changed, 95 insertions(+), 140 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index b283af27..cf436fcc 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -424,7 +424,7 @@ class run_op { for (;;) { // Try to connect BOOST_ASIO_CORO_YIELD - conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self)); + conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self)); // If we were successful, run all the connection tasks if (!ec) { diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp index 6b6504ad..b7ea7d9c 100644 --- a/include/boost/redis/detail/connect_fsm.hpp +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -6,8 +6,8 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#ifndef BOOST_REDIS_EXEC_FSM_HPP -#define BOOST_REDIS_EXEC_FSM_HPP +#ifndef BOOST_REDIS_CONNECT_FSM_HPP +#define BOOST_REDIS_CONNECT_FSM_HPP #include #include @@ -41,8 +41,6 @@ enum class transport_type struct redis_stream_state { transport_type type{transport_type::tcp}; bool ssl_stream_used{false}; - const config* cfg{nullptr}; - connection_logger* lgr{nullptr}; }; // What should we do next? @@ -90,14 +88,21 @@ inline transport_type transport_from_config(const config& cfg) class connect_fsm { int resume_point_{0}; + const config* cfg_{nullptr}; + connection_logger* lgr_{nullptr}; public: - connect_fsm() = default; + connect_fsm(const config& cfg, connection_logger& lgr) noexcept + : cfg_(&cfg) + , lgr_(&lgr) + { } + + const config& get_config() const { return *cfg_; } connect_action resume( - redis_stream_state& st, system::error_code ec, const asio::ip::tcp::resolver::results_type& resolver_results, + redis_stream_state& st, asio::cancellation_type_t cancel_state) { // Translate error codes @@ -112,16 +117,16 @@ class connect_fsm { } // Log it - st.lgr->on_resolve(ec, resolver_results); + lgr_->on_resolve(ec, resolver_results); // Delegate to the regular resume function - return resume(st, ec, cancel_state); + return resume(ec, st, cancel_state); } connect_action resume( - redis_stream_state& st, system::error_code ec, const asio::ip::tcp::endpoint& selected_endpoint, + redis_stream_state& st, asio::cancellation_type_t cancel_state) { // Translate error codes @@ -136,22 +141,22 @@ class connect_fsm { } // Log it - st.lgr->on_connect(ec, selected_endpoint); + lgr_->on_connect(ec, selected_endpoint); // Delegate to the regular resume function - return resume(st, ec, cancel_state); + return resume(ec, st, cancel_state); } connect_action resume( - redis_stream_state& st, system::error_code ec, + redis_stream_state& st, asio::cancellation_type_t cancel_state) { switch (resume_point_) { BOOST_REDIS_CORO_INITIAL // Record the transport that we will be using - st.type = transport_from_config(*st.cfg); + st.type = transport_from_config(*cfg_); if (st.type == transport_type::unix_socket) { // Directly connect to the socket @@ -172,7 +177,7 @@ class connect_fsm { } // Log it - st.lgr->on_connect(ec, st.cfg->unix_socket); + lgr_->on_connect(ec, cfg_->unix_socket); // If this failed, we can't continue if (ec) { @@ -185,7 +190,7 @@ class connect_fsm { // ssl::stream doesn't support being re-used. If we're to use // TLS and the stream has been used, re-create it. // Must be done before anything else is done on the stream - if (st.cfg->use_ssl && st.ssl_stream_used) { + if (cfg_->use_ssl && st.ssl_stream_used) { BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::ssl_stream_reset) } @@ -207,7 +212,7 @@ class connect_fsm { return ec; } - if (st.cfg->use_ssl) { + if (cfg_->use_ssl) { // Mark the SSL stream as used st.ssl_stream_used = true; @@ -226,7 +231,7 @@ class connect_fsm { } // Log it - st.lgr->on_ssl_handshake(ec); + lgr_->on_ssl_handshake(ec); // If this failed, we can't continue if (ec) { diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index e8590c70..a2171ef7 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -8,6 +8,7 @@ #define BOOST_REDIS_REDIS_STREAM_HPP #include +#include #include #include @@ -31,14 +32,6 @@ namespace boost { namespace redis { namespace detail { -// What transport is redis_stream using? -enum class transport_type -{ - tcp, // plaintext TCP - tcp_tls, // TLS over TCP - unix_socket, // UNIX domain sockets -}; - template class redis_stream { asio::ssl::context ssl_ctx_; @@ -48,135 +41,92 @@ class redis_stream { asio::basic_stream_socket unix_socket_; #endif typename asio::steady_timer::template rebind_executor::other timer_; - - transport_type transport_{transport_type::tcp}; - bool ssl_stream_used_{false}; + redis_stream_state st_; void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; } - static transport_type transport_from_config(const config& cfg) - { - if (cfg.unix_socket.empty()) { - if (cfg.use_ssl) { - return transport_type::tcp_tls; - } else { - return transport_type::tcp; - } - } else { - BOOST_ASSERT(!cfg.use_ssl); - return transport_type::unix_socket; - } - } - struct connect_op { redis_stream& obj; - const config* cfg; - connection_logger* lgr; - asio::coroutine coro{}; - - // This overload will be used for connects. We only need the endpoint - // for logging, so log it and call the coroutine - template - void operator()( - Self& self, - system::error_code ec, - const asio::ip::tcp::endpoint& selected_endpoint) - { - lgr->on_connect(ec, selected_endpoint); - (*this)(self, ec); - } + connect_fsm fsm_; template - void operator()( - Self& self, - system::error_code ec = {}, - asio::ip::tcp::resolver::results_type resolver_results = {}) + void execute_action(Self& self, connect_action act) { - BOOST_ASIO_CORO_REENTER(coro) - { - // Record the transport that we will be using - obj.transport_ = transport_from_config(*cfg); + const auto& cfg = fsm_.get_config(); - if (obj.transport_ == transport_type::unix_socket) { + switch (act.type()) { + case connect_action_type::unix_socket_connect: #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - // Directly connect to the socket - BOOST_ASIO_CORO_YIELD obj.unix_socket_.async_connect( - cfg->unix_socket, - asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self))); - - // Log it - lgr->on_connect(ec, cfg->unix_socket); - - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec); - return; - } + cfg.unix_socket, + asio::cancel_after(obj.timer_, cfg.connect_timeout, std::move(self))); #else BOOST_ASSERT(false); #endif - } else { - // ssl::stream doesn't support being re-used. If we're to use - // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream - if (cfg->use_ssl && obj.ssl_stream_used_) - obj.reset_stream(); + return; - BOOST_ASIO_CORO_YIELD + case connect_action_type::tcp_resolve: obj.resolv_.async_resolve( - cfg->addr.host, - cfg->addr.port, - asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self))); - - // Log it - lgr->on_resolve(ec, resolver_results); - - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec); - return; - } - - // Connect to the address that the resolver provided us - BOOST_ASIO_CORO_YIELD - asio::async_connect( - obj.stream_.next_layer(), - std::move(resolver_results), - asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self))); - - // Note: logging is performed in the specialized operator() function. - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec); - return; - } - - if (cfg->use_ssl) { - // Mark the SSL stream as used - obj.ssl_stream_used_ = true; - - // If we were configured to use TLS, perform the handshake - BOOST_ASIO_CORO_YIELD - obj.stream_.async_handshake( - asio::ssl::stream_base::client, - asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self))); - - lgr->on_ssl_handshake(ec); + cfg.addr.host, + cfg.addr.port, + asio::cancel_after(obj.timer_, cfg.resolve_timeout, std::move(self))); + return; + case connect_action_type::ssl_stream_reset: + obj.reset_stream(); + // this action does not require yielding. Execute the next action immediately + (*this)(self); + return; + case connect_action_type::ssl_handshake: + obj.stream_.async_handshake( + asio::ssl::stream_base::client, + asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self))); + return; + case connect_action_type::done: self.complete(act.error()); break; + // Connect should use the specialized handler, where resolver results are available + case connect_action_type::tcp_connect: + default: BOOST_ASSERT(false); + } + } - // If this failed, we can't continue - if (ec) { - self.complete( - ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec); - return; - } - } - } + // This overload will be used for connects + template + void operator()( + Self& self, + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint) + { + auto act = fsm_.resume( + ec, + selected_endpoint, + obj.st_, + self.get_cancellation_state().cancelled()); + execute_action(self, act); + } - // Done - self.complete(system::error_code()); + // This overload will be used for resolves + template + void operator()( + Self& self, + system::error_code ec, + asio::ip::tcp::resolver::results_type endpoints) + { + auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled()); + if (act.type() == connect_action_type::tcp_connect) { + asio::async_connect( + obj.stream_.next_layer(), + std::move(endpoints), + asio::cancel_after(obj.timer_, fsm_.get_config().connect_timeout, std::move(self))); + } else { + execute_action(self, act); } } + + template + void operator()(Self& self, system::error_code ec = {}) + { + auto act = fsm_.resume(ec, obj.st_, self.get_cancellation_state().cancelled()); + execute_action(self, act); + } }; public: @@ -199,7 +149,7 @@ class redis_stream { bool is_open() const { #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - if (transport_ == transport_type::unix_socket) + if (st_.type == transport_type::unix_socket) return unix_socket_.is_open(); #endif return stream_.next_layer().is_open(); @@ -209,10 +159,10 @@ class redis_stream { // I/O template - auto async_connect(const config* cfg, connection_logger* l, CompletionToken&& token) + auto async_connect(const config& cfg, connection_logger& l, CompletionToken&& token) { return asio::async_compose( - connect_op{*this, cfg, l}, + connect_op{*this, connect_fsm(cfg, l)}, token); } @@ -220,7 +170,7 @@ class redis_stream { template void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token) { - switch (transport_) { + switch (st_.type) { case transport_type::tcp: { stream_.next_layer().async_write_some(buffers, std::forward(token)); @@ -245,7 +195,7 @@ class redis_stream { template void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token) { - switch (transport_) { + switch (st_.type) { case transport_type::tcp: { return stream_.next_layer().async_read_some( From c794327a3110ff52650a3df8da3db8debf41fdaf Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:05:58 +0200 Subject: [PATCH 03/30] Initial testing setup --- test/CMakeLists.txt | 1 + test/Jamfile | 1 + test/test_connect_fsm.cpp | 133 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 test/test_connect_fsm.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e5fdc1e4..dbb35710 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -41,6 +41,7 @@ make_test(test_exec_fsm) make_test(test_log_to_file) make_test(test_conn_logging) make_test(test_reader_fsm) +make_test(test_connect_fsm) make_test(test_setup_request_utils) make_test(test_multiplexer) diff --git a/test/Jamfile b/test/Jamfile index 20be13b7..60b4bb95 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -57,6 +57,7 @@ local tests = test_log_to_file test_conn_logging test_reader_fsm + test_connect_fsm test_setup_request_utils test_multiplexer ; diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp new file mode 100644 index 00000000..e24ba603 --- /dev/null +++ b/test/test_connect_fsm.cpp @@ -0,0 +1,133 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include + +#include + +#include "boost/asio/ip/tcp.hpp" +#include "boost/redis/config.hpp" +#include "boost/redis/detail/connection_logger.hpp" +#include "boost/redis/logger.hpp" +#include "boost/system/detail/error_code.hpp" + +#include +#include +#include +#include +#include + +using namespace boost::redis; +namespace asio = boost::asio; +using detail::connect_fsm; +using detail::connect_action_type; +using detail::connect_action; +using detail::connection_logger; +using detail::redis_stream_state; +using detail::transport_type; +using asio::ip::tcp; +using boost::system::error_code; +using boost::asio::cancellation_type_t; + +// Operators +static const char* to_string(connect_action_type type) +{ + switch (type) { + case connect_action_type::unix_socket_connect: + return "connect_action_type::unix_socket_connect"; + case connect_action_type::tcp_resolve: return "connect_action_type::tcp_resolve"; + case connect_action_type::tcp_connect: return "connect_action_type::tcp_connect"; + case connect_action_type::ssl_stream_reset: return "connect_action_type::ssl_stream_reset"; + case connect_action_type::ssl_handshake: return "connect_action_type::ssl_handshake"; + case connect_action_type::done: return "connect_action_type::done"; + default: return ""; + } +} + +static const char* to_string(transport_type type) +{ + switch (type) { + case transport_type::tcp: return "transport_type::tcp"; + case transport_type::tcp_tls: return "transport_type::tcp_tls"; + case transport_type::unix_socket: return "transport_type::unix_socket"; + default: return ""; + } +} + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, connect_action_type type) +{ + return os << to_string(type); +} + +std::ostream& operator<<(std::ostream& os, transport_type type) { return os << to_string(type); } + +bool operator==(const connect_action& lhs, const connect_action& rhs) noexcept +{ + return lhs.type() == rhs.type() && lhs.error() == rhs.error(); +} + +std::ostream& operator<<(std::ostream& os, const connect_action& act) +{ + os << "connect_action{ .type=" << act.type(); + if (act.type() == connect_action_type::done) + os << ", .error=" << act.error(); + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234); + +auto resolver_results = [] { + const tcp::endpoint data[] = {endpoint}; + return asio::ip::tcp::resolver::results_type::create( + std::begin(data), + std::end(data), + "my_host", + "1234"); +}(); + +void test_success_tcp() +{ + // Setup + config cfg; + std::ostringstream oss; + std::vector> msgs; + detail::connection_logger lgr( + logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { + msgs.emplace_back(lvl, msg); + })); + connect_fsm fsm(cfg, lgr); + redis_stream_state st{}; + + // Initiate + auto act = fsm.resume(error_code(), st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fsm.resume(error_code(), resolver_results, st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fsm.resume(error_code(), endpoint, st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(st.type, transport_type::tcp); + + // TODO: check logging +} + +} // namespace + +int main() +{ + test_success_tcp(); + + return boost::report_errors(); +} From 259a7934611ac6a8f1d57e4c744739e34405c49f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:18:42 +0200 Subject: [PATCH 04/30] Check logging --- test/test_connect_fsm.cpp | 54 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index e24ba603..860a23e5 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -17,6 +17,7 @@ #include "boost/system/detail/error_code.hpp" #include +#include #include #include #include @@ -59,6 +60,28 @@ static const char* to_string(transport_type type) } } +static const char* to_string(logger::level lvl) +{ + switch (lvl) { + case logger::level::disabled: return "logger::level::disabled"; + case logger::level::emerg: return "logger::level::emerg"; + case logger::level::alert: return "logger::level::alert"; + case logger::level::crit: return "logger::level::crit"; + case logger::level::err: return "logger::level::err"; + case logger::level::warning: return "logger::level::warning"; + case logger::level::notice: return "logger::level::notice"; + case logger::level::info: return "logger::level::info"; + case logger::level::debug: return "logger::level::debug"; + default: return ""; + } +} + +namespace boost::redis { + +std::ostream& operator<<(std::ostream& os, logger::level lvl) { return os << to_string(lvl); } + +} // namespace boost::redis + namespace boost::redis::detail { std::ostream& operator<<(std::ostream& os, connect_action_type type) @@ -85,10 +108,12 @@ std::ostream& operator<<(std::ostream& os, const connect_action& act) namespace { +// TCP endpoints const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234); +const tcp::endpoint endpoint2(asio::ip::make_address("192.168.10.2"), 1235); auto resolver_results = [] { - const tcp::endpoint data[] = {endpoint}; + const tcp::endpoint data[] = {endpoint, endpoint2}; return asio::ip::tcp::resolver::results_type::create( std::begin(data), std::end(data), @@ -96,15 +121,31 @@ auto resolver_results = [] { "1234"); }(); +// For checking logs +struct log_message { + logger::level lvl; + std::string msg; + + friend bool operator==(const log_message& lhs, const log_message& rhs) noexcept + { + return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg; + } + + friend std::ostream& operator<<(std::ostream& os, const log_message& v) + { + return os << "log_message { .lvl=" << v.lvl << ", .msg=" << v.msg << " }"; + } +}; + void test_success_tcp() { // Setup config cfg; std::ostringstream oss; - std::vector> msgs; + std::vector msgs; detail::connection_logger lgr( logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { - msgs.emplace_back(lvl, msg); + msgs.push_back({lvl, std::string(msg)}); })); connect_fsm fsm(cfg, lgr); redis_stream_state st{}; @@ -120,7 +161,12 @@ void test_success_tcp() // The transport type was appropriately set BOOST_TEST_EQ(st.type, transport_type::tcp); - // TODO: check logging + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), msgs.begin(), msgs.end()); } } // namespace From 2a779d29ae7c6a0794dba3afc5641b579c0a5126 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:25:40 +0200 Subject: [PATCH 05/30] Move to ipp --- include/boost/redis/detail/connect_fsm.hpp | 171 ++----------------- include/boost/redis/impl/connect_fsm.ipp | 190 +++++++++++++++++++++ include/boost/redis/src.hpp | 1 + 3 files changed, 201 insertions(+), 161 deletions(-) create mode 100644 include/boost/redis/impl/connect_fsm.ipp diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp index b7ea7d9c..d1d8a556 100644 --- a/include/boost/redis/detail/connect_fsm.hpp +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -9,27 +9,22 @@ #ifndef BOOST_REDIS_CONNECT_FSM_HPP #define BOOST_REDIS_CONNECT_FSM_HPP -#include -#include -#include -#include - #include -#include #include -#include #include // Sans-io algorithm for redis_stream::async_connect, as a finite state machine -namespace boost::redis::detail { +namespace boost::redis { + +struct config; -// TODO: this is duplicated -inline bool is_terminal_cancellation(asio::cancellation_type_t value) -{ - return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; } +namespace boost::redis::detail { + +class connection_logger; + // What transport is redis_stream using? enum class transport_type { @@ -72,20 +67,6 @@ class connect_action { system::error_code error() const { return ec_; } }; -inline transport_type transport_from_config(const config& cfg) -{ - if (cfg.unix_socket.empty()) { - if (cfg.use_ssl) { - return transport_type::tcp_tls; - } else { - return transport_type::tcp; - } - } else { - BOOST_ASSERT(!cfg.use_ssl); - return transport_type::unix_socket; - } -} - class connect_fsm { int resume_point_{0}; const config* cfg_{nullptr}; @@ -103,150 +84,18 @@ class connect_fsm { system::error_code ec, const asio::ip::tcp::resolver::results_type& resolver_results, redis_stream_state& st, - asio::cancellation_type_t cancel_state) - { - // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::resolve_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } - - // Log it - lgr_->on_resolve(ec, resolver_results); - - // Delegate to the regular resume function - return resume(ec, st, cancel_state); - } + asio::cancellation_type_t cancel_state); connect_action resume( system::error_code ec, const asio::ip::tcp::endpoint& selected_endpoint, redis_stream_state& st, - asio::cancellation_type_t cancel_state) - { - // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::connect_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } - - // Log it - lgr_->on_connect(ec, selected_endpoint); - - // Delegate to the regular resume function - return resume(ec, st, cancel_state); - } + asio::cancellation_type_t cancel_state); connect_action resume( system::error_code ec, redis_stream_state& st, - asio::cancellation_type_t cancel_state) - { - switch (resume_point_) { - BOOST_REDIS_CORO_INITIAL - - // Record the transport that we will be using - st.type = transport_from_config(*cfg_); - - if (st.type == transport_type::unix_socket) { - // Directly connect to the socket - BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_connect) - - // Fix error codes. If we were cancelled and the code is operation_aborted, - // it is because per-operation cancellation was activated. If we were not cancelled - // but the operation failed with operation_aborted, it's a timeout. - // Also check for cancellations that didn't cause a failure - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::connect_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } - - // Log it - lgr_->on_connect(ec, cfg_->unix_socket); - - // If this failed, we can't continue - if (ec) { - return ec; - } - - // Done - return system::error_code(); - } else { - // ssl::stream doesn't support being re-used. If we're to use - // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream - if (cfg_->use_ssl && st.ssl_stream_used) { - BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::ssl_stream_reset) - } - - // Resolve names. The continuation needs access to the returned - // endpoints, and is a specialized resume() that will call this function - BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::tcp_resolve) - - // If this failed, we can't continue (error code translation already performed here) - if (ec) { - return ec; - } - - // Now connect to the endpoints returned by the resolver. - // This has a specialized resume(), too - BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_connect) - - // If this failed, we can't continue (error code translation already performed here) - if (ec) { - return ec; - } - - if (cfg_->use_ssl) { - // Mark the SSL stream as used - st.ssl_stream_used = true; - - // Perform the TLS handshake - BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::ssl_handshake) - - // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::ssl_handshake_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } - - // Log it - lgr_->on_ssl_handshake(ec); - - // If this failed, we can't continue - if (ec) { - return ec; - } - } - - // Done - return system::error_code(); - } - } - - BOOST_ASSERT(false); - return system::error_code(); - } + asio::cancellation_type_t cancel_state); }; // namespace boost::redis::detail diff --git a/include/boost/redis/impl/connect_fsm.ipp b/include/boost/redis/impl/connect_fsm.ipp new file mode 100644 index 00000000..558da09b --- /dev/null +++ b/include/boost/redis/impl/connect_fsm.ipp @@ -0,0 +1,190 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace boost::redis::detail { + +inline transport_type transport_from_config(const config& cfg) +{ + if (cfg.unix_socket.empty()) { + if (cfg.use_ssl) { + return transport_type::tcp_tls; + } else { + return transport_type::tcp; + } + } else { + BOOST_ASSERT(!cfg.use_ssl); + return transport_type::unix_socket; + } +} + +// TODO: this is duplicated +inline bool is_terminal_cancellation(asio::cancellation_type_t value) +{ + return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; +} + +connect_action connect_fsm::resume( + system::error_code ec, + const asio::ip::tcp::resolver::results_type& resolver_results, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::resolve_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + lgr_->on_resolve(ec, resolver_results); + + // Delegate to the regular resume function + return resume(ec, st, cancel_state); +} + +connect_action connect_fsm::resume( + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::connect_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + lgr_->on_connect(ec, selected_endpoint); + + // Delegate to the regular resume function + return resume(ec, st, cancel_state); +} + +connect_action connect_fsm::resume( + system::error_code ec, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Record the transport that we will be using + st.type = transport_from_config(*cfg_); + + if (st.type == transport_type::unix_socket) { + // Directly connect to the socket + BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_connect) + + // Fix error codes. If we were cancelled and the code is operation_aborted, + // it is because per-operation cancellation was activated. If we were not cancelled + // but the operation failed with operation_aborted, it's a timeout. + // Also check for cancellations that didn't cause a failure + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::connect_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + lgr_->on_connect(ec, cfg_->unix_socket); + + // If this failed, we can't continue + if (ec) { + return ec; + } + + // Done + return system::error_code(); + } else { + // ssl::stream doesn't support being re-used. If we're to use + // TLS and the stream has been used, re-create it. + // Must be done before anything else is done on the stream + if (cfg_->use_ssl && st.ssl_stream_used) { + BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::ssl_stream_reset) + } + + // Resolve names. The continuation needs access to the returned + // endpoints, and is a specialized resume() that will call this function + BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::tcp_resolve) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + // Now connect to the endpoints returned by the resolver. + // This has a specialized resume(), too + BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_connect) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + if (cfg_->use_ssl) { + // Mark the SSL stream as used + st.ssl_stream_used = true; + + // Perform the TLS handshake + BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::ssl_handshake) + + // Translate error codes + if (is_terminal_cancellation(cancel_state)) { + if (ec) { + if (ec == asio::error::operation_aborted) { + ec = error::ssl_handshake_timeout; + } + } else { + ec = asio::error::operation_aborted; + } + } + + // Log it + lgr_->on_ssl_handshake(ec); + + // If this failed, we can't continue + if (ec) { + return ec; + } + } + + // Done + return system::error_code(); + } + } + + BOOST_ASSERT(false); + return system::error_code(); +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 3e57518e..b47fb22a 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include From b7aed6878d8b527b16915930f00f1c5ff17ee4b0 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:36:28 +0200 Subject: [PATCH 06/30] Resolve error --- test/test_connect_fsm.cpp | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 860a23e5..a27b6b90 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include using namespace boost::redis; @@ -34,6 +33,7 @@ using detail::transport_type; using asio::ip::tcp; using boost::system::error_code; using boost::asio::cancellation_type_t; +using resolver_results = tcp::resolver::results_type; // Operators static const char* to_string(connect_action_type type) @@ -112,7 +112,7 @@ namespace { const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234); const tcp::endpoint endpoint2(asio::ip::make_address("192.168.10.2"), 1235); -auto resolver_results = [] { +auto resolver_data = [] { const tcp::endpoint data[] = {endpoint, endpoint2}; return asio::ip::tcp::resolver::results_type::create( std::begin(data), @@ -137,7 +137,7 @@ struct log_message { } }; -void test_success_tcp() +void test_tcp_success() { // Setup config cfg; @@ -153,7 +153,7 @@ void test_success_tcp() // Initiate auto act = fsm.resume(error_code(), st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); - act = fsm.resume(error_code(), resolver_results, st, cancellation_type_t::none); + act = fsm.resume(error_code(), resolver_data, st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fsm.resume(error_code(), endpoint, st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); @@ -169,11 +169,40 @@ void test_success_tcp() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), msgs.begin(), msgs.end()); } +void test_tcp_resolve_error() +{ + // Setup + config cfg; + std::ostringstream oss; + std::vector msgs; + detail::connection_logger lgr( + logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { + msgs.push_back({lvl, std::string(msg)}); + })); + connect_fsm fsm(cfg, lgr); + redis_stream_state st{}; + + // Initiate + auto act = fsm.resume(error_code(), st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fsm.resume(error::empty_field, resolver_results{}, st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), msgs.begin(), msgs.end()); +} + } // namespace int main() { - test_success_tcp(); + test_tcp_success(); + test_tcp_resolve_error(); return boost::report_errors(); } From 3e2970f225db320383c1cf9cc4b0b343617d6e21 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:38:36 +0200 Subject: [PATCH 07/30] Fixture --- test/test_connect_fsm.cpp | 47 ++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index a27b6b90..bd564ca2 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -137,55 +137,52 @@ struct log_message { } }; -void test_tcp_success() -{ - // Setup +// Reduce duplication +struct fixture { config cfg; std::ostringstream oss; std::vector msgs; - detail::connection_logger lgr( + detail::connection_logger lgr{ logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { msgs.push_back({lvl, std::string(msg)}); - })); - connect_fsm fsm(cfg, lgr); + })}; + connect_fsm fsm{cfg, lgr}; redis_stream_state st{}; +}; + +void test_tcp_success() +{ + // Setup + fixture fix; - // Initiate - auto act = fsm.resume(error_code(), st, cancellation_type_t::none); + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); - act = fsm.resume(error_code(), resolver_data, st, cancellation_type_t::none); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); - act = fsm.resume(error_code(), endpoint, st, cancellation_type_t::none); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set - BOOST_TEST_EQ(st.type, transport_type::tcp); + BOOST_TEST_EQ(fix.st.type, transport_type::tcp); // Check logging const log_message expected[] = { {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), msgs.begin(), msgs.end()); + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } void test_tcp_resolve_error() { // Setup - config cfg; - std::ostringstream oss; - std::vector msgs; - detail::connection_logger lgr( - logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { - msgs.push_back({lvl, std::string(msg)}); - })); - connect_fsm fsm(cfg, lgr); - redis_stream_state st{}; + fixture fix; - // Initiate - auto act = fsm.resume(error_code(), st, cancellation_type_t::none); + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); - act = fsm.resume(error::empty_field, resolver_results{}, st, cancellation_type_t::none); + act = fix.fsm.resume(error::empty_field, resolver_results{}, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging @@ -194,7 +191,7 @@ void test_tcp_resolve_error() {logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"}, // clang-format on }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), msgs.begin(), msgs.end()); + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } } // namespace From 2fad9efd8589d7182c57a3768dae0055f4b2670a Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:51:15 +0200 Subject: [PATCH 08/30] Timeout test and fix bug --- include/boost/redis/impl/connect_fsm.ipp | 59 ++++++++---------------- test/test_connect_fsm.cpp | 26 +++++++++++ 2 files changed, 46 insertions(+), 39 deletions(-) diff --git a/include/boost/redis/impl/connect_fsm.ipp b/include/boost/redis/impl/connect_fsm.ipp index 558da09b..e728d11e 100644 --- a/include/boost/redis/impl/connect_fsm.ipp +++ b/include/boost/redis/impl/connect_fsm.ipp @@ -32,10 +32,23 @@ inline transport_type transport_from_config(const config& cfg) } } -// TODO: this is duplicated -inline bool is_terminal_cancellation(asio::cancellation_type_t value) +inline system::error_code translate_timeout_error( + system::error_code io_ec, + asio::cancellation_type_t cancel_state, + error code_if_cancelled) { - return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; + // Translates cancellations and timeout errors into a single error_code. + // - Cancellation state set, and an I/O error: the entire operation was cancelled. + // The I/O code (probably operation_aborted) is appropriate. + // - Cancellation state set, and no I/O error: same as above, but the cancellation + // arrived after the operation completed and before the handler was called. Set the code here. + // - No cancellation state set, I/O error set to operation_aborted: since we use cancel_after, + // this means a timeout. + // - Otherwise, respect the I/O error. + if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { + return io_ec ? io_ec : asio::error::operation_aborted; + } + return io_ec == asio::error::operation_aborted ? code_if_cancelled : io_ec; } connect_action connect_fsm::resume( @@ -45,15 +58,7 @@ connect_action connect_fsm::resume( asio::cancellation_type_t cancel_state) { // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::resolve_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } + ec = translate_timeout_error(ec, cancel_state, error::resolve_timeout); // Log it lgr_->on_resolve(ec, resolver_results); @@ -69,15 +74,7 @@ connect_action connect_fsm::resume( asio::cancellation_type_t cancel_state) { // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::connect_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } + ec = translate_timeout_error(ec, cancel_state, error::connect_timeout); // Log it lgr_->on_connect(ec, selected_endpoint); @@ -105,15 +102,7 @@ connect_action connect_fsm::resume( // it is because per-operation cancellation was activated. If we were not cancelled // but the operation failed with operation_aborted, it's a timeout. // Also check for cancellations that didn't cause a failure - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::connect_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } + ec = translate_timeout_error(ec, cancel_state, error::connect_timeout); // Log it lgr_->on_connect(ec, cfg_->unix_socket); @@ -159,15 +148,7 @@ connect_action connect_fsm::resume( BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::ssl_handshake) // Translate error codes - if (is_terminal_cancellation(cancel_state)) { - if (ec) { - if (ec == asio::error::operation_aborted) { - ec = error::ssl_handshake_timeout; - } - } else { - ec = asio::error::operation_aborted; - } - } + ec = translate_timeout_error(ec, cancel_state, error::ssl_handshake_timeout); // Log it lgr_->on_ssl_handshake(ec); diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index bd564ca2..ac04e39a 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -10,6 +10,7 @@ #include +#include "boost/asio/error.hpp" #include "boost/asio/ip/tcp.hpp" #include "boost/redis/config.hpp" #include "boost/redis/detail/connection_logger.hpp" @@ -194,12 +195,37 @@ void test_tcp_resolve_error() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_resolve_timeout() +{ + // Setup + fixture fix; + + // Since we use cancel_after, a timeout is an operation_aborted without a cancellation state set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume( + asio::error::operation_aborted, + resolver_results{}, + fix.st, + cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::resolve_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Error resolving the server hostname: Resolve timeout. [boost.redis:17]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() { test_tcp_success(); test_tcp_resolve_error(); + test_tcp_resolve_timeout(); return boost::report_errors(); } From c7b26ba8dbaffb7364b35aa7c470c4f090448b95 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:55:45 +0200 Subject: [PATCH 09/30] Resolve cancel --- test/test_connect_fsm.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index ac04e39a..ba73f094 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -219,6 +219,25 @@ void test_tcp_resolve_timeout() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_resolve_cancel() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume( + asio::error::operation_aborted, + resolver_results{}, + fix.st, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + } // namespace int main() @@ -226,6 +245,7 @@ int main() test_tcp_success(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); + test_tcp_resolve_cancel(); return boost::report_errors(); } From b61952e066868f136487a24b1e9e2ca750dc08b3 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 20:56:42 +0200 Subject: [PATCH 10/30] Resolve cancel edge --- test/test_connect_fsm.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index ba73f094..709a7a96 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -238,6 +238,21 @@ void test_tcp_resolve_cancel() BOOST_TEST_EQ(fix.msgs.size(), 1u); } +void test_tcp_resolve_cancel_edge() +{ + // Setup + fixture fix; + + // Cancel state set but no error + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_results{}, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + } // namespace int main() @@ -246,6 +261,7 @@ int main() test_tcp_resolve_error(); test_tcp_resolve_timeout(); test_tcp_resolve_cancel(); + test_tcp_resolve_cancel_edge(); return boost::report_errors(); } From 3b3d64c5771d0b7bd00be11be618695f27bf86b7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:02:22 +0200 Subject: [PATCH 11/30] connect error --- .../boost/redis/impl/connection_logger.ipp | 4 +-- test/test_connect_fsm.cpp | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index d0c6cf67..dce6c6ad 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -105,7 +105,7 @@ void connection_logger::on_connect(system::error_code const& ec, asio::ip::tcp:: return; if (ec) { - msg_ = "Failed connecting to the server: "; + msg_ = "Failed to connect to the server: "; format_error_code(ec, msg_); } else { msg_ = "Connected to "; @@ -121,7 +121,7 @@ void connection_logger::on_connect(system::error_code const& ec, std::string_vie return; if (ec) { - msg_ = "Failed connecting to the server: "; + msg_ = "Failed to connect to the server: "; format_error_code(ec, msg_); } else { msg_ = "Connected to "; diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 709a7a96..8ff05641 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -166,6 +166,7 @@ void test_tcp_success() // The transport type was appropriately set BOOST_TEST_EQ(fix.st.type, transport_type::tcp); + BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging const log_message expected[] = { @@ -175,6 +176,7 @@ void test_tcp_success() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +// Resolve errors void test_tcp_resolve_error() { // Setup @@ -253,6 +255,30 @@ void test_tcp_resolve_cancel_edge() BOOST_TEST_EQ(fix.msgs.size(), 1u); } +// Connect errors +void test_tcp_connect_error() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error::empty_field, tcp::endpoint{}, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -262,6 +288,7 @@ int main() test_tcp_resolve_timeout(); test_tcp_resolve_cancel(); test_tcp_resolve_cancel_edge(); + test_tcp_connect_error(); return boost::report_errors(); } From cd42edf60e349c11b621b9dff3bb8404fdd54e64 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:04:24 +0200 Subject: [PATCH 12/30] Connect timeout --- test/test_connect_fsm.cpp | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 8ff05641..005d0db7 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -279,6 +279,33 @@ void test_tcp_connect_error() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_connect_timeout() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume( + asio::error::operation_aborted, + tcp::endpoint{}, + fix.st, + cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::connect_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -289,6 +316,7 @@ int main() test_tcp_resolve_cancel(); test_tcp_resolve_cancel_edge(); test_tcp_connect_error(); + test_tcp_connect_timeout(); return boost::report_errors(); } From 79924f0cba8923c8a6a266563052eb4bac2d3efa Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:06:06 +0200 Subject: [PATCH 13/30] connect cancel --- test/test_connect_fsm.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 005d0db7..b6b22f8a 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -306,17 +306,41 @@ void test_tcp_connect_timeout() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_connect_cancel() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume( + asio::error::operation_aborted, + tcp::endpoint{}, + fix.st, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 2u); +} + } // namespace int main() { test_tcp_success(); + test_tcp_resolve_error(); test_tcp_resolve_timeout(); test_tcp_resolve_cancel(); test_tcp_resolve_cancel_edge(); + test_tcp_connect_error(); test_tcp_connect_timeout(); + test_tcp_connect_cancel(); return boost::report_errors(); } From 4cc6f8fa303faaaf9db8d256be31db50af73c43a Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:06:56 +0200 Subject: [PATCH 14/30] cancel edge --- test/test_connect_fsm.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index b6b22f8a..07e1a172 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -327,6 +327,23 @@ void test_tcp_connect_cancel() BOOST_TEST_EQ(fix.msgs.size(), 2u); } +void test_tcp_connect_cancel_edge() +{ + // Setup + fixture fix; + + // Run the algorithm. Cancellation state set but no error + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), tcp::endpoint{}, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 2u); +} + } // namespace int main() @@ -341,6 +358,7 @@ int main() test_tcp_connect_error(); test_tcp_connect_timeout(); test_tcp_connect_cancel(); + test_tcp_connect_cancel_edge(); return boost::report_errors(); } From 1c5d45fa23ea489650343e330865872320675a23 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:13:21 +0200 Subject: [PATCH 15/30] TLS success --- .../boost/redis/impl/connection_logger.ipp | 8 +++-- test/test_connect_fsm.cpp | 35 +++++++++++++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index dce6c6ad..9e6b3a25 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -136,8 +136,12 @@ void connection_logger::on_ssl_handshake(system::error_code const& ec) if (logger_.lvl < logger::level::info) return; - msg_ = "SSL handshake: "; - format_error_code(ec, msg_); + if (ec) { + msg_ = "Failed to perform SSL handshake: "; + format_error_code(ec, msg_); + } else { + msg_ = "Successfully performed SSL handshake"; + } logger_.fn(logger::level::info, msg_); } diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 07e1a172..2cb86a2f 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -141,8 +141,8 @@ struct log_message { // Reduce duplication struct fixture { config cfg; - std::ostringstream oss; - std::vector msgs; + std::ostringstream oss{}; + std::vector msgs{}; detail::connection_logger lgr{ logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { msgs.push_back({lvl, std::string(msg)}); @@ -176,6 +176,36 @@ void test_tcp_success() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_tls_success() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + + // Run the algorithm. No SSL stream reset is performed here + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls); + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Successfully performed SSL handshake" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + // Resolve errors void test_tcp_resolve_error() { @@ -349,6 +379,7 @@ void test_tcp_connect_cancel_edge() int main() { test_tcp_success(); + test_tcp_tls_success(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); From ef847efbe5140fa4750508d4a28dcec26635a9e2 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:17:01 +0200 Subject: [PATCH 16/30] TLS reconnect --- test/test_connect_fsm.cpp | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 2cb86a2f..2773f977 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -206,6 +206,39 @@ void test_tcp_tls_success() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_tcp_tls_success_reconnect() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + fix.st.ssl_stream_used = true; + + // Run the algorithm. The stream is used, so it needs to be reset + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_stream_reset); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls); + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Successfully performed SSL handshake" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + // Resolve errors void test_tcp_resolve_error() { @@ -380,6 +413,7 @@ int main() { test_tcp_success(); test_tcp_tls_success(); + test_tcp_tls_success_reconnect(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); From 8417c91ac2d96f217a14432b0baee24e5ef362ff Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:20:01 +0200 Subject: [PATCH 17/30] ssl handshake error --- test/test_connect_fsm.cpp | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 2773f977..10af6afc 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -407,6 +407,38 @@ void test_tcp_connect_cancel_edge() BOOST_TEST_EQ(fix.msgs.size(), 2u); } +// SSL handshake error +void test_ssl_handshake_error() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + + // Run the algorithm. No SSL stream reset is performed here + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Failed to perform SSL handshake: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -425,5 +457,7 @@ int main() test_tcp_connect_cancel(); test_tcp_connect_cancel_edge(); + test_ssl_handshake_error(); + return boost::report_errors(); } From 84bc7b7e52cdbda95bf7f956e12997ebb161ae95 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:21:49 +0200 Subject: [PATCH 18/30] SSL handshake timeout --- test/test_connect_fsm.cpp | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 10af6afc..89a2c1c4 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -439,6 +439,37 @@ void test_ssl_handshake_error() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_ssl_handshake_timeout() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + + // Run the algorithm. Timeout = operation_aborted without the cancel type set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::ssl_handshake_timeout)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Failed to perform SSL handshake: SSL handshake timeout. [boost.redis:20]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -458,6 +489,7 @@ int main() test_tcp_connect_cancel_edge(); test_ssl_handshake_error(); + test_ssl_handshake_timeout(); return boost::report_errors(); } From 0987223aeb8121c9a6ad506b27fc7a790a37834e Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:23:05 +0200 Subject: [PATCH 19/30] Handshake cancel --- test/test_connect_fsm.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 89a2c1c4..93366763 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -470,6 +470,30 @@ void test_ssl_handshake_timeout() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_ssl_handshake_cancel() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + + // Run the algorithm. Cancel = operation_aborted with the cancel type set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Logging is system-dependent, so we don't check messages + BOOST_TEST_EQ(fix.msgs.size(), 3u); +} + } // namespace int main() @@ -490,6 +514,7 @@ int main() test_ssl_handshake_error(); test_ssl_handshake_timeout(); + test_ssl_handshake_cancel(); return boost::report_errors(); } From 9eeadb7a4eb2f761f4c428a87ba69e720f853631 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:23:57 +0200 Subject: [PATCH 20/30] ssl cancel edge --- test/test_connect_fsm.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 93366763..78da591d 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -494,6 +494,30 @@ void test_ssl_handshake_cancel() BOOST_TEST_EQ(fix.msgs.size(), 3u); } +void test_ssl_handshake_cancel_edge() +{ + // Setup + config cfg; + cfg.use_ssl = true; + fixture fix{std::move(cfg)}; + + // Run the algorithm. No error, but the cancel state is set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Logging is system-dependent, so we don't check messages + BOOST_TEST_EQ(fix.msgs.size(), 3u); +} + } // namespace int main() @@ -515,6 +539,7 @@ int main() test_ssl_handshake_error(); test_ssl_handshake_timeout(); test_ssl_handshake_cancel(); + test_ssl_handshake_cancel_edge(); return boost::report_errors(); } From a61db76393273e7a773d384d5ecb42afdab19120 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:27:52 +0200 Subject: [PATCH 21/30] UNIX success --- test/test_connect_fsm.cpp | 61 +++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 78da591d..adb0184b 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -151,6 +151,20 @@ struct fixture { redis_stream_state st{}; }; +config make_ssl_config() +{ + config cfg; + cfg.use_ssl = true; + return cfg; +} + +config make_unix_config() +{ + config cfg; + cfg.unix_socket = "/run/redis.sock"; + return cfg; +} + void test_tcp_success() { // Setup @@ -179,9 +193,7 @@ void test_tcp_success() void test_tcp_tls_success() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; // Run the algorithm. No SSL stream reset is performed here auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); @@ -209,9 +221,7 @@ void test_tcp_tls_success() void test_tcp_tls_success_reconnect() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; fix.st.ssl_stream_used = true; // Run the algorithm. The stream is used, so it needs to be reset @@ -239,6 +249,28 @@ void test_tcp_tls_success_reconnect() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_unix_success() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket); + BOOST_TEST_NOT(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Connected to /run/redis.sock"}, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + // Resolve errors void test_tcp_resolve_error() { @@ -411,9 +443,7 @@ void test_tcp_connect_cancel_edge() void test_ssl_handshake_error() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; // Run the algorithm. No SSL stream reset is performed here auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); @@ -442,9 +472,7 @@ void test_ssl_handshake_error() void test_ssl_handshake_timeout() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; // Run the algorithm. Timeout = operation_aborted without the cancel type set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); @@ -473,9 +501,7 @@ void test_ssl_handshake_timeout() void test_ssl_handshake_cancel() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; // Run the algorithm. Cancel = operation_aborted with the cancel type set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); @@ -497,9 +523,7 @@ void test_ssl_handshake_cancel() void test_ssl_handshake_cancel_edge() { // Setup - config cfg; - cfg.use_ssl = true; - fixture fix{std::move(cfg)}; + fixture fix{make_ssl_config()}; // Run the algorithm. No error, but the cancel state is set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); @@ -525,6 +549,7 @@ int main() test_tcp_success(); test_tcp_tls_success(); test_tcp_tls_success_reconnect(); + test_unix_success(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); From 9846bccc4950cee9674f8010589f47b4ab9583e2 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:30:16 +0200 Subject: [PATCH 22/30] connect error --- test/test_connect_fsm.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index adb0184b..809ae658 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -14,6 +14,7 @@ #include "boost/asio/ip/tcp.hpp" #include "boost/redis/config.hpp" #include "boost/redis/detail/connection_logger.hpp" +#include "boost/redis/error.hpp" #include "boost/redis/logger.hpp" #include "boost/system/detail/error_code.hpp" @@ -542,6 +543,27 @@ void test_ssl_handshake_cancel_edge() BOOST_TEST_EQ(fix.msgs.size(), 3u); } +// UNIX connect errors +void test_unix_connect_error() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -566,5 +588,7 @@ int main() test_ssl_handshake_cancel(); test_ssl_handshake_cancel_edge(); + test_unix_connect_error(); + return boost::report_errors(); } From 3ab228b3c65ac232a9324d74d24a96906d32a59f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:31:35 +0200 Subject: [PATCH 23/30] connect timeout --- test/test_connect_fsm.cpp | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 809ae658..f46e4291 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -564,6 +564,26 @@ void test_unix_connect_error() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_unix_connect_timeout() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. Timeout = operation_aborted without a cancel state + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::connect_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + } // namespace int main() @@ -589,6 +609,7 @@ int main() test_ssl_handshake_cancel_edge(); test_unix_connect_error(); + test_unix_connect_timeout(); return boost::report_errors(); } From a7a26133f54fedf6c1e2fdee922db96c8a5e8267 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:32:39 +0200 Subject: [PATCH 24/30] UNIX connect cancel --- test/test_connect_fsm.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index f46e4291..723e017a 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -584,6 +584,21 @@ void test_unix_connect_timeout() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +void test_unix_connect_cancel() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. Cancel = operation_aborted with a cancel state + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging is system-dependent + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + } // namespace int main() @@ -610,6 +625,7 @@ int main() test_unix_connect_error(); test_unix_connect_timeout(); + test_unix_connect_cancel(); return boost::report_errors(); } From a39c46a71621b3e21864cd512ace0f23f5cddcf1 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:33:23 +0200 Subject: [PATCH 25/30] unix edge --- test/test_connect_fsm.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 723e017a..a7f07d91 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -599,6 +599,21 @@ void test_unix_connect_cancel() BOOST_TEST_EQ(fix.msgs.size(), 1u); } +void test_unix_connect_cancel_edge() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. No error, but cancel state is set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging is system-dependent + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + } // namespace int main() @@ -626,6 +641,7 @@ int main() test_unix_connect_error(); test_unix_connect_timeout(); test_unix_connect_cancel(); + test_unix_connect_cancel_edge(); return boost::report_errors(); } From 5926dfae88d5f22bb180c78cdfc84030ba70ef2c Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Mon, 29 Sep 2025 21:33:54 +0200 Subject: [PATCH 26/30] Fix includes --- test/test_connect_fsm.cpp | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index a7f07d91..9609de87 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -6,18 +6,16 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include +#include +#include +#include +#include +#include #include -#include "boost/asio/error.hpp" -#include "boost/asio/ip/tcp.hpp" -#include "boost/redis/config.hpp" -#include "boost/redis/detail/connection_logger.hpp" -#include "boost/redis/error.hpp" -#include "boost/redis/logger.hpp" -#include "boost/system/detail/error_code.hpp" - #include #include #include From 769a01ab019997aa4940654b60b86e770ddb930b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 13:39:52 +0200 Subject: [PATCH 27/30] UNIX socket close --- include/boost/redis/detail/connect_fsm.hpp | 13 +++++++------ include/boost/redis/detail/redis_stream.hpp | 11 +++++++++++ include/boost/redis/impl/connect_fsm.ipp | 19 ++++++++++++------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp index d1d8a556..febc3abc 100644 --- a/include/boost/redis/detail/connect_fsm.hpp +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -41,12 +41,13 @@ struct redis_stream_state { // What should we do next? enum class connect_action_type { - unix_socket_connect, - tcp_resolve, - tcp_connect, - ssl_stream_reset, - ssl_handshake, - done, + unix_socket_close, // Close the UNIX socket, to discard state + unix_socket_connect, // Connect to the UNIX socket + tcp_resolve, // Name resolution + tcp_connect, // TCP connect + ssl_stream_reset, // Re-create the SSL stream, to discard state + ssl_handshake, // SSL handshake + done, // Complete the async op }; class connect_action { diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index a2171ef7..d3cba672 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -55,6 +55,17 @@ class redis_stream { const auto& cfg = fsm_.get_config(); switch (act.type()) { + case connect_action_type::unix_socket_close: +#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS + { + system::error_code ec; + obj.unix_socket_.close(ec); + (*this)(self, ec); // This is a sync action + } +#else + BOOST_ASSERT(false); +#endif + return; case connect_action_type::unix_socket_connect: #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS obj.unix_socket_.async_connect( diff --git a/include/boost/redis/impl/connect_fsm.ipp b/include/boost/redis/impl/connect_fsm.ipp index e728d11e..ff8cc34c 100644 --- a/include/boost/redis/impl/connect_fsm.ipp +++ b/include/boost/redis/impl/connect_fsm.ipp @@ -95,8 +95,11 @@ connect_action connect_fsm::resume( st.type = transport_from_config(*cfg_); if (st.type == transport_type::unix_socket) { - // Directly connect to the socket - BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_connect) + // Reset the socket, to discard any previous state. Ignore any errors + BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_close) + + // Connect to the socket + BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::unix_socket_connect) // Fix error codes. If we were cancelled and the code is operation_aborted, // it is because per-operation cancellation was activated. If we were not cancelled @@ -117,14 +120,16 @@ connect_action connect_fsm::resume( } else { // ssl::stream doesn't support being re-used. If we're to use // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream + // Must be done before anything else is done on the stream. + // We don't need to close the TCP socket if using plaintext TCP + // because range-connect closes open sockets, while individual connect doesn't if (cfg_->use_ssl && st.ssl_stream_used) { - BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::ssl_stream_reset) + BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::ssl_stream_reset) } // Resolve names. The continuation needs access to the returned // endpoints, and is a specialized resume() that will call this function - BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::tcp_resolve) + BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_resolve) // If this failed, we can't continue (error code translation already performed here) if (ec) { @@ -133,7 +138,7 @@ connect_action connect_fsm::resume( // Now connect to the endpoints returned by the resolver. // This has a specialized resume(), too - BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_connect) + BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::tcp_connect) // If this failed, we can't continue (error code translation already performed here) if (ec) { @@ -145,7 +150,7 @@ connect_action connect_fsm::resume( st.ssl_stream_used = true; // Perform the TLS handshake - BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::ssl_handshake) + BOOST_REDIS_YIELD(resume_point_, 6, connect_action_type::ssl_handshake) // Translate error codes ec = translate_timeout_error(ec, cancel_state, error::ssl_handshake_timeout); From dfdb74a843a92c17fa7a4b6b44d44f726c6ce270 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 13:42:46 +0200 Subject: [PATCH 28/30] Fix unit tests --- test/test_connect_fsm.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 9609de87..4da8aafa 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -39,6 +39,7 @@ using resolver_results = tcp::resolver::results_type; static const char* to_string(connect_action_type type) { switch (type) { + case connect_action_type::unix_socket_close: return "connect_action_type::unix_socket_close"; case connect_action_type::unix_socket_connect: return "connect_action_type::unix_socket_connect"; case connect_action_type::tcp_resolve: return "connect_action_type::tcp_resolve"; @@ -255,6 +256,8 @@ void test_unix_success() // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); @@ -549,6 +552,8 @@ void test_unix_connect_error() // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); @@ -569,6 +574,8 @@ void test_unix_connect_timeout() // Run the algorithm. Timeout = operation_aborted without a cancel state auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::connect_timeout)); @@ -589,6 +596,8 @@ void test_unix_connect_cancel() // Run the algorithm. Cancel = operation_aborted with a cancel state auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); @@ -604,6 +613,8 @@ void test_unix_connect_cancel_edge() // Run the algorithm. No error, but cancel state is set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); From fbbb673a10d7f9db2da3277d65d2cf5fa2c39745 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 13:43:59 +0200 Subject: [PATCH 29/30] close error test --- test/test_connect_fsm.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 4da8aafa..9a843035 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -273,6 +273,31 @@ void test_unix_success() BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); } +// Close errors are ignored +void test_unix_success_close_error() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(asio::error::bad_descriptor, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket); + BOOST_TEST_NOT(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Connected to /run/redis.sock"}, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + // Resolve errors void test_tcp_resolve_error() { @@ -631,6 +656,7 @@ int main() test_tcp_tls_success(); test_tcp_tls_success_reconnect(); test_unix_success(); + test_unix_success_close_error(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); From 7a136b045b1855f027f073a25c8d72013d77bfd6 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 19:09:37 +0200 Subject: [PATCH 30/30] Simplification --- include/boost/redis/detail/connect_fsm.hpp | 24 +++++++-------------- include/boost/redis/detail/redis_stream.hpp | 6 +++--- test/test_connect_fsm.cpp | 8 +++---- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp index febc3abc..f1d52bbc 100644 --- a/include/boost/redis/detail/connect_fsm.hpp +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -9,18 +9,14 @@ #ifndef BOOST_REDIS_CONNECT_FSM_HPP #define BOOST_REDIS_CONNECT_FSM_HPP +#include + #include #include #include // Sans-io algorithm for redis_stream::async_connect, as a finite state machine -namespace boost::redis { - -struct config; - -} - namespace boost::redis::detail { class connection_logger; @@ -50,22 +46,18 @@ enum class connect_action_type done, // Complete the async op }; -class connect_action { - connect_action_type type_; - system::error_code ec_; +struct connect_action { + connect_action_type type; + system::error_code ec; -public: connect_action(connect_action_type type) noexcept - : type_{type} + : type{type} { } connect_action(system::error_code ec) noexcept - : type_{connect_action_type::done} - , ec_{ec} + : type{connect_action_type::done} + , ec{ec} { } - - connect_action_type type() const { return type_; } - system::error_code error() const { return ec_; } }; class connect_fsm { diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index 2a2100e2..9abf243a 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -54,7 +54,7 @@ class redis_stream { { const auto& cfg = fsm_.get_config(); - switch (act.type()) { + switch (act.type) { case connect_action_type::unix_socket_close: #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS { @@ -92,7 +92,7 @@ class redis_stream { asio::ssl::stream_base::client, asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self))); return; - case connect_action_type::done: self.complete(act.error()); break; + case connect_action_type::done: self.complete(act.ec); break; // Connect should use the specialized handler, where resolver results are available case connect_action_type::tcp_connect: default: BOOST_ASSERT(false); @@ -122,7 +122,7 @@ class redis_stream { asio::ip::tcp::resolver::results_type endpoints) { auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled()); - if (act.type() == connect_action_type::tcp_connect) { + if (act.type == connect_action_type::tcp_connect) { asio::async_connect( obj.stream_.next_layer(), std::move(endpoints), diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 9a843035..75a259b8 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -94,14 +94,14 @@ std::ostream& operator<<(std::ostream& os, transport_type type) { return os << t bool operator==(const connect_action& lhs, const connect_action& rhs) noexcept { - return lhs.type() == rhs.type() && lhs.error() == rhs.error(); + return lhs.type == rhs.type && lhs.ec == rhs.ec; } std::ostream& operator<<(std::ostream& os, const connect_action& act) { - os << "connect_action{ .type=" << act.type(); - if (act.type() == connect_action_type::done) - os << ", .error=" << act.error(); + os << "connect_action{ .type=" << act.type; + if (act.type == connect_action_type::done) + os << ", .error=" << act.ec; return os << " }"; }