From af97e51dbbf8ec2980f5ffa16a02fe78ef1065b4 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 12:29:51 +0200 Subject: [PATCH 01/11] Use connection_state --- include/boost/redis/detail/reader_fsm.hpp | 8 +++++--- include/boost/redis/impl/reader_fsm.ipp | 14 ++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index ed6b6ece..aa5af58d 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -6,6 +6,8 @@ #ifndef BOOST_REDIS_READER_FSM_HPP #define BOOST_REDIS_READER_FSM_HPP + +#include #include #include @@ -33,17 +35,17 @@ class reader_fsm { system::error_code ec_ = {}; }; - explicit reader_fsm(multiplexer& mpx) noexcept; - action resume( + connection_state& st, std::size_t bytes_read, system::error_code ec, asio::cancellation_type_t cancel_state); + reader_fsm() = default; + private: int resume_point_{0}; action::type next_read_type_ = action::type::read_some; - multiplexer* mpx_ = nullptr; std::pair res_{consume_result::needs_more, 0u}; }; diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 32894cad..2a751638 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -4,6 +4,7 @@ * accompanying file LICENSE.txt) */ +#include #include #include #include @@ -14,11 +15,8 @@ namespace boost::redis::detail { -reader_fsm::reader_fsm(multiplexer& mpx) noexcept -: mpx_{&mpx} -{ } - reader_fsm::action reader_fsm::resume( + connection_state& st, std::size_t bytes_read, system::error_code ec, asio::cancellation_type_t cancel_state) @@ -28,7 +26,7 @@ reader_fsm::action reader_fsm::resume( for (;;) { // Prepare the buffer for the read operation - ec = mpx_->prepare_read(); + ec = st.mpx.prepare_read(); if (ec) { return {action::type::done, 0, ec}; } @@ -37,7 +35,7 @@ reader_fsm::action reader_fsm::resume( BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_) // Process the bytes read, even if there was an error - mpx_->commit_read(bytes_read); + st.mpx.commit_read(bytes_read); // Check for read errors if (ec) { @@ -54,8 +52,8 @@ reader_fsm::action reader_fsm::resume( // Process the data that we've read next_read_type_ = action::type::read_some; - while (mpx_->get_read_buffer_size() != 0) { - res_ = mpx_->consume(ec); + while (st.mpx.get_read_buffer_size() != 0) { + res_ = st.mpx.consume(ec); if (ec) { // TODO: Perhaps log what has not been consumed to aid From ca4ee46b7c7960c1eb2dc0aa4215241a2e00ff85 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 12:41:48 +0200 Subject: [PATCH 02/11] action constructors --- include/boost/redis/detail/reader_fsm.hpp | 21 ++++++++++++++++++--- include/boost/redis/impl/reader_fsm.ipp | 16 ++++++++-------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index aa5af58d..ef9b6534 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -30,9 +30,24 @@ class reader_fsm { done, }; - type type_ = type::done; - std::size_t push_size_ = 0u; - system::error_code ec_ = {}; + action(type t, std::size_t push_size = 0u) noexcept + : type_(t) + , push_size_(push_size) + { } + + action(system::error_code ec) noexcept + : type_(type::done) + , ec_(ec) + { } + + static action notify_push_receiver(std::size_t bytes) + { + return {type::notify_push_receiver, bytes}; + } + + type type_; + std::size_t push_size_{}; + system::error_code ec_; }; action resume( diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 2a751638..dff2800a 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -28,7 +28,7 @@ reader_fsm::action reader_fsm::resume( // Prepare the buffer for the read operation ec = st.mpx.prepare_read(); if (ec) { - return {action::type::done, 0, ec}; + return {ec}; } // Read @@ -42,12 +42,12 @@ reader_fsm::action reader_fsm::resume( // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. - return {action::type::done, bytes_read, ec}; + return {ec}; } // Check for cancellations if (is_terminal_cancel(cancel_state)) { - return {action::type::done, 0u, asio::error::operation_aborted}; + return {asio::error::operation_aborted}; } // Process the data that we've read @@ -58,7 +58,7 @@ reader_fsm::action reader_fsm::resume( if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. - return {action::type::done, res_.second, ec}; + return {ec}; } if (res_.first == consume_result::needs_more) { @@ -67,12 +67,12 @@ reader_fsm::action reader_fsm::resume( } if (res_.first == consume_result::got_push) { - BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second) + BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second)) if (ec) { - return {action::type::done, 0u, ec}; + return {ec}; } if (is_terminal_cancel(cancel_state)) { - return {action::type::done, 0u, asio::error::operation_aborted}; + return {asio::error::operation_aborted}; } } else { // TODO: Here we should notify the exec operation that @@ -87,7 +87,7 @@ reader_fsm::action reader_fsm::resume( } BOOST_ASSERT(false); - return {action::type::done, 0, system::error_code()}; + return {system::error_code()}; } } // namespace boost::redis::detail From 918348e4374128ba88a8c73029e19b802d04f085 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 12:54:02 +0200 Subject: [PATCH 03/11] Move logging to the reader --- include/boost/redis/connection.hpp | 6 +--- .../boost/redis/detail/connection_logger.hpp | 3 +- include/boost/redis/detail/reader_fsm.hpp | 2 -- .../boost/redis/impl/connection_logger.ipp | 33 +++++-------------- include/boost/redis/impl/reader_fsm.ipp | 11 +++++-- 5 files changed, 19 insertions(+), 36 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 7a3054eb..fbda48ea 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -245,19 +245,15 @@ struct reader_op { public: reader_op(connection_impl& conn) noexcept : conn_{&conn} - , fsm_{conn.st_.mpx} { } template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { for (;;) { - auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); - - conn_->st_.logger.on_fsm_resume(act); + auto act = fsm_.resume(conn_->st_, n, ec, self.get_cancellation_state().cancelled()); switch (act.type_) { - case reader_fsm::action::type::needs_more: case reader_fsm::action::type::read_some: { auto const buf = conn_->st_.mpx.get_prepared_read_buffer(); diff --git a/include/boost/redis/detail/connection_logger.hpp b/include/boost/redis/detail/connection_logger.hpp index 4ebfd9d5..2b2800e2 100644 --- a/include/boost/redis/detail/connection_logger.hpp +++ b/include/boost/redis/detail/connection_logger.hpp @@ -7,7 +7,6 @@ #ifndef BOOST_REDIS_CONNECTION_LOGGER_HPP #define BOOST_REDIS_CONNECTION_LOGGER_HPP -#include #include #include @@ -38,7 +37,7 @@ class connection_logger { void on_connect(system::error_code const& ec, std::string_view unix_socket_ep); void on_ssl_handshake(system::error_code const& ec); void on_write(system::error_code const& ec, std::size_t n); - void on_fsm_resume(reader_fsm::action const& action); + void on_read(system::error_code const& ec, std::size_t n); void on_setup(system::error_code const& ec, generic_response const& resp); void log(logger::level lvl, std::string_view msg); void log(logger::level lvl, std::string_view msg1, std::string_view msg2); diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index ef9b6534..e0d7de11 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -25,7 +25,6 @@ class reader_fsm { enum class type { read_some, - needs_more, notify_push_receiver, done, }; @@ -60,7 +59,6 @@ class reader_fsm { private: int resume_point_{0}; - action::type next_read_type_ = action::type::read_some; std::pair res_{consume_result::needs_more, 0u}; }; diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 9940d528..1d4bb41f 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -16,23 +16,9 @@ namespace boost::redis::detail { -#define BOOST_REDIS_READER_SWITCH_CASE(elem) \ - case reader_fsm::action::type::elem: return "reader_fsm::action::type::" #elem - #define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \ case exec_action_type::elem: return "exec_action_type::" #elem -auto to_string(reader_fsm::action::type t) noexcept -> char const* -{ - switch (t) { - BOOST_REDIS_READER_SWITCH_CASE(read_some); - BOOST_REDIS_READER_SWITCH_CASE(needs_more); - BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver); - BOOST_REDIS_READER_SWITCH_CASE(done); - default: return "action::type::"; - } -} - auto to_string(exec_action_type t) noexcept -> char const* { switch (t) { @@ -161,21 +147,20 @@ void connection_logger::on_write(system::error_code const& ec, std::size_t n) logger_.fn(logger::level::info, msg_); } -void connection_logger::on_fsm_resume(reader_fsm::action const& action) +void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_read) { if (logger_.lvl < logger::level::debug) return; - std::string msg; - msg += "("; - msg += to_string(action.type_); - msg += ", "; - msg += std::to_string(action.push_size_); - msg += ", "; - msg += action.ec_.message(); - msg += ")"; + msg_ = "Reader task: "; + msg_ += std::to_string(bytes_read); + msg_ += " bytes read."; + if (ec) { + msg_ += " Error: "; + format_error_code(ec, msg_); + } - logger_.fn(logger::level::debug, msg); + logger_.fn(logger::level::debug, msg_); } void connection_logger::on_setup(system::error_code const& ec, generic_response const& resp) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index dff2800a..ccbdbec4 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -28,11 +28,14 @@ reader_fsm::action reader_fsm::resume( // Prepare the buffer for the read operation ec = st.mpx.prepare_read(); if (ec) { + st.logger.trace("Reader task: error in prepare_read", ec); return {ec}; } // Read - BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_) + st.logger.trace("Reader task: issuing a read operation"); + BOOST_REDIS_YIELD(resume_point_, 1, action::type::read_some) + st.logger.on_read(ec, bytes_read); // Process the bytes read, even if there was an error st.mpx.commit_read(bytes_read); @@ -47,22 +50,23 @@ reader_fsm::action reader_fsm::resume( // Check for cancellations if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Reader task: cancelled (1)"); return {asio::error::operation_aborted}; } // Process the data that we've read - next_read_type_ = action::type::read_some; while (st.mpx.get_read_buffer_size() != 0) { res_ = st.mpx.consume(ec); if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. + st.logger.trace("Reader task: error while processing message", ec); return {ec}; } if (res_.first == consume_result::needs_more) { - next_read_type_ = action::type::needs_more; + st.logger.trace("Reader task: incomplete message received"); break; } @@ -72,6 +76,7 @@ reader_fsm::action reader_fsm::resume( return {ec}; } if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Reader task: cancelled (2)"); return {asio::error::operation_aborted}; } } else { From 2bde20da2808f9ca3b79e92a14730673fb48da0a Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:12:33 +0200 Subject: [PATCH 04/11] Make reader_fsm test build again --- test/test_reader_fsm.cpp | 182 ++++++++++++++++++++------------------- 1 file changed, 95 insertions(+), 87 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index af41a0a9..0a9ba8b8 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -5,6 +5,7 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include #include @@ -12,6 +13,8 @@ #include #include +#include "sansio_utils.hpp" + #include namespace net = boost::asio; @@ -23,17 +26,37 @@ using redis::detail::multiplexer; using redis::generic_response; using redis::any_adapter; using redis::config; +using redis::detail::connection_state; using action = redis::detail::reader_fsm::action; // Operators +static const char* to_string(action::type type) +{ + switch (type) { + case action::type::read_some: return "action::type::read_some"; + case action::type::notify_push_receiver: return "action::type::notify_push_receiver"; + case action::type::done: return "action::type::done"; + default: return ""; + } +} + namespace boost::redis::detail { -extern auto to_string(reader_fsm::action::type t) noexcept -> char const*; +std::ostream& operator<<(std::ostream& os, action::type type) { return os << to_string(type); } + +bool operator==(const action& lhs, const action& rhs) noexcept +{ + return lhs.type_ == rhs.type_ && lhs.push_size_ == rhs.push_size_ && lhs.ec_ == rhs.ec_; +} -std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t) +std::ostream& operator<<(std::ostream& os, const action& act) { - os << to_string(t); - return os; + os << "action{ .type=" << act.type_; + if (act.type_ == action::type::done) + os << ", .error=" << act.ec_; + else if (act.type_ == action::type::notify_push_receiver) + os << ", .push_size=" << act.push_size_; + return os << " }"; } } // namespace boost::redis::detail @@ -53,17 +76,21 @@ void copy_to(multiplexer& mpx, std::string_view data) std::copy(data.cbegin(), data.cend(), buffer.begin()); } +struct fixture : redis::detail::log_fixture { + connection_state st{make_logger()}; +}; + void test_push() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -75,40 +102,33 @@ void test_push() copy_to(mpx, payload); // Deliver the 1st push - act = fsm.resume(payload.size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 11u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Deliver the 2st push - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 12u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(12u)); // Deliver the 3rd push - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 13u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(13u)); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); } void test_read_needs_more() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Split the incoming message in three random parts and deliver @@ -117,41 +137,36 @@ void test_read_needs_more() // Passes the first part to the fsm. copy_to(mpx, msg[0]); - act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::needs_more); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, msg[0].size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the second part to the fsm. copy_to(mpx, msg[1]); - act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::needs_more); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, msg[1].size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. copy_to(mpx, msg[2]); - act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, msg[2].size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(msg[0].size() + msg[1].size() + msg[2].size())); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); } void test_read_error() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -159,64 +174,65 @@ void test_read_error() copy_to(mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); + act = fsm.resume( + fix.st, + payload.size(), + {net::error::operation_aborted}, + cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{net::error::operation_aborted}); } void test_parse_error() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. std::string const payload = ">a\r\n"; copy_to(mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); + act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::not_a_number}); } void test_push_deliver_error() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; copy_to(mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(4u)); // Resumes from notifying a push with an error. - act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); + act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{net::error::operation_aborted}); } void test_max_read_buffer_size() { + fixture fix; config cfg; cfg.read_buffer_append_size = 5; cfg.max_read_size = 7; @@ -225,57 +241,53 @@ void test_max_read_buffer_size() mpx.set_config(cfg); generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; copy_to(mpx, part1); - act = fsm.resume(part1.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); + act = fsm.resume(fix.st, part1.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size)); } // Cancellations void test_cancel_after_read() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Deliver a push, and notify a cancellation. // This can happen if the cancellation signal arrives before the read handler runs constexpr std::string_view payload = ">1\r\n+msg1\r\n"; copy_to(mpx, payload); - act = fsm.resume(payload.size(), ec, cancellation_type_t::terminal); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.push_size_, 0u); - BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); + act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); } void test_cancel_after_push_delivery() { + fixture fix; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; + reader_fsm fsm; error_code ec; - action act; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -286,17 +298,13 @@ void test_cancel_after_push_delivery() copy_to(mpx, payload); // Deliver the 1st push - act = fsm.resume(payload.size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 11u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // We got a cancellation after delivering it. // This can happen if the cancellation signal arrives before the channel send handler runs - act = fsm.resume(0, ec, cancellation_type_t::terminal); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.push_size_, 0u); - BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); + act = fsm.resume(fix.st, 0, ec, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); } } // namespace From c27d4ea82c4d6af9854c4469a4b596862bc7e066 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:15:18 +0200 Subject: [PATCH 05/11] use the fixture --- test/test_reader_fsm.cpp | 52 +++++++++++++--------------------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 0a9ba8b8..00d2a90b 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -5,6 +5,7 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include #include @@ -78,14 +79,14 @@ void copy_to(multiplexer& mpx, std::string_view data) struct fixture : redis::detail::log_fixture { connection_state st{make_logger()}; + generic_response resp; + + fixture() { st.mpx.set_receive_adapter(any_adapter{resp}); } }; void test_push() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -99,7 +100,7 @@ void test_push() ">1\r\n+msg2 \r\n" ">1\r\n+msg3 \r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the 1st push act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); @@ -121,9 +122,6 @@ void test_push() void test_read_needs_more() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -136,18 +134,18 @@ void test_read_needs_more() std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. - copy_to(mpx, msg[0]); + copy_to(fix.st.mpx, msg[0]); act = fsm.resume(fix.st, msg[0].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Passes the second part to the fsm. - copy_to(mpx, msg[1]); + copy_to(fix.st.mpx, msg[1]); act = fsm.resume(fix.st, msg[1].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. - copy_to(mpx, msg[2]); + copy_to(fix.st.mpx, msg[2]); act = fsm.resume(fix.st, msg[2].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(msg[0].size() + msg[1].size() + msg[2].size())); @@ -159,9 +157,6 @@ void test_read_needs_more() void test_read_error() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -171,7 +166,7 @@ void test_read_error() // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data act = fsm.resume( @@ -185,9 +180,6 @@ void test_read_error() void test_parse_error() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -197,7 +189,7 @@ void test_parse_error() // The fsm is asking for data. std::string const payload = ">a\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); @@ -207,9 +199,6 @@ void test_parse_error() void test_push_deliver_error() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -219,11 +208,11 @@ void test_push_deliver_error() // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act, action::notify_push_receiver(4u)); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Resumes from notifying a push with an error. act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::none); @@ -237,10 +226,7 @@ void test_max_read_buffer_size() cfg.read_buffer_append_size = 5; cfg.max_read_size = 7; - multiplexer mpx; - mpx.set_config(cfg); - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); + fix.st.mpx.set_config(cfg); reader_fsm fsm; error_code ec; @@ -250,7 +236,7 @@ void test_max_read_buffer_size() // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; - copy_to(mpx, part1); + copy_to(fix.st.mpx, part1); act = fsm.resume(fix.st, part1.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size)); } @@ -259,9 +245,6 @@ void test_max_read_buffer_size() void test_cancel_after_read() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -272,7 +255,7 @@ void test_cancel_after_read() // Deliver a push, and notify a cancellation. // This can happen if the cancellation signal arrives before the read handler runs constexpr std::string_view payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); } @@ -280,9 +263,6 @@ void test_cancel_after_read() void test_cancel_after_push_delivery() { fixture fix; - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm; error_code ec; @@ -295,7 +275,7 @@ void test_cancel_after_push_delivery() ">1\r\n+msg1\r\n" ">1\r\n+msg2 \r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the 1st push act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); From 219bbbcf1fc32022707af9de8a378be8fd29dd8b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:33:44 +0200 Subject: [PATCH 06/11] Log tests --- .../boost/redis/impl/connection_logger.ipp | 4 +- include/boost/redis/impl/reader_fsm.ipp | 5 +- test/test_reader_fsm.cpp | 88 +++++++++++++++++-- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 1d4bb41f..461b07fb 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -154,9 +154,9 @@ void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_ msg_ = "Reader task: "; msg_ += std::to_string(bytes_read); - msg_ += " bytes read."; + msg_ += " bytes read"; if (ec) { - msg_ += " Error: "; + msg_ += ", error: "; format_error_code(ec, msg_); } diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index ccbdbec4..6ef84f3b 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -33,7 +33,7 @@ reader_fsm::action reader_fsm::resume( } // Read - st.logger.trace("Reader task: issuing a read operation"); + st.logger.trace("Reader task: issuing read"); BOOST_REDIS_YIELD(resume_point_, 1, action::type::read_some) st.logger.on_read(ec, bytes_read); @@ -61,7 +61,7 @@ reader_fsm::action reader_fsm::resume( if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. - st.logger.trace("Reader task: error while processing message", ec); + st.logger.trace("Reader task: error processing message", ec); return {ec}; } @@ -73,6 +73,7 @@ reader_fsm::action reader_fsm::resume( if (res_.first == consume_result::got_push) { BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second)) if (ec) { + st.logger.trace("Reader task: error notifying push receiver", ec); return {ec}; } if (is_terminal_cancel(cancel_state)) { diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 00d2a90b..18bd0383 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include @@ -29,6 +31,7 @@ using redis::any_adapter; using redis::config; using redis::detail::connection_state; using action = redis::detail::reader_fsm::action; +using redis::logger; // Operators static const char* to_string(action::type type) @@ -117,6 +120,13 @@ void test_push() // All pushes were delivered so the fsm should demand more data act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 36 bytes read"}, + {logger::level::debug, "Reader task: issuing read" }, + }); } void test_read_needs_more() @@ -152,6 +162,19 @@ void test_read_needs_more() // All pushes were delivered so the fsm should demand more data act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 3 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: issuing read" }, + }); } void test_read_error() @@ -169,12 +192,16 @@ void test_read_error() copy_to(fix.st.mpx, payload); // Deliver the data - act = fsm.resume( - fix.st, - payload.size(), - {net::error::operation_aborted}, - cancellation_type_t::none); - BOOST_TEST_EQ(act, error_code{net::error::operation_aborted}); + act = fsm.resume(fix.st, payload.size(), {redis::error::empty_field}, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::empty_field}); + + // Check logging + fix.check_log({ + // clang-format off + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read, error: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }); } void test_parse_error() @@ -194,6 +221,15 @@ void test_parse_error() // Deliver the data act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code{redis::error::not_a_number}); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read"}, + {logger::level::debug, "Reader task: 4 bytes read"}, + {logger::level::debug, + "Reader task: error processing message: Can't convert string to number (maybe forgot to " + "upgrade to RESP3?). [boost.redis:2]" }, + }); } void test_push_deliver_error() @@ -215,8 +251,17 @@ void test_push_deliver_error() BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Resumes from notifying a push with an error. - act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::none); - BOOST_TEST_EQ(act, error_code{net::error::operation_aborted}); + act = fsm.resume(fix.st, 0, redis::error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::empty_field}); + + // Check logging + fix.check_log({ + // clang-format off + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: error notifying push receiver: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }); } void test_max_read_buffer_size() @@ -239,9 +284,20 @@ void test_max_read_buffer_size() copy_to(fix.st.mpx, part1); act = fsm.resume(fix.st, part1.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 4 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, + "Reader task: error in prepare_read: Reading data from the socket would exceed the maximum " + "size allowed of the read buffer. [boost.redis:26]" }, + }); } // Cancellations +// TODO: cancel with error void test_cancel_after_read() { fixture fix; @@ -258,6 +314,13 @@ void test_cancel_after_read() copy_to(fix.st.mpx, payload); act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read"}, + {logger::level::debug, "Reader task: cancelled (1)"}, + }); } void test_cancel_after_push_delivery() @@ -285,8 +348,17 @@ void test_cancel_after_push_delivery() // This can happen if the cancellation signal arrives before the channel send handler runs act = fsm.resume(fix.st, 0, ec, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 23 bytes read"}, + {logger::level::debug, "Reader task: cancelled (2)"}, + }); } +// TODO: cancel edge + } // namespace int main() From 8f8641c1df20ddefd9a8da56df388d943a10dec8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:37:06 +0200 Subject: [PATCH 07/11] test cancel read with error --- test/test_reader_fsm.cpp | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 18bd0383..ad00b9cc 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -297,8 +297,29 @@ void test_max_read_buffer_size() } // Cancellations -// TODO: cancel with error -void test_cancel_after_read() +void test_cancel_read() +{ + fixture fix; + reader_fsm fsm; + + // Initiate + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); + + // The read was cancelled (maybe after delivering some bytes) + constexpr std::string_view payload = ">1\r\n"; + copy_to(fix.st.mpx, payload); + act = fsm.resume( + fix.st, + payload.size(), + net::error::operation_aborted, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Log contains a system-dependent error, so we don't check it here +} + +void test_cancel_read_edge() { fixture fix; reader_fsm fsm; @@ -323,7 +344,7 @@ void test_cancel_after_read() }); } -void test_cancel_after_push_delivery() +void test_cancel_push_delivery() { fixture fix; reader_fsm fsm; @@ -371,8 +392,9 @@ int main() test_push_deliver_error(); test_max_read_buffer_size(); - test_cancel_after_read(); - test_cancel_after_push_delivery(); + test_cancel_read(); + test_cancel_read_edge(); + test_cancel_push_delivery(); return boost::report_errors(); } From 6f792795847be327bb30d591bd7e4e66eb752faa Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:42:12 +0200 Subject: [PATCH 08/11] Better cancellation logging --- include/boost/redis/impl/reader_fsm.ipp | 25 ++++++++------ test/test_reader_fsm.cpp | 43 ++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 6ef84f3b..e21fa144 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -35,6 +35,14 @@ reader_fsm::action reader_fsm::resume( // Read st.logger.trace("Reader task: issuing read"); BOOST_REDIS_YIELD(resume_point_, 1, action::type::read_some) + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Reader task: cancelled (1)"); + return {asio::error::operation_aborted}; + } + + // Log what we read st.logger.on_read(ec, bytes_read); // Process the bytes read, even if there was an error @@ -48,12 +56,6 @@ reader_fsm::action reader_fsm::resume( return {ec}; } - // Check for cancellations - if (is_terminal_cancel(cancel_state)) { - st.logger.trace("Reader task: cancelled (1)"); - return {asio::error::operation_aborted}; - } - // Process the data that we've read while (st.mpx.get_read_buffer_size() != 0) { res_ = st.mpx.consume(ec); @@ -72,14 +74,17 @@ reader_fsm::action reader_fsm::resume( if (res_.first == consume_result::got_push) { BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second)) - if (ec) { - st.logger.trace("Reader task: error notifying push receiver", ec); - return {ec}; - } + // Check for cancellations if (is_terminal_cancel(cancel_state)) { st.logger.trace("Reader task: cancelled (2)"); return {asio::error::operation_aborted}; } + + // Check for other errors + if (ec) { + st.logger.trace("Reader task: error notifying push receiver", ec); + return {ec}; + } } else { // TODO: Here we should notify the exec operation that // it can be completed. This will improve log clarity diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index ad00b9cc..10a182d2 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -316,7 +316,11 @@ void test_cancel_read() cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); - // Log contains a system-dependent error, so we don't check it here + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: cancelled (1)"}, + }); } void test_cancel_read_edge() @@ -339,7 +343,6 @@ void test_cancel_read_edge() // Check logging fix.check_log({ {logger::level::debug, "Reader task: issuing read" }, - {logger::level::debug, "Reader task: 11 bytes read"}, {logger::level::debug, "Reader task: cancelled (1)"}, }); } @@ -365,6 +368,39 @@ void test_cancel_push_delivery() act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); + // We got a cancellation while delivering it + act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 23 bytes read"}, + {logger::level::debug, "Reader task: cancelled (2)"}, + }); +} + +void test_cancel_push_delivery_edge() +{ + fixture fix; + reader_fsm fsm; + error_code ec; + + // Initiate + auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // The fsm is asking for data. + constexpr std::string_view payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n"; + + copy_to(fix.st.mpx, payload); + + // Deliver the 1st push + act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); + // We got a cancellation after delivering it. // This can happen if the cancellation signal arrives before the channel send handler runs act = fsm.resume(fix.st, 0, ec, cancellation_type_t::terminal); @@ -378,8 +414,6 @@ void test_cancel_push_delivery() }); } -// TODO: cancel edge - } // namespace int main() @@ -395,6 +429,7 @@ int main() test_cancel_read(); test_cancel_read_edge(); test_cancel_push_delivery(); + test_cancel_push_delivery_edge(); return boost::report_errors(); } From 4ddfdeb7d0809ccc8ee4377411e8047d173536e5 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:43:50 +0200 Subject: [PATCH 09/11] explicit error codes --- test/test_reader_fsm.cpp | 53 +++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 10a182d2..ec81a284 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -91,10 +91,9 @@ void test_push() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -106,19 +105,19 @@ void test_push() copy_to(fix.st.mpx, payload); // Deliver the 1st push - act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Deliver the 2st push - act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(12u)); // Deliver the 3rd push - act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(13u)); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Check logging @@ -133,10 +132,9 @@ void test_read_needs_more() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Split the incoming message in three random parts and deliver @@ -145,22 +143,22 @@ void test_read_needs_more() // Passes the first part to the fsm. copy_to(fix.st.mpx, msg[0]); - act = fsm.resume(fix.st, msg[0].size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, msg[0].size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Passes the second part to the fsm. copy_to(fix.st.mpx, msg[1]); - act = fsm.resume(fix.st, msg[1].size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, msg[1].size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. copy_to(fix.st.mpx, msg[2]); - act = fsm.resume(fix.st, msg[2].size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, msg[2].size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(msg[0].size() + msg[1].size() + msg[2].size())); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Check logging @@ -181,10 +179,9 @@ void test_read_error() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -208,10 +205,9 @@ void test_parse_error() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. @@ -236,10 +232,9 @@ void test_push_deliver_error() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. @@ -273,16 +268,15 @@ void test_max_read_buffer_size() fix.st.mpx.set_config(cfg); reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; copy_to(fix.st.mpx, part1); - act = fsm.resume(fix.st, part1.size(), {}, cancellation_type_t::none); + act = fsm.resume(fix.st, part1.size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size)); // Check logging @@ -327,17 +321,16 @@ void test_cancel_read_edge() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::type::read_some); // Deliver a push, and notify a cancellation. // This can happen if the cancellation signal arrives before the read handler runs constexpr std::string_view payload = ">1\r\n+msg1\r\n"; copy_to(fix.st.mpx, payload); - act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::terminal); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); // Check logging @@ -351,10 +344,9 @@ void test_cancel_push_delivery() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -365,7 +357,7 @@ void test_cancel_push_delivery() copy_to(fix.st.mpx, payload); // Deliver the 1st push - act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // We got a cancellation while delivering it @@ -384,10 +376,9 @@ void test_cancel_push_delivery_edge() { fixture fix; reader_fsm fsm; - error_code ec; // Initiate - auto act = fsm.resume(fix.st, 0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -398,12 +389,12 @@ void test_cancel_push_delivery_edge() copy_to(fix.st.mpx, payload); // Deliver the 1st push - act = fsm.resume(fix.st, payload.size(), ec, cancellation_type_t::none); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // We got a cancellation after delivering it. // This can happen if the cancellation signal arrives before the channel send handler runs - act = fsm.resume(fix.st, 0, ec, cancellation_type_t::terminal); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); // Check logging From 7012cc230e6563f57cd1228c8bc5fa4ac3b0a2cc Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 13:44:27 +0200 Subject: [PATCH 10/11] small rework --- test/test_reader_fsm.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index ec81a284..1c204131 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -262,11 +262,9 @@ void test_push_deliver_error() void test_max_read_buffer_size() { fixture fix; - config cfg; - cfg.read_buffer_append_size = 5; - cfg.max_read_size = 7; - - fix.st.mpx.set_config(cfg); + fix.st.cfg.read_buffer_append_size = 5; + fix.st.cfg.max_read_size = 7; + fix.st.mpx.set_config(fix.st.cfg); reader_fsm fsm; // Initiate From e4240a2ba86de59ae4a78612337e993aacf9d36e Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 15 Oct 2025 14:02:16 +0200 Subject: [PATCH 11/11] Remove testing code from connection_logger.ipp --- .../boost/redis/impl/connection_logger.ipp | 16 ------------- test/test_exec_fsm.cpp | 23 +++++++++++++++---- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 461b07fb..dc5fa3e5 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -5,7 +5,6 @@ */ #include -#include #include #include @@ -16,21 +15,6 @@ namespace boost::redis::detail { -#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \ - case exec_action_type::elem: return "exec_action_type::" #elem - -auto to_string(exec_action_type t) noexcept -> char const* -{ - switch (t) { - BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation); - BOOST_REDIS_EXEC_SWITCH_CASE(immediate); - BOOST_REDIS_EXEC_SWITCH_CASE(done); - BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer); - BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response); - default: return "exec_action_type::"; - } -} - inline void format_tcp_endpoint(const asio::ip::tcp::endpoint& ep, std::string& to) { // This formatting is inspired by Asio's endpoint operator<< diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 1589bb3a..ada451b6 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -12,17 +12,17 @@ #include #include +#include #include #include -#include + +#include "sansio_utils.hpp" #include #include #include #include -#include "sansio_utils.hpp" - using namespace boost::redis; namespace asio = boost::asio; using detail::exec_fsm; @@ -33,11 +33,24 @@ using detail::exec_action; using boost::system::error_code; using boost::asio::cancellation_type_t; +#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \ + case exec_action_type::elem: return "exec_action_type::" #elem + +static auto to_string(exec_action_type t) noexcept -> char const* +{ + switch (t) { + BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation); + BOOST_REDIS_EXEC_SWITCH_CASE(immediate); + BOOST_REDIS_EXEC_SWITCH_CASE(done); + BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer); + BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response); + default: return "exec_action_type::"; + } +} + // Operators namespace boost::redis::detail { -extern auto to_string(exec_action_type t) noexcept -> char const*; - std::ostream& operator<<(std::ostream& os, exec_action_type type) { os << to_string(type);