Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,19 +245,15 @@ struct reader_op {
public:
reader_op(connection_impl<Executor>& conn) noexcept
: conn_{&conn}
, fsm_{conn.st_.mpx}
{ }

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
for (;;) {
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());

conn_->st_.logger.on_fsm_resume(act);
auto act = fsm_.resume(conn_->st_, n, ec, self.get_cancellation_state().cancelled());

switch (act.type_) {
case reader_fsm::action::type::needs_more:
case reader_fsm::action::type::read_some:
{
auto const buf = conn_->st_.mpx.get_prepared_read_buffer();
Expand Down
3 changes: 1 addition & 2 deletions include/boost/redis/detail/connection_logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#ifndef BOOST_REDIS_CONNECTION_LOGGER_HPP
#define BOOST_REDIS_CONNECTION_LOGGER_HPP

#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/response.hpp>

Expand Down Expand Up @@ -38,7 +37,7 @@ class connection_logger {
void on_connect(system::error_code const& ec, std::string_view unix_socket_ep);
void on_ssl_handshake(system::error_code const& ec);
void on_write(system::error_code const& ec, std::size_t n);
void on_fsm_resume(reader_fsm::action const& action);
void on_read(system::error_code const& ec, std::size_t n);
void on_setup(system::error_code const& ec, generic_response const& resp);
void log(logger::level lvl, std::string_view msg);
void log(logger::level lvl, std::string_view msg1, std::string_view msg2);
Expand Down
31 changes: 23 additions & 8 deletions include/boost/redis/detail/reader_fsm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#ifndef BOOST_REDIS_READER_FSM_HPP
#define BOOST_REDIS_READER_FSM_HPP

#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/multiplexer.hpp>

#include <boost/asio/cancellation_type.hpp>
Expand All @@ -23,27 +25,40 @@ class reader_fsm {
enum class type
{
read_some,
needs_more,
notify_push_receiver,
done,
};

type type_ = type::done;
std::size_t push_size_ = 0u;
system::error_code ec_ = {};
};
action(type t, std::size_t push_size = 0u) noexcept
: type_(t)
, push_size_(push_size)
{ }

explicit reader_fsm(multiplexer& mpx) noexcept;
action(system::error_code ec) noexcept
: type_(type::done)
, ec_(ec)
{ }

static action notify_push_receiver(std::size_t bytes)
{
return {type::notify_push_receiver, bytes};
}

type type_;
std::size_t push_size_{};
system::error_code ec_;
};

action resume(
connection_state& st,
std::size_t bytes_read,
system::error_code ec,
asio::cancellation_type_t cancel_state);

reader_fsm() = default;

private:
int resume_point_{0};
action::type next_read_type_ = action::type::read_some;
multiplexer* mpx_ = nullptr;
std::pair<consume_result, std::size_t> res_{consume_result::needs_more, 0u};
};

Expand Down
49 changes: 9 additions & 40 deletions include/boost/redis/impl/connection_logger.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/logger.hpp>

#include <boost/asio/ip/tcp.hpp>
Expand All @@ -16,35 +15,6 @@

namespace boost::redis::detail {

#define BOOST_REDIS_READER_SWITCH_CASE(elem) \
case reader_fsm::action::type::elem: return "reader_fsm::action::type::" #elem

#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
case exec_action_type::elem: return "exec_action_type::" #elem

auto to_string(reader_fsm::action::type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_READER_SWITCH_CASE(read_some);
BOOST_REDIS_READER_SWITCH_CASE(needs_more);
BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver);
BOOST_REDIS_READER_SWITCH_CASE(done);
default: return "action::type::<invalid type>";
}
}

auto to_string(exec_action_type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
BOOST_REDIS_EXEC_SWITCH_CASE(done);
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
default: return "exec_action_type::<invalid type>";
}
}

inline void format_tcp_endpoint(const asio::ip::tcp::endpoint& ep, std::string& to)
{
// This formatting is inspired by Asio's endpoint operator<<
Expand Down Expand Up @@ -161,21 +131,20 @@ void connection_logger::on_write(system::error_code const& ec, std::size_t n)
logger_.fn(logger::level::info, msg_);
}

void connection_logger::on_fsm_resume(reader_fsm::action const& action)
void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_read)
{
if (logger_.lvl < logger::level::debug)
return;

std::string msg;
msg += "(";
msg += to_string(action.type_);
msg += ", ";
msg += std::to_string(action.push_size_);
msg += ", ";
msg += action.ec_.message();
msg += ")";
msg_ = "Reader task: ";
msg_ += std::to_string(bytes_read);
msg_ += " bytes read";
if (ec) {
msg_ += ", error: ";
format_error_code(ec, msg_);
}

logger_.fn(logger::level::debug, msg);
logger_.fn(logger::level::debug, msg_);
}

void connection_logger::on_setup(system::error_code const& ec, generic_response const& resp)
Expand Down
59 changes: 34 additions & 25 deletions include/boost/redis/impl/reader_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* accompanying file LICENSE.txt)
*/

#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
Expand All @@ -14,11 +15,8 @@

namespace boost::redis::detail {

reader_fsm::reader_fsm(multiplexer& mpx) noexcept
: mpx_{&mpx}
{ }

reader_fsm::action reader_fsm::resume(
connection_state& st,
std::size_t bytes_read,
system::error_code ec,
asio::cancellation_type_t cancel_state)
Expand All @@ -28,53 +26,64 @@ reader_fsm::action reader_fsm::resume(

for (;;) {
// Prepare the buffer for the read operation
ec = mpx_->prepare_read();
ec = st.mpx.prepare_read();
if (ec) {
return {action::type::done, 0, ec};
st.logger.trace("Reader task: error in prepare_read", ec);
return {ec};
}

// Read
BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_)
st.logger.trace("Reader task: issuing read");
BOOST_REDIS_YIELD(resume_point_, 1, action::type::read_some)

// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
st.logger.trace("Reader task: cancelled (1)");
return {asio::error::operation_aborted};
}

// Log what we read
st.logger.on_read(ec, bytes_read);

// Process the bytes read, even if there was an error
mpx_->commit_read(bytes_read);
st.mpx.commit_read(bytes_read);

// Check for read errors
if (ec) {
// TODO: If an error occurred but data was read (i.e.
// bytes_read != 0) we should try to process that data and
// deliver it to the user before calling cancel_run.
return {action::type::done, bytes_read, ec};
}

// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
return {action::type::done, 0u, asio::error::operation_aborted};
return {ec};
}

// Process the data that we've read
next_read_type_ = action::type::read_some;
while (mpx_->get_read_buffer_size() != 0) {
res_ = mpx_->consume(ec);
while (st.mpx.get_read_buffer_size() != 0) {
res_ = st.mpx.consume(ec);

if (ec) {
// TODO: Perhaps log what has not been consumed to aid
// debugging.
return {action::type::done, res_.second, ec};
st.logger.trace("Reader task: error processing message", ec);
return {ec};
}

if (res_.first == consume_result::needs_more) {
next_read_type_ = action::type::needs_more;
st.logger.trace("Reader task: incomplete message received");
break;
}

if (res_.first == consume_result::got_push) {
BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second)
if (ec) {
return {action::type::done, 0u, ec};
}
BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second))
// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
return {action::type::done, 0u, asio::error::operation_aborted};
st.logger.trace("Reader task: cancelled (2)");
return {asio::error::operation_aborted};
}

// Check for other errors
if (ec) {
st.logger.trace("Reader task: error notifying push receiver", ec);
return {ec};
}
} else {
// TODO: Here we should notify the exec operation that
Expand All @@ -89,7 +98,7 @@ reader_fsm::action reader_fsm::resume(
}

BOOST_ASSERT(false);
return {action::type::done, 0, system::error_code()};
return {system::error_code()};
}

} // namespace boost::redis::detail
23 changes: 18 additions & 5 deletions test/test_exec_fsm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include <boost/assert.hpp>

#include "sansio_utils.hpp"

#include <cstddef>
#include <memory>
#include <ostream>
#include <utility>

#include "sansio_utils.hpp"

using namespace boost::redis;
namespace asio = boost::asio;
using detail::exec_fsm;
Expand All @@ -33,11 +33,24 @@ using detail::exec_action;
using boost::system::error_code;
using boost::asio::cancellation_type_t;

#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
case exec_action_type::elem: return "exec_action_type::" #elem

static auto to_string(exec_action_type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
BOOST_REDIS_EXEC_SWITCH_CASE(done);
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
default: return "exec_action_type::<invalid type>";
}
}

// Operators
namespace boost::redis::detail {

extern auto to_string(exec_action_type t) noexcept -> char const*;

std::ostream& operator<<(std::ostream& os, exec_action_type type)
{
os << to_string(type);
Expand Down
Loading