Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cff2c97
Initial impl
anarthal Sep 27, 2025
ee255e7
Bridge with I/O
anarthal Sep 27, 2025
7c38d02
Merge branch 'develop' into feature/connect-fsm
anarthal Sep 27, 2025
c794327
Initial testing setup
anarthal Sep 29, 2025
259a793
Check logging
anarthal Sep 29, 2025
2a779d2
Move to ipp
anarthal Sep 29, 2025
b7aed68
Resolve error
anarthal Sep 29, 2025
3e2970f
Fixture
anarthal Sep 29, 2025
2fad9ef
Timeout test and fix bug
anarthal Sep 29, 2025
c7b26ba
Resolve cancel
anarthal Sep 29, 2025
b61952e
Resolve cancel edge
anarthal Sep 29, 2025
3b3d64c
connect error
anarthal Sep 29, 2025
cd42edf
Connect timeout
anarthal Sep 29, 2025
79924f0
connect cancel
anarthal Sep 29, 2025
4cc6f8f
cancel edge
anarthal Sep 29, 2025
1c5d45f
TLS success
anarthal Sep 29, 2025
ef847ef
TLS reconnect
anarthal Sep 29, 2025
8417c91
ssl handshake error
anarthal Sep 29, 2025
84bc7b7
SSL handshake timeout
anarthal Sep 29, 2025
0987223
Handshake cancel
anarthal Sep 29, 2025
9eeadb7
ssl cancel edge
anarthal Sep 29, 2025
a61db76
UNIX success
anarthal Sep 29, 2025
9846bcc
connect error
anarthal Sep 29, 2025
3ab228b
connect timeout
anarthal Sep 29, 2025
a7a2613
UNIX connect cancel
anarthal Sep 29, 2025
a39c46a
unix edge
anarthal Sep 29, 2025
5926dfa
Fix includes
anarthal Sep 29, 2025
4cab8d9
Merge branch 'develop' into feature/connect-fsm
anarthal Oct 3, 2025
769a01a
UNIX socket close
anarthal Oct 3, 2025
dfdb74a
Fix unit tests
anarthal Oct 3, 2025
fbbb673
close error test
anarthal Oct 3, 2025
703fd70
Merge branch 'develop' into feature/connect-fsm
anarthal Oct 3, 2025
7a136b0
Simplification
anarthal Oct 3, 2025
19b7016
Merge branch 'develop' into feature/connect-fsm
anarthal Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ class run_op {
for (;;) {
// Try to connect
BOOST_ASIO_CORO_YIELD
conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self));
conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self));

// Check for cancellations
if (is_cancelled(self)) {
Expand Down
97 changes: 97 additions & 0 deletions include/boost/redis/detail/connect_fsm.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_CONNECT_FSM_HPP
#define BOOST_REDIS_CONNECT_FSM_HPP

#include <boost/redis/config.hpp>

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/error_code.hpp>

// Sans-io algorithm for redis_stream::async_connect, as a finite state machine

namespace boost::redis::detail {

class connection_logger;

// What transport is redis_stream using?
enum class transport_type
{
tcp, // plaintext TCP
tcp_tls, // TLS over TCP
unix_socket, // UNIX domain sockets
};

struct redis_stream_state {
transport_type type{transport_type::tcp};
bool ssl_stream_used{false};
};

// What should we do next?
enum class connect_action_type
{
unix_socket_close, // Close the UNIX socket, to discard state
unix_socket_connect, // Connect to the UNIX socket
tcp_resolve, // Name resolution
tcp_connect, // TCP connect
ssl_stream_reset, // Re-create the SSL stream, to discard state
ssl_handshake, // SSL handshake
done, // Complete the async op
};

struct connect_action {
connect_action_type type;
system::error_code ec;

connect_action(connect_action_type type) noexcept
: type{type}
{ }

connect_action(system::error_code ec) noexcept
: type{connect_action_type::done}
, ec{ec}
{ }
};

class connect_fsm {
int resume_point_{0};
const config* cfg_{nullptr};
connection_logger* lgr_{nullptr};

public:
connect_fsm(const config& cfg, connection_logger& lgr) noexcept
: cfg_(&cfg)
, lgr_(&lgr)
{ }

const config& get_config() const { return *cfg_; }

connect_action resume(
system::error_code ec,
const asio::ip::tcp::resolver::results_type& resolver_results,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);

connect_action resume(
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);

connect_action resume(
system::error_code ec,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);

}; // namespace boost::redis::detail

} // namespace boost::redis::detail

#endif
208 changes: 82 additions & 126 deletions include/boost/redis/detail/redis_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define BOOST_REDIS_REDIS_STREAM_HPP

#include <boost/redis/config.hpp>
#include <boost/redis/detail/connect_fsm.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/error.hpp>

Expand All @@ -31,14 +32,6 @@ namespace boost {
namespace redis {
namespace detail {

// What transport is redis_stream using?
enum class transport_type
{
tcp, // plaintext TCP
tcp_tls, // TLS over TCP
unix_socket, // UNIX domain sockets
};

template <class Executor>
class redis_stream {
asio::ssl::context ssl_ctx_;
Expand All @@ -48,140 +41,103 @@ class redis_stream {
asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
#endif
typename asio::steady_timer::template rebind_executor<Executor>::other timer_;

transport_type transport_{transport_type::tcp};
bool ssl_stream_used_{false};
redis_stream_state st_;

void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }

static transport_type transport_from_config(const config& cfg)
{
if (cfg.unix_socket.empty()) {
if (cfg.use_ssl) {
return transport_type::tcp_tls;
} else {
return transport_type::tcp;
}
} else {
BOOST_ASSERT(!cfg.use_ssl);
return transport_type::unix_socket;
}
}

struct connect_op {
redis_stream& obj;
const config* cfg;
connection_logger* lgr;
asio::coroutine coro{};

// This overload will be used for connects. We only need the endpoint
// for logging, so log it and call the coroutine
template <class Self>
void operator()(
Self& self,
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint)
{
lgr->on_connect(ec, selected_endpoint);
(*this)(self, ec);
}
connect_fsm fsm_;

template <class Self>
void operator()(
Self& self,
system::error_code ec = {},
asio::ip::tcp::resolver::results_type resolver_results = {})
void execute_action(Self& self, connect_action act)
{
BOOST_ASIO_CORO_REENTER(coro)
{
// Record the transport that we will be using
obj.transport_ = transport_from_config(*cfg);
const auto& cfg = fsm_.get_config();

if (obj.transport_ == transport_type::unix_socket) {
switch (act.type) {
case connect_action_type::unix_socket_close:
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
// Discard any existing state
{
system::error_code ec;
obj.unix_socket_.close(ec);

// Directly connect to the socket
BOOST_ASIO_CORO_YIELD
(*this)(self, ec); // This is a sync action
}
#else
BOOST_ASSERT(false);
#endif
return;
case connect_action_type::unix_socket_connect:
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
obj.unix_socket_.async_connect(
cfg->unix_socket,
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));

// Log it
lgr->on_connect(ec, cfg->unix_socket);

// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
return;
}
cfg.unix_socket,
asio::cancel_after(obj.timer_, cfg.connect_timeout, std::move(self)));
#else
BOOST_ASSERT(false);
#endif
} else {
// ssl::stream doesn't support being re-used. If we're to use
// TLS and the stream has been used, re-create it.
// Must be done before anything else is done on the stream.
// Note that we don't need to close the socket here because
// range connect does it for us.
if (cfg->use_ssl && obj.ssl_stream_used_)
obj.reset_stream();
return;

BOOST_ASIO_CORO_YIELD
case connect_action_type::tcp_resolve:
obj.resolv_.async_resolve(
cfg->addr.host,
cfg->addr.port,
asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));

// Log it
lgr->on_resolve(ec, resolver_results);

// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec);
return;
}

// Connect to the address that the resolver provided us
BOOST_ASIO_CORO_YIELD
asio::async_connect(
obj.stream_.next_layer(),
std::move(resolver_results),
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));

// Note: logging is performed in the specialized operator() function.
// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
return;
}

if (cfg->use_ssl) {
// Mark the SSL stream as used
obj.ssl_stream_used_ = true;

// If we were configured to use TLS, perform the handshake
BOOST_ASIO_CORO_YIELD
obj.stream_.async_handshake(
asio::ssl::stream_base::client,
asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));

lgr->on_ssl_handshake(ec);
cfg.addr.host,
cfg.addr.port,
asio::cancel_after(obj.timer_, cfg.resolve_timeout, std::move(self)));
return;
case connect_action_type::ssl_stream_reset:
obj.reset_stream();
// this action does not require yielding. Execute the next action immediately
(*this)(self);
return;
case connect_action_type::ssl_handshake:
obj.stream_.async_handshake(
asio::ssl::stream_base::client,
asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self)));
return;
case connect_action_type::done: self.complete(act.ec); break;
// Connect should use the specialized handler, where resolver results are available
case connect_action_type::tcp_connect:
default: BOOST_ASSERT(false);
}
}

// If this failed, we can't continue
if (ec) {
self.complete(
ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec);
return;
}
}
}
// This overload will be used for connects
template <class Self>
void operator()(
Self& self,
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint)
{
auto act = fsm_.resume(
ec,
selected_endpoint,
obj.st_,
self.get_cancellation_state().cancelled());
execute_action(self, act);
}

// Done
self.complete(system::error_code());
// This overload will be used for resolves
template <class Self>
void operator()(
Self& self,
system::error_code ec,
asio::ip::tcp::resolver::results_type endpoints)
{
auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled());
if (act.type == connect_action_type::tcp_connect) {
asio::async_connect(
obj.stream_.next_layer(),
std::move(endpoints),
asio::cancel_after(obj.timer_, fsm_.get_config().connect_timeout, std::move(self)));
} else {
execute_action(self, act);
}
}

template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
auto act = fsm_.resume(ec, obj.st_, self.get_cancellation_state().cancelled());
execute_action(self, act);
}
};

public:
Expand All @@ -204,7 +160,7 @@ class redis_stream {
bool is_open() const
{
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
if (transport_ == transport_type::unix_socket)
if (st_.type == transport_type::unix_socket)
return unix_socket_.is_open();
#endif
return stream_.next_layer().is_open();
Expand All @@ -214,18 +170,18 @@ class redis_stream {

// I/O
template <class CompletionToken>
auto async_connect(const config* cfg, connection_logger* l, CompletionToken&& token)
auto async_connect(const config& cfg, connection_logger& l, CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code)>(
connect_op{*this, cfg, l},
connect_op{*this, connect_fsm(cfg, l)},
token);
}

// These functions should only be used with callbacks (e.g. within async_compose function bodies)
template <class ConstBufferSequence, class CompletionToken>
void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
{
switch (transport_) {
switch (st_.type) {
case transport_type::tcp:
{
stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
Expand All @@ -250,7 +206,7 @@ class redis_stream {
template <class MutableBufferSequence, class CompletionToken>
void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
{
switch (transport_) {
switch (st_.type) {
case transport_type::tcp:
{
return stream_.next_layer().async_read_some(
Expand Down
Loading