diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 3d38b255..5695166d 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -531,7 +531,7 @@ class run_op { for (;;) { // Try to connect BOOST_ASIO_CORO_YIELD - conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self)); + conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self)); // Check for cancellations if (is_cancelled(self)) { diff --git a/include/boost/redis/detail/connect_fsm.hpp b/include/boost/redis/detail/connect_fsm.hpp new file mode 100644 index 00000000..f1d52bbc --- /dev/null +++ b/include/boost/redis/detail/connect_fsm.hpp @@ -0,0 +1,97 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_CONNECT_FSM_HPP +#define BOOST_REDIS_CONNECT_FSM_HPP + +#include + +#include +#include +#include + +// Sans-io algorithm for redis_stream::async_connect, as a finite state machine + +namespace boost::redis::detail { + +class connection_logger; + +// What transport is redis_stream using? +enum class transport_type +{ + tcp, // plaintext TCP + tcp_tls, // TLS over TCP + unix_socket, // UNIX domain sockets +}; + +struct redis_stream_state { + transport_type type{transport_type::tcp}; + bool ssl_stream_used{false}; +}; + +// What should we do next? +enum class connect_action_type +{ + unix_socket_close, // Close the UNIX socket, to discard state + unix_socket_connect, // Connect to the UNIX socket + tcp_resolve, // Name resolution + tcp_connect, // TCP connect + ssl_stream_reset, // Re-create the SSL stream, to discard state + ssl_handshake, // SSL handshake + done, // Complete the async op +}; + +struct connect_action { + connect_action_type type; + system::error_code ec; + + connect_action(connect_action_type type) noexcept + : type{type} + { } + + connect_action(system::error_code ec) noexcept + : type{connect_action_type::done} + , ec{ec} + { } +}; + +class connect_fsm { + int resume_point_{0}; + const config* cfg_{nullptr}; + connection_logger* lgr_{nullptr}; + +public: + connect_fsm(const config& cfg, connection_logger& lgr) noexcept + : cfg_(&cfg) + , lgr_(&lgr) + { } + + const config& get_config() const { return *cfg_; } + + connect_action resume( + system::error_code ec, + const asio::ip::tcp::resolver::results_type& resolver_results, + redis_stream_state& st, + asio::cancellation_type_t cancel_state); + + connect_action resume( + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint, + redis_stream_state& st, + asio::cancellation_type_t cancel_state); + + connect_action resume( + system::error_code ec, + redis_stream_state& st, + asio::cancellation_type_t cancel_state); + +}; // namespace boost::redis::detail + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index 670fe114..9abf243a 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -8,6 +8,7 @@ #define BOOST_REDIS_REDIS_STREAM_HPP #include +#include #include #include @@ -31,14 +32,6 @@ namespace boost { namespace redis { namespace detail { -// What transport is redis_stream using? -enum class transport_type -{ - tcp, // plaintext TCP - tcp_tls, // TLS over TCP - unix_socket, // UNIX domain sockets -}; - template class redis_stream { asio::ssl::context ssl_ctx_; @@ -48,140 +41,103 @@ class redis_stream { asio::basic_stream_socket unix_socket_; #endif typename asio::steady_timer::template rebind_executor::other timer_; - - transport_type transport_{transport_type::tcp}; - bool ssl_stream_used_{false}; + redis_stream_state st_; void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; } - static transport_type transport_from_config(const config& cfg) - { - if (cfg.unix_socket.empty()) { - if (cfg.use_ssl) { - return transport_type::tcp_tls; - } else { - return transport_type::tcp; - } - } else { - BOOST_ASSERT(!cfg.use_ssl); - return transport_type::unix_socket; - } - } - struct connect_op { redis_stream& obj; - const config* cfg; - connection_logger* lgr; - asio::coroutine coro{}; - - // This overload will be used for connects. We only need the endpoint - // for logging, so log it and call the coroutine - template - void operator()( - Self& self, - system::error_code ec, - const asio::ip::tcp::endpoint& selected_endpoint) - { - lgr->on_connect(ec, selected_endpoint); - (*this)(self, ec); - } + connect_fsm fsm_; template - void operator()( - Self& self, - system::error_code ec = {}, - asio::ip::tcp::resolver::results_type resolver_results = {}) + void execute_action(Self& self, connect_action act) { - BOOST_ASIO_CORO_REENTER(coro) - { - // Record the transport that we will be using - obj.transport_ = transport_from_config(*cfg); + const auto& cfg = fsm_.get_config(); - if (obj.transport_ == transport_type::unix_socket) { + switch (act.type) { + case connect_action_type::unix_socket_close: #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - // Discard any existing state + { + system::error_code ec; obj.unix_socket_.close(ec); - - // Directly connect to the socket - BOOST_ASIO_CORO_YIELD + (*this)(self, ec); // This is a sync action + } +#else + BOOST_ASSERT(false); +#endif + return; + case connect_action_type::unix_socket_connect: +#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS obj.unix_socket_.async_connect( - cfg->unix_socket, - asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self))); - - // Log it - lgr->on_connect(ec, cfg->unix_socket); - - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec); - return; - } + cfg.unix_socket, + asio::cancel_after(obj.timer_, cfg.connect_timeout, std::move(self))); #else BOOST_ASSERT(false); #endif - } else { - // ssl::stream doesn't support being re-used. If we're to use - // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream. - // Note that we don't need to close the socket here because - // range connect does it for us. - if (cfg->use_ssl && obj.ssl_stream_used_) - obj.reset_stream(); + return; - BOOST_ASIO_CORO_YIELD + case connect_action_type::tcp_resolve: obj.resolv_.async_resolve( - cfg->addr.host, - cfg->addr.port, - asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self))); - - // Log it - lgr->on_resolve(ec, resolver_results); - - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec); - return; - } - - // Connect to the address that the resolver provided us - BOOST_ASIO_CORO_YIELD - asio::async_connect( - obj.stream_.next_layer(), - std::move(resolver_results), - asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self))); - - // Note: logging is performed in the specialized operator() function. - // If this failed, we can't continue - if (ec) { - self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec); - return; - } - - if (cfg->use_ssl) { - // Mark the SSL stream as used - obj.ssl_stream_used_ = true; - - // If we were configured to use TLS, perform the handshake - BOOST_ASIO_CORO_YIELD - obj.stream_.async_handshake( - asio::ssl::stream_base::client, - asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self))); - - lgr->on_ssl_handshake(ec); + cfg.addr.host, + cfg.addr.port, + asio::cancel_after(obj.timer_, cfg.resolve_timeout, std::move(self))); + return; + case connect_action_type::ssl_stream_reset: + obj.reset_stream(); + // this action does not require yielding. Execute the next action immediately + (*this)(self); + return; + case connect_action_type::ssl_handshake: + obj.stream_.async_handshake( + asio::ssl::stream_base::client, + asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self))); + return; + case connect_action_type::done: self.complete(act.ec); break; + // Connect should use the specialized handler, where resolver results are available + case connect_action_type::tcp_connect: + default: BOOST_ASSERT(false); + } + } - // If this failed, we can't continue - if (ec) { - self.complete( - ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec); - return; - } - } - } + // This overload will be used for connects + template + void operator()( + Self& self, + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint) + { + auto act = fsm_.resume( + ec, + selected_endpoint, + obj.st_, + self.get_cancellation_state().cancelled()); + execute_action(self, act); + } - // Done - self.complete(system::error_code()); + // This overload will be used for resolves + template + void operator()( + Self& self, + system::error_code ec, + asio::ip::tcp::resolver::results_type endpoints) + { + auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled()); + if (act.type == connect_action_type::tcp_connect) { + asio::async_connect( + obj.stream_.next_layer(), + std::move(endpoints), + asio::cancel_after(obj.timer_, fsm_.get_config().connect_timeout, std::move(self))); + } else { + execute_action(self, act); } } + + template + void operator()(Self& self, system::error_code ec = {}) + { + auto act = fsm_.resume(ec, obj.st_, self.get_cancellation_state().cancelled()); + execute_action(self, act); + } }; public: @@ -204,7 +160,7 @@ class redis_stream { bool is_open() const { #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - if (transport_ == transport_type::unix_socket) + if (st_.type == transport_type::unix_socket) return unix_socket_.is_open(); #endif return stream_.next_layer().is_open(); @@ -214,10 +170,10 @@ class redis_stream { // I/O template - auto async_connect(const config* cfg, connection_logger* l, CompletionToken&& token) + auto async_connect(const config& cfg, connection_logger& l, CompletionToken&& token) { return asio::async_compose( - connect_op{*this, cfg, l}, + connect_op{*this, connect_fsm(cfg, l)}, token); } @@ -225,7 +181,7 @@ class redis_stream { template void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token) { - switch (transport_) { + switch (st_.type) { case transport_type::tcp: { stream_.next_layer().async_write_some(buffers, std::forward(token)); @@ -250,7 +206,7 @@ class redis_stream { template void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token) { - switch (transport_) { + switch (st_.type) { case transport_type::tcp: { return stream_.next_layer().async_read_some( diff --git a/include/boost/redis/impl/connect_fsm.ipp b/include/boost/redis/impl/connect_fsm.ipp new file mode 100644 index 00000000..ff8cc34c --- /dev/null +++ b/include/boost/redis/impl/connect_fsm.ipp @@ -0,0 +1,176 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace boost::redis::detail { + +inline transport_type transport_from_config(const config& cfg) +{ + if (cfg.unix_socket.empty()) { + if (cfg.use_ssl) { + return transport_type::tcp_tls; + } else { + return transport_type::tcp; + } + } else { + BOOST_ASSERT(!cfg.use_ssl); + return transport_type::unix_socket; + } +} + +inline system::error_code translate_timeout_error( + system::error_code io_ec, + asio::cancellation_type_t cancel_state, + error code_if_cancelled) +{ + // Translates cancellations and timeout errors into a single error_code. + // - Cancellation state set, and an I/O error: the entire operation was cancelled. + // The I/O code (probably operation_aborted) is appropriate. + // - Cancellation state set, and no I/O error: same as above, but the cancellation + // arrived after the operation completed and before the handler was called. Set the code here. + // - No cancellation state set, I/O error set to operation_aborted: since we use cancel_after, + // this means a timeout. + // - Otherwise, respect the I/O error. + if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { + return io_ec ? io_ec : asio::error::operation_aborted; + } + return io_ec == asio::error::operation_aborted ? code_if_cancelled : io_ec; +} + +connect_action connect_fsm::resume( + system::error_code ec, + const asio::ip::tcp::resolver::results_type& resolver_results, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + // Translate error codes + ec = translate_timeout_error(ec, cancel_state, error::resolve_timeout); + + // Log it + lgr_->on_resolve(ec, resolver_results); + + // Delegate to the regular resume function + return resume(ec, st, cancel_state); +} + +connect_action connect_fsm::resume( + system::error_code ec, + const asio::ip::tcp::endpoint& selected_endpoint, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + // Translate error codes + ec = translate_timeout_error(ec, cancel_state, error::connect_timeout); + + // Log it + lgr_->on_connect(ec, selected_endpoint); + + // Delegate to the regular resume function + return resume(ec, st, cancel_state); +} + +connect_action connect_fsm::resume( + system::error_code ec, + redis_stream_state& st, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Record the transport that we will be using + st.type = transport_from_config(*cfg_); + + if (st.type == transport_type::unix_socket) { + // Reset the socket, to discard any previous state. Ignore any errors + BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_close) + + // Connect to the socket + BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::unix_socket_connect) + + // Fix error codes. If we were cancelled and the code is operation_aborted, + // it is because per-operation cancellation was activated. If we were not cancelled + // but the operation failed with operation_aborted, it's a timeout. + // Also check for cancellations that didn't cause a failure + ec = translate_timeout_error(ec, cancel_state, error::connect_timeout); + + // Log it + lgr_->on_connect(ec, cfg_->unix_socket); + + // If this failed, we can't continue + if (ec) { + return ec; + } + + // Done + return system::error_code(); + } else { + // ssl::stream doesn't support being re-used. If we're to use + // TLS and the stream has been used, re-create it. + // Must be done before anything else is done on the stream. + // We don't need to close the TCP socket if using plaintext TCP + // because range-connect closes open sockets, while individual connect doesn't + if (cfg_->use_ssl && st.ssl_stream_used) { + BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::ssl_stream_reset) + } + + // Resolve names. The continuation needs access to the returned + // endpoints, and is a specialized resume() that will call this function + BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_resolve) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + // Now connect to the endpoints returned by the resolver. + // This has a specialized resume(), too + BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::tcp_connect) + + // If this failed, we can't continue (error code translation already performed here) + if (ec) { + return ec; + } + + if (cfg_->use_ssl) { + // Mark the SSL stream as used + st.ssl_stream_used = true; + + // Perform the TLS handshake + BOOST_REDIS_YIELD(resume_point_, 6, connect_action_type::ssl_handshake) + + // Translate error codes + ec = translate_timeout_error(ec, cancel_state, error::ssl_handshake_timeout); + + // Log it + lgr_->on_ssl_handshake(ec); + + // If this failed, we can't continue + if (ec) { + return ec; + } + } + + // Done + return system::error_code(); + } + } + + BOOST_ASSERT(false); + return system::error_code(); +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index bdb72b6b..5f97bffb 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -103,7 +103,7 @@ void connection_logger::on_connect(system::error_code const& ec, asio::ip::tcp:: return; if (ec) { - msg_ = "Failed connecting to the server: "; + msg_ = "Failed to connect to the server: "; format_error_code(ec, msg_); } else { msg_ = "Connected to "; @@ -119,7 +119,7 @@ void connection_logger::on_connect(system::error_code const& ec, std::string_vie return; if (ec) { - msg_ = "Failed connecting to the server: "; + msg_ = "Failed to connect to the server: "; format_error_code(ec, msg_); } else { msg_ = "Connected to "; @@ -134,8 +134,12 @@ void connection_logger::on_ssl_handshake(system::error_code const& ec) if (logger_.lvl < logger::level::info) return; - msg_ = "SSL handshake: "; - format_error_code(ec, msg_); + if (ec) { + msg_ = "Failed to perform SSL handshake: "; + format_error_code(ec, msg_); + } else { + msg_ = "Successfully performed SSL handshake"; + } logger_.fn(logger::level::info, msg_); } diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 3e57518e..b47fb22a 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e5d4e24c..d21279f3 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -41,6 +41,7 @@ make_test(test_exec_fsm) make_test(test_log_to_file) make_test(test_conn_logging) make_test(test_reader_fsm) +make_test(test_connect_fsm) make_test(test_setup_request_utils) make_test(test_multiplexer) diff --git a/test/Jamfile b/test/Jamfile index 100d8d2d..43b26625 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -58,6 +58,7 @@ local tests = test_log_to_file test_conn_logging test_reader_fsm + test_connect_fsm test_setup_request_utils test_multiplexer ; diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp new file mode 100644 index 00000000..75a259b8 --- /dev/null +++ b/test/test_connect_fsm.cpp @@ -0,0 +1,682 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace boost::redis; +namespace asio = boost::asio; +using detail::connect_fsm; +using detail::connect_action_type; +using detail::connect_action; +using detail::connection_logger; +using detail::redis_stream_state; +using detail::transport_type; +using asio::ip::tcp; +using boost::system::error_code; +using boost::asio::cancellation_type_t; +using resolver_results = tcp::resolver::results_type; + +// Operators +static const char* to_string(connect_action_type type) +{ + switch (type) { + case connect_action_type::unix_socket_close: return "connect_action_type::unix_socket_close"; + case connect_action_type::unix_socket_connect: + return "connect_action_type::unix_socket_connect"; + case connect_action_type::tcp_resolve: return "connect_action_type::tcp_resolve"; + case connect_action_type::tcp_connect: return "connect_action_type::tcp_connect"; + case connect_action_type::ssl_stream_reset: return "connect_action_type::ssl_stream_reset"; + case connect_action_type::ssl_handshake: return "connect_action_type::ssl_handshake"; + case connect_action_type::done: return "connect_action_type::done"; + default: return ""; + } +} + +static const char* to_string(transport_type type) +{ + switch (type) { + case transport_type::tcp: return "transport_type::tcp"; + case transport_type::tcp_tls: return "transport_type::tcp_tls"; + case transport_type::unix_socket: return "transport_type::unix_socket"; + default: return ""; + } +} + +static const char* to_string(logger::level lvl) +{ + switch (lvl) { + case logger::level::disabled: return "logger::level::disabled"; + case logger::level::emerg: return "logger::level::emerg"; + case logger::level::alert: return "logger::level::alert"; + case logger::level::crit: return "logger::level::crit"; + case logger::level::err: return "logger::level::err"; + case logger::level::warning: return "logger::level::warning"; + case logger::level::notice: return "logger::level::notice"; + case logger::level::info: return "logger::level::info"; + case logger::level::debug: return "logger::level::debug"; + default: return ""; + } +} + +namespace boost::redis { + +std::ostream& operator<<(std::ostream& os, logger::level lvl) { return os << to_string(lvl); } + +} // namespace boost::redis + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, connect_action_type type) +{ + return os << to_string(type); +} + +std::ostream& operator<<(std::ostream& os, transport_type type) { return os << to_string(type); } + +bool operator==(const connect_action& lhs, const connect_action& rhs) noexcept +{ + return lhs.type == rhs.type && lhs.ec == rhs.ec; +} + +std::ostream& operator<<(std::ostream& os, const connect_action& act) +{ + os << "connect_action{ .type=" << act.type; + if (act.type == connect_action_type::done) + os << ", .error=" << act.ec; + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +// TCP endpoints +const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234); +const tcp::endpoint endpoint2(asio::ip::make_address("192.168.10.2"), 1235); + +auto resolver_data = [] { + const tcp::endpoint data[] = {endpoint, endpoint2}; + return asio::ip::tcp::resolver::results_type::create( + std::begin(data), + std::end(data), + "my_host", + "1234"); +}(); + +// For checking logs +struct log_message { + logger::level lvl; + std::string msg; + + friend bool operator==(const log_message& lhs, const log_message& rhs) noexcept + { + return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg; + } + + friend std::ostream& operator<<(std::ostream& os, const log_message& v) + { + return os << "log_message { .lvl=" << v.lvl << ", .msg=" << v.msg << " }"; + } +}; + +// Reduce duplication +struct fixture { + config cfg; + std::ostringstream oss{}; + std::vector msgs{}; + detail::connection_logger lgr{ + logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { + msgs.push_back({lvl, std::string(msg)}); + })}; + connect_fsm fsm{cfg, lgr}; + redis_stream_state st{}; +}; + +config make_ssl_config() +{ + config cfg; + cfg.use_ssl = true; + return cfg; +} + +config make_unix_config() +{ + config cfg; + cfg.unix_socket = "/run/redis.sock"; + return cfg; +} + +void test_tcp_success() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::tcp); + BOOST_TEST_NOT(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_tls_success() +{ + // Setup + fixture fix{make_ssl_config()}; + + // Run the algorithm. No SSL stream reset is performed here + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls); + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Successfully performed SSL handshake" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_tls_success_reconnect() +{ + // Setup + fixture fix{make_ssl_config()}; + fix.st.ssl_stream_used = true; + + // Run the algorithm. The stream is used, so it needs to be reset + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_stream_reset); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls); + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Successfully performed SSL handshake" }, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_unix_success() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket); + BOOST_TEST_NOT(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Connected to /run/redis.sock"}, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +// Close errors are ignored +void test_unix_success_close_error() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(asio::error::bad_descriptor, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::done); + + // The transport type was appropriately set + BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket); + BOOST_TEST_NOT(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + {logger::level::info, "Connected to /run/redis.sock"}, + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +// Resolve errors +void test_tcp_resolve_error() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error::empty_field, resolver_results{}, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_resolve_timeout() +{ + // Setup + fixture fix; + + // Since we use cancel_after, a timeout is an operation_aborted without a cancellation state set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume( + asio::error::operation_aborted, + resolver_results{}, + fix.st, + cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::resolve_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Error resolving the server hostname: Resolve timeout. [boost.redis:17]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_resolve_cancel() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume( + asio::error::operation_aborted, + resolver_results{}, + fix.st, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + +void test_tcp_resolve_cancel_edge() +{ + // Setup + fixture fix; + + // Cancel state set but no error + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_results{}, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + +// Connect errors +void test_tcp_connect_error() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error::empty_field, tcp::endpoint{}, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_connect_timeout() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume( + asio::error::operation_aborted, + tcp::endpoint{}, + fix.st, + cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::connect_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_tcp_connect_cancel() +{ + // Setup + fixture fix; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume( + asio::error::operation_aborted, + tcp::endpoint{}, + fix.st, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 2u); +} + +void test_tcp_connect_cancel_edge() +{ + // Setup + fixture fix; + + // Run the algorithm. Cancellation state set but no error + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), tcp::endpoint{}, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging here is system-dependent, so we don't check the message + BOOST_TEST_EQ(fix.msgs.size(), 2u); +} + +// SSL handshake error +void test_ssl_handshake_error() +{ + // Setup + fixture fix{make_ssl_config()}; + + // Run the algorithm. No SSL stream reset is performed here + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Failed to perform SSL handshake: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_ssl_handshake_timeout() +{ + // Setup + fixture fix{make_ssl_config()}; + + // Run the algorithm. Timeout = operation_aborted without the cancel type set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::ssl_handshake_timeout)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, + {logger::level::info, "Connected to 192.168.10.1:1234" }, + {logger::level::info, "Failed to perform SSL handshake: SSL handshake timeout. [boost.redis:20]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_ssl_handshake_cancel() +{ + // Setup + fixture fix{make_ssl_config()}; + + // Run the algorithm. Cancel = operation_aborted with the cancel type set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Logging is system-dependent, so we don't check messages + BOOST_TEST_EQ(fix.msgs.size(), 3u); +} + +void test_ssl_handshake_cancel_edge() +{ + // Setup + fixture fix{make_ssl_config()}; + + // Run the algorithm. No error, but the cancel state is set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); + act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::tcp_connect); + act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // The stream is marked as used + BOOST_TEST(fix.st.ssl_stream_used); + + // Logging is system-dependent, so we don't check messages + BOOST_TEST_EQ(fix.msgs.size(), 3u); +} + +// UNIX connect errors +void test_unix_connect_error() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_unix_connect_timeout() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. Timeout = operation_aborted without a cancel state + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::connect_timeout)); + + // Check logging + const log_message expected[] = { + // clang-format off + {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, + // clang-format on + }; + BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); +} + +void test_unix_connect_cancel() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. Cancel = operation_aborted with a cancel state + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging is system-dependent + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + +void test_unix_connect_cancel_edge() +{ + // Setup + fixture fix{make_unix_config()}; + + // Run the algorithm. No error, but cancel state is set + auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); + BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); + act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // Logging is system-dependent + BOOST_TEST_EQ(fix.msgs.size(), 1u); +} + +} // namespace + +int main() +{ + test_tcp_success(); + test_tcp_tls_success(); + test_tcp_tls_success_reconnect(); + test_unix_success(); + test_unix_success_close_error(); + + test_tcp_resolve_error(); + test_tcp_resolve_timeout(); + test_tcp_resolve_cancel(); + test_tcp_resolve_cancel_edge(); + + test_tcp_connect_error(); + test_tcp_connect_timeout(); + test_tcp_connect_cancel(); + test_tcp_connect_cancel_edge(); + + test_ssl_handshake_error(); + test_ssl_handshake_timeout(); + test_ssl_handshake_cancel(); + test_ssl_handshake_cancel_edge(); + + test_unix_connect_error(); + test_unix_connect_timeout(); + test_unix_connect_cancel(); + test_unix_connect_cancel_edge(); + + return boost::report_errors(); +}