diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 7a3054eb..fbda48ea 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -245,19 +245,15 @@ struct reader_op { public: reader_op(connection_impl& conn) noexcept : conn_{&conn} - , fsm_{conn.st_.mpx} { } template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { for (;;) { - auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); - - conn_->st_.logger.on_fsm_resume(act); + auto act = fsm_.resume(conn_->st_, n, ec, self.get_cancellation_state().cancelled()); switch (act.type_) { - case reader_fsm::action::type::needs_more: case reader_fsm::action::type::read_some: { auto const buf = conn_->st_.mpx.get_prepared_read_buffer(); diff --git a/include/boost/redis/detail/connection_logger.hpp b/include/boost/redis/detail/connection_logger.hpp index 4ebfd9d5..2b2800e2 100644 --- a/include/boost/redis/detail/connection_logger.hpp +++ b/include/boost/redis/detail/connection_logger.hpp @@ -7,7 +7,6 @@ #ifndef BOOST_REDIS_CONNECTION_LOGGER_HPP #define BOOST_REDIS_CONNECTION_LOGGER_HPP -#include #include #include @@ -38,7 +37,7 @@ class connection_logger { void on_connect(system::error_code const& ec, std::string_view unix_socket_ep); void on_ssl_handshake(system::error_code const& ec); void on_write(system::error_code const& ec, std::size_t n); - void on_fsm_resume(reader_fsm::action const& action); + void on_read(system::error_code const& ec, std::size_t n); void on_setup(system::error_code const& ec, generic_response const& resp); void log(logger::level lvl, std::string_view msg); void log(logger::level lvl, std::string_view msg1, std::string_view msg2); diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index ed6b6ece..e0d7de11 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -6,6 +6,8 @@ #ifndef BOOST_REDIS_READER_FSM_HPP #define BOOST_REDIS_READER_FSM_HPP + +#include #include #include @@ -23,27 +25,40 @@ class reader_fsm { enum class type { read_some, - needs_more, notify_push_receiver, done, }; - type type_ = type::done; - std::size_t push_size_ = 0u; - system::error_code ec_ = {}; - }; + action(type t, std::size_t push_size = 0u) noexcept + : type_(t) + , push_size_(push_size) + { } - explicit reader_fsm(multiplexer& mpx) noexcept; + action(system::error_code ec) noexcept + : type_(type::done) + , ec_(ec) + { } + + static action notify_push_receiver(std::size_t bytes) + { + return {type::notify_push_receiver, bytes}; + } + + type type_; + std::size_t push_size_{}; + system::error_code ec_; + }; action resume( + connection_state& st, std::size_t bytes_read, system::error_code ec, asio::cancellation_type_t cancel_state); + reader_fsm() = default; + private: int resume_point_{0}; - action::type next_read_type_ = action::type::read_some; - multiplexer* mpx_ = nullptr; std::pair res_{consume_result::needs_more, 0u}; }; diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 9940d528..dc5fa3e5 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -5,7 +5,6 @@ */ #include -#include #include #include @@ -16,35 +15,6 @@ namespace boost::redis::detail { -#define BOOST_REDIS_READER_SWITCH_CASE(elem) \ - case reader_fsm::action::type::elem: return "reader_fsm::action::type::" #elem - -#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \ - case exec_action_type::elem: return "exec_action_type::" #elem - -auto to_string(reader_fsm::action::type t) noexcept -> char const* -{ - switch (t) { - BOOST_REDIS_READER_SWITCH_CASE(read_some); - BOOST_REDIS_READER_SWITCH_CASE(needs_more); - BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver); - BOOST_REDIS_READER_SWITCH_CASE(done); - default: return "action::type::"; - } -} - -auto to_string(exec_action_type t) noexcept -> char const* -{ - switch (t) { - BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation); - BOOST_REDIS_EXEC_SWITCH_CASE(immediate); - BOOST_REDIS_EXEC_SWITCH_CASE(done); - BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer); - BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response); - default: return "exec_action_type::"; - } -} - inline void format_tcp_endpoint(const asio::ip::tcp::endpoint& ep, std::string& to) { // This formatting is inspired by Asio's endpoint operator<< @@ -161,21 +131,20 @@ void connection_logger::on_write(system::error_code const& ec, std::size_t n) logger_.fn(logger::level::info, msg_); } -void connection_logger::on_fsm_resume(reader_fsm::action const& action) +void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_read) { if (logger_.lvl < logger::level::debug) return; - std::string msg; - msg += "("; - msg += to_string(action.type_); - msg += ", "; - msg += std::to_string(action.push_size_); - msg += ", "; - msg += action.ec_.message(); - msg += ")"; + msg_ = "Reader task: "; + msg_ += std::to_string(bytes_read); + msg_ += " bytes read"; + if (ec) { + msg_ += ", error: "; + format_error_code(ec, msg_); + } - logger_.fn(logger::level::debug, msg); + logger_.fn(logger::level::debug, msg_); } void connection_logger::on_setup(system::error_code const& ec, generic_response const& resp) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 32894cad..e21fa144 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -4,6 +4,7 @@ * accompanying file LICENSE.txt) */ +#include #include #include #include @@ -14,11 +15,8 @@ namespace boost::redis::detail { -reader_fsm::reader_fsm(multiplexer& mpx) noexcept -: mpx_{&mpx} -{ } - reader_fsm::action reader_fsm::resume( + connection_state& st, std::size_t bytes_read, system::error_code ec, asio::cancellation_type_t cancel_state) @@ -28,53 +26,64 @@ reader_fsm::action reader_fsm::resume( for (;;) { // Prepare the buffer for the read operation - ec = mpx_->prepare_read(); + ec = st.mpx.prepare_read(); if (ec) { - return {action::type::done, 0, ec}; + st.logger.trace("Reader task: error in prepare_read", ec); + return {ec}; } // Read - BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_) + st.logger.trace("Reader task: issuing read"); + BOOST_REDIS_YIELD(resume_point_, 1, action::type::read_some) + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Reader task: cancelled (1)"); + return {asio::error::operation_aborted}; + } + + // Log what we read + st.logger.on_read(ec, bytes_read); // Process the bytes read, even if there was an error - mpx_->commit_read(bytes_read); + st.mpx.commit_read(bytes_read); // Check for read errors if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. - return {action::type::done, bytes_read, ec}; - } - - // Check for cancellations - if (is_terminal_cancel(cancel_state)) { - return {action::type::done, 0u, asio::error::operation_aborted}; + return {ec}; } // Process the data that we've read - next_read_type_ = action::type::read_some; - while (mpx_->get_read_buffer_size() != 0) { - res_ = mpx_->consume(ec); + while (st.mpx.get_read_buffer_size() != 0) { + res_ = st.mpx.consume(ec); if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. - return {action::type::done, res_.second, ec}; + st.logger.trace("Reader task: error processing message", ec); + return {ec}; } if (res_.first == consume_result::needs_more) { - next_read_type_ = action::type::needs_more; + st.logger.trace("Reader task: incomplete message received"); break; } if (res_.first == consume_result::got_push) { - BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second) - if (ec) { - return {action::type::done, 0u, ec}; - } + BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second)) + // Check for cancellations if (is_terminal_cancel(cancel_state)) { - return {action::type::done, 0u, asio::error::operation_aborted}; + st.logger.trace("Reader task: cancelled (2)"); + return {asio::error::operation_aborted}; + } + + // Check for other errors + if (ec) { + st.logger.trace("Reader task: error notifying push receiver", ec); + return {ec}; } } else { // TODO: Here we should notify the exec operation that @@ -89,7 +98,7 @@ reader_fsm::action reader_fsm::resume( } BOOST_ASSERT(false); - return {action::type::done, 0, system::error_code()}; + return {system::error_code()}; } } // namespace boost::redis::detail diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 1589bb3a..ada451b6 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -12,17 +12,17 @@ #include #include +#include #include #include -#include + +#include "sansio_utils.hpp" #include #include #include #include -#include "sansio_utils.hpp" - using namespace boost::redis; namespace asio = boost::asio; using detail::exec_fsm; @@ -33,11 +33,24 @@ using detail::exec_action; using boost::system::error_code; using boost::asio::cancellation_type_t; +#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \ + case exec_action_type::elem: return "exec_action_type::" #elem + +static auto to_string(exec_action_type t) noexcept -> char const* +{ + switch (t) { + BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation); + BOOST_REDIS_EXEC_SWITCH_CASE(immediate); + BOOST_REDIS_EXEC_SWITCH_CASE(done); + BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer); + BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response); + default: return "exec_action_type::"; + } +} + // Operators namespace boost::redis::detail { -extern auto to_string(exec_action_type t) noexcept -> char const*; - std::ostream& operator<<(std::ostream& os, exec_action_type type) { os << to_string(type); diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index af41a0a9..1c204131 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -5,13 +5,19 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include +#include #include +#include +#include #include #include #include #include +#include "sansio_utils.hpp" + #include namespace net = boost::asio; @@ -23,17 +29,38 @@ using redis::detail::multiplexer; using redis::generic_response; using redis::any_adapter; using redis::config; +using redis::detail::connection_state; using action = redis::detail::reader_fsm::action; +using redis::logger; // Operators +static const char* to_string(action::type type) +{ + switch (type) { + case action::type::read_some: return "action::type::read_some"; + case action::type::notify_push_receiver: return "action::type::notify_push_receiver"; + case action::type::done: return "action::type::done"; + default: return ""; + } +} + namespace boost::redis::detail { -extern auto to_string(reader_fsm::action::type t) noexcept -> char const*; +std::ostream& operator<<(std::ostream& os, action::type type) { return os << to_string(type); } -std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t) +bool operator==(const action& lhs, const action& rhs) noexcept { - os << to_string(t); - return os; + return lhs.type_ == rhs.type_ && lhs.push_size_ == rhs.push_size_ && lhs.ec_ == rhs.ec_; +} + +std::ostream& operator<<(std::ostream& os, const action& act) +{ + os << "action{ .type=" << act.type_; + if (act.type_ == action::type::done) + os << ", .error=" << act.ec_; + else if (act.type_ == action::type::notify_push_receiver) + os << ", .push_size=" << act.push_size_; + return os << " }"; } } // namespace boost::redis::detail @@ -53,17 +80,20 @@ void copy_to(multiplexer& mpx, std::string_view data) std::copy(data.cbegin(), data.cend(), buffer.begin()); } +struct fixture : redis::detail::log_fixture { + connection_state st{make_logger()}; + generic_response resp; + + fixture() { st.mpx.set_receive_adapter(any_adapter{resp}); } +}; + void test_push() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -72,43 +102,39 @@ void test_push() ">1\r\n+msg2 \r\n" ">1\r\n+msg3 \r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the 1st push - act = fsm.resume(payload.size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 11u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Deliver the 2st push - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 12u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(12u)); // Deliver the 3rd push - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 13u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(13u)); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 36 bytes read"}, + {logger::level::debug, "Reader task: issuing read" }, + }); } void test_read_needs_more() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Split the incoming message in three random parts and deliver @@ -116,166 +142,241 @@ void test_read_needs_more() std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. - copy_to(mpx, msg[0]); - act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::needs_more); - BOOST_TEST_EQ(act.ec_, error_code()); + copy_to(fix.st.mpx, msg[0]); + act = fsm.resume(fix.st, msg[0].size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the second part to the fsm. - copy_to(mpx, msg[1]); - act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::needs_more); - BOOST_TEST_EQ(act.ec_, error_code()); + copy_to(fix.st.mpx, msg[1]); + act = fsm.resume(fix.st, msg[1].size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. - copy_to(mpx, msg[2]); - act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); - BOOST_TEST_EQ(act.ec_, error_code()); + copy_to(fix.st.mpx, msg[2]); + act = fsm.resume(fix.st, msg[2].size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(msg[0].size() + msg[1].size() + msg[2].size())); // All pushes were delivered so the fsm should demand more data - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 3 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: issuing read" }, + }); } void test_read_error() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); + act = fsm.resume(fix.st, payload.size(), {redis::error::empty_field}, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::empty_field}); + + // Check logging + fix.check_log({ + // clang-format off + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read, error: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }); } void test_parse_error() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. std::string const payload = ">a\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); + act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::not_a_number}); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read"}, + {logger::level::debug, "Reader task: 4 bytes read"}, + {logger::level::debug, + "Reader task: error processing message: Can't convert string to number (maybe forgot to " + "upgrade to RESP3?). [boost.redis:2]" }, + }); } void test_push_deliver_error() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the data - act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // Resumes from notifying a push with an error. - act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); + act = fsm.resume(fix.st, 0, redis::error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code{redis::error::empty_field}); + + // Check logging + fix.check_log({ + // clang-format off + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 11 bytes read" }, + {logger::level::debug, "Reader task: error notifying push receiver: Expected field value is empty. [boost.redis:5]"}, + // clang-format on + }); } void test_max_read_buffer_size() { - config cfg; - cfg.read_buffer_append_size = 5; - cfg.max_read_size = 7; - - multiplexer mpx; - mpx.set_config(cfg); - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + fix.st.cfg.read_buffer_append_size = 5; + fix.st.cfg.max_read_size = 7; + fix.st.mpx.set_config(fix.st.cfg); + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; - copy_to(mpx, part1); - act = fsm.resume(part1.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); + copy_to(fix.st.mpx, part1); + act = fsm.resume(fix.st, part1.size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 4 bytes read" }, + {logger::level::debug, "Reader task: incomplete message received"}, + {logger::level::debug, + "Reader task: error in prepare_read: Reading data from the socket would exceed the maximum " + "size allowed of the read buffer. [boost.redis:26]" }, + }); } // Cancellations -void test_cancel_after_read() +void test_cancel_read() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::read_some); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); + + // The read was cancelled (maybe after delivering some bytes) + constexpr std::string_view payload = ">1\r\n"; + copy_to(fix.st.mpx, payload); + act = fsm.resume( + fix.st, + payload.size(), + net::error::operation_aborted, + cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: cancelled (1)"}, + }); +} + +void test_cancel_read_edge() +{ + fixture fix; + reader_fsm fsm; + + // Initiate + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::type::read_some); // Deliver a push, and notify a cancellation. // This can happen if the cancellation signal arrives before the read handler runs constexpr std::string_view payload = ">1\r\n+msg1\r\n"; - copy_to(mpx, payload); - act = fsm.resume(payload.size(), ec, cancellation_type_t::terminal); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.push_size_, 0u); - BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); + copy_to(fix.st.mpx, payload); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: cancelled (1)"}, + }); } -void test_cancel_after_push_delivery() +void test_cancel_push_delivery() { - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - reader_fsm fsm{mpx}; - error_code ec; - action act; + fixture fix; + reader_fsm fsm; + + // Initiate + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // The fsm is asking for data. + constexpr std::string_view payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n"; + + copy_to(fix.st.mpx, payload); + + // Deliver the 1st push + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); + + // We got a cancellation while delivering it + act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 23 bytes read"}, + {logger::level::debug, "Reader task: cancelled (2)"}, + }); +} + +void test_cancel_push_delivery_edge() +{ + fixture fix; + reader_fsm fsm; // Initiate - act = fsm.resume(0, ec, cancellation_type_t::none); + auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -283,20 +384,23 @@ void test_cancel_after_push_delivery() ">1\r\n+msg1\r\n" ">1\r\n+msg2 \r\n"; - copy_to(mpx, payload); + copy_to(fix.st.mpx, payload); // Deliver the 1st push - act = fsm.resume(payload.size(), ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); - BOOST_TEST_EQ(act.push_size_, 11u); - BOOST_TEST_EQ(act.ec_, error_code()); + act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action::notify_push_receiver(11u)); // We got a cancellation after delivering it. // This can happen if the cancellation signal arrives before the channel send handler runs - act = fsm.resume(0, ec, cancellation_type_t::terminal); - BOOST_TEST_EQ(act.type_, action::type::done); - BOOST_TEST_EQ(act.push_size_, 0u); - BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); + act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + + // Check logging + fix.check_log({ + {logger::level::debug, "Reader task: issuing read" }, + {logger::level::debug, "Reader task: 23 bytes read"}, + {logger::level::debug, "Reader task: cancelled (2)"}, + }); } } // namespace @@ -311,8 +415,10 @@ int main() test_push_deliver_error(); test_max_read_buffer_size(); - test_cancel_after_read(); - test_cancel_after_push_delivery(); + test_cancel_read(); + test_cancel_read_edge(); + test_cancel_push_delivery(); + test_cancel_push_delivery_edge(); return boost::report_errors(); }