Skip to content

Commit

Permalink
Workaround asio::ssl async_read_some busy-loop
Browse files Browse the repository at this point in the history
chriskohlhoff/asio#1015

`asio::ssl` has a bug where calling async_read_some with `null_buffers`
isn't correctly handled, and more or less immediately invokes the
provided handler with no data. This causes `amqpprox` to busy-loop
whenever a TLS socket has been accepted or opened.

There were two options for how to fix this:

1) Always read with a fixed size buffer, such as 32kb. This would
   simplify the code slightly, at the expense of needing multiple reads
   to handle larger than 32kb frames in non-TLS mode, even when the full
   frame is available to `amqpprox` in one go.
2) Ask asio::ssl to read with a very small buffer, then ask the openssl
   library how many bytes are available. This technique aligns with how
   `amqpprox`'s read loop works today. That is what is implemented here.

In theory something similar could be upstreamed into `asio::ssl`. It's a
little tricky though and this exact code couldn't handle the generic
`MutableBufferSequence` interface - we can take some shortcuts in our
code.

I've done some benchmarking to check this change isn't going to regress
performance noticeably. Data throughput tests indicate that this fix improves
performance for TLS connections over the existing code. Still running
connection throughput tests.
  • Loading branch information
adamncasey committed Apr 12, 2022
1 parent c8bb8d2 commit 4081d1c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 26 deletions.
109 changes: 93 additions & 16 deletions libamqpprox/amqpprox_maybesecuresocketadaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/ssl/stream_base.hpp>
#include <openssl/ssl.h>

#include <amqpprox_logging.h>
#include <amqpprox_socketintercept.h>
Expand All @@ -38,36 +39,42 @@ class MaybeSecureSocketAdaptor {
using endpoint = boost::asio::ip::tcp::endpoint;
using handshake_type = boost::asio::ssl::stream_base::handshake_type;

boost::asio::io_service & d_ioService;
boost::asio::io_service &d_ioService;
std::optional<std::reference_wrapper<SocketIntercept>> d_intercept;
std::unique_ptr<stream_type> d_socket;
bool d_secured;
bool d_handshook;
std::unique_ptr<stream_type> d_socket;
bool d_secured;
bool d_handshook;
char d_smallBuffer;
bool d_smallBufferSet;

public:
typedef typename stream_type::executor_type executor_type;

#ifdef SOCKET_TESTING
MaybeSecureSocketAdaptor(boost::asio::io_service &ioService,
SocketIntercept & intercept,
SocketIntercept &intercept,
bool secured)
: d_ioService(ioService)
, d_intercept(intercept)
, d_socket()
, d_secured(secured)
, d_handshook(false)
, d_smallBuffer(0)
, d_smallBufferSet(false)
{
}
#endif

MaybeSecureSocketAdaptor(boost::asio::io_service & ioService,
MaybeSecureSocketAdaptor(boost::asio::io_service &ioService,
boost::asio::ssl::context &context,
bool secured)
: d_ioService(ioService)
, d_intercept()
, d_socket(std::make_unique<stream_type>(ioService, context))
, d_secured(secured)
, d_handshook(false)
, d_smallBuffer(0)
, d_smallBufferSet(false)
{
}

Expand All @@ -77,10 +84,14 @@ class MaybeSecureSocketAdaptor {
, d_socket(std::move(src.d_socket))
, d_secured(src.d_secured)
, d_handshook(src.d_handshook)
, d_smallBuffer(src.d_smallBuffer)
, d_smallBufferSet(src.d_smallBufferSet)
{
src.d_socket = std::unique_ptr<stream_type>();
src.d_secured = false;
src.d_handshook = false;
src.d_socket = std::unique_ptr<stream_type>();
src.d_secured = false;
src.d_handshook = false;
src.d_smallBuffer = 0;
src.d_smallBufferSet = false;
}

boost::asio::ip::tcp::socket &socket() { return d_socket->next_layer(); }
Expand Down Expand Up @@ -182,13 +193,24 @@ class MaybeSecureSocketAdaptor {
d_socket->next_layer().close(ec);
}

/**
* Indicates the number of bytes immediately readable out of the socket
* For TLS connections this references the number of bytes which are
* immediately available for reading from the current fully-read record
*/
std::size_t available(boost::system::error_code &ec)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().available(ec);
}

return d_socket->next_layer().available(ec);
if (d_secured) {
return (d_smallBufferSet ? 1 : 0) +
SSL_pending(d_socket->native_handle());
}
else {
return d_socket->next_layer().available(ec);
}
}

template <typename ConnectHandler>
Expand Down Expand Up @@ -247,35 +269,90 @@ class MaybeSecureSocketAdaptor {

template <typename MutableBufferSequence>
std::size_t read_some(const MutableBufferSequence &buffers,
boost::system::error_code & ec)
boost::system::error_code &ec)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().read_some(buffers, ec);
}

if (isSecure()) {
// Ensure we read the small-buffer-workaround if it has been used
if (d_smallBufferSet && buffers.size() >= 1) {
((char *)buffers.data())[0] = d_smallBuffer;
d_smallBufferSet = false;

MutableBufferSequence replacement(buffers);
replacement += 1;

size_t result = d_socket->read_some(replacement, ec);

if (ec && result == 0) {
ec = boost::system::error_code();
// Pretend read_some succeeded this time around because
// there's one byte left over.
return 1;
}

return 1 + result;
}

return d_socket->read_some(buffers, ec);
}
else {
return d_socket->next_layer().read_some(buffers, ec);
}
}

template <typename MutableBufferSequence, typename ReadHandler>
/**
* This async_read_some specialisation is required due to
* https://github.com/chriskohlhoff/asio/issues/1015
*
* For TLS sockets we need to ensure we call this method with a buffer size
* of at least one byte. This is handled by passing in a small buffer (1
* byte). This byte is then passed back via `read_some`. The presense of
* this byte is also represented in the return value of `available`.
*/
template <typename ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler,
void(boost::system::error_code, std::size_t))
async_read_some(const MutableBufferSequence &buffers,
async_read_some(const boost::asio::null_buffers &null_buffer,
BOOST_ASIO_MOVE_ARG(ReadHandler) handler)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().async_read_some(buffers, handler);
return d_intercept.value().get().async_read_some(null_buffer,
handler);
}

if (isSecure()) {
return d_socket->async_read_some(buffers, handler);
if (d_smallBufferSet) {
// The reader missed a byte - invoke ssl
// async_read_some(zero-sized-buffer) so the handler is
// immediately invoked to collect this missing byte. This
// codepath wasn't hit during testing, but it's left here for
// completeness
LOG_DEBUG << "Invoked async_read_some again before reading "
"data. Immediately invoking handler";

return d_socket->async_read_some(
boost::asio::buffer(&d_smallBuffer, 0), handler);
}

// async_read_some with a one byte buffer to ensure we are only
// called with useful progress
return d_socket->async_read_some(
boost::asio::buffer(&d_smallBuffer, sizeof(d_smallBuffer)),
[this, handler](boost::system::error_code ec,
std::size_t length) {
if (length != 0) {
d_smallBufferSet = true;
}

handler(ec, length);
});
}
else {
return d_socket->next_layer().async_read_some(buffers, handler);
return d_socket->next_layer().async_read_some(null_buffer,
handler);
}
}
};
Expand Down
25 changes: 15 additions & 10 deletions libamqpprox/amqpprox_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ namespace amqpprox {
using namespace boost::asio::ip;
using namespace boost::system;

Session::Session(boost::asio::io_service & ioservice,
MaybeSecureSocketAdaptor && serverSocket,
MaybeSecureSocketAdaptor && clientSocket,
ConnectionSelector * connectionSelector,
EventSource * eventSource,
BufferPool * bufferPool,
DNSResolver * dnsResolver,
Session::Session(boost::asio::io_service &ioservice,
MaybeSecureSocketAdaptor &&serverSocket,
MaybeSecureSocketAdaptor &&clientSocket,
ConnectionSelector *connectionSelector,
EventSource *eventSource,
BufferPool *bufferPool,
DNSResolver *dnsResolver,
const std::shared_ptr<HostnameMapper> &hostnameMapper,
std::string_view localHostname,
const std::shared_ptr<AuthInterceptInterface> &authIntercept)
Expand Down Expand Up @@ -184,7 +184,7 @@ void Session::attemptConnection(
using endpointType = boost::asio::ip::tcp::endpoint;
auto self(shared_from_this());
auto callback = [this, self, connectionManager](
const error_code & ec,
const error_code &ec,
std::vector<endpointType> endpoints) {
BOOST_LOG_SCOPED_THREAD_ATTR(
"Vhost",
Expand Down Expand Up @@ -682,6 +682,11 @@ void Session::readData(FlowType direction)
readData(direction);
}
else {
if (readAmount > 0) {
LOG_TRACE << "read_some returned data and error. Data "
"discarded from "
<< direction << " to close sockets";
}
handleSessionError("read_some", direction, ec);
return;
}
Expand Down Expand Up @@ -754,7 +759,7 @@ void Session::handleData(FlowType direction)
}
}

void Session::handleSessionError(const char * action,
void Session::handleSessionError(const char *action,
FlowType direction,
boost::system::error_code ec)
{
Expand Down Expand Up @@ -818,7 +823,7 @@ void Session::handleSessionError(const char * action,
}

void Session::handleConnectionError(
const char * action,
const char *action,
boost::system::error_code ec,
const std::shared_ptr<ConnectionManager> &connectionManager)
{
Expand Down

0 comments on commit 4081d1c

Please sign in to comment.