Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workaround asio::ssl async_read_some busy-loop #69

Merged
merged 1 commit into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 93 additions & 16 deletions libamqpprox/amqpprox_maybesecuresocketadaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/ssl/stream_base.hpp>
#include <openssl/ssl.h>

#include <amqpprox_logging.h>
#include <amqpprox_socketintercept.h>
Expand All @@ -38,36 +39,42 @@ class MaybeSecureSocketAdaptor {
using endpoint = boost::asio::ip::tcp::endpoint;
using handshake_type = boost::asio::ssl::stream_base::handshake_type;

boost::asio::io_service & d_ioService;
boost::asio::io_service &d_ioService;
std::optional<std::reference_wrapper<SocketIntercept>> d_intercept;
std::unique_ptr<stream_type> d_socket;
bool d_secured;
bool d_handshook;
std::unique_ptr<stream_type> d_socket;
bool d_secured;
bool d_handshook;
char d_smallBuffer;
bool d_smallBufferSet;

public:
typedef typename stream_type::executor_type executor_type;

#ifdef SOCKET_TESTING
MaybeSecureSocketAdaptor(boost::asio::io_service &ioService,
SocketIntercept & intercept,
SocketIntercept &intercept,
bool secured)
: d_ioService(ioService)
, d_intercept(intercept)
, d_socket()
, d_secured(secured)
, d_handshook(false)
, d_smallBuffer(0)
, d_smallBufferSet(false)
{
}
adamncasey marked this conversation as resolved.
Show resolved Hide resolved
#endif

MaybeSecureSocketAdaptor(boost::asio::io_service & ioService,
MaybeSecureSocketAdaptor(boost::asio::io_service &ioService,
boost::asio::ssl::context &context,
bool secured)
: d_ioService(ioService)
, d_intercept()
, d_socket(std::make_unique<stream_type>(ioService, context))
, d_secured(secured)
, d_handshook(false)
, d_smallBuffer(0)
, d_smallBufferSet(false)
{
}

Expand All @@ -77,10 +84,14 @@ class MaybeSecureSocketAdaptor {
, d_socket(std::move(src.d_socket))
, d_secured(src.d_secured)
, d_handshook(src.d_handshook)
, d_smallBuffer(src.d_smallBuffer)
, d_smallBufferSet(src.d_smallBufferSet)
{
src.d_socket = std::unique_ptr<stream_type>();
src.d_secured = false;
src.d_handshook = false;
src.d_socket = std::unique_ptr<stream_type>();
src.d_secured = false;
src.d_handshook = false;
src.d_smallBuffer = 0;
src.d_smallBufferSet = false;
}

boost::asio::ip::tcp::socket &socket() { return d_socket->next_layer(); }
Expand Down Expand Up @@ -182,13 +193,24 @@ class MaybeSecureSocketAdaptor {
d_socket->next_layer().close(ec);
}

/**
* Indicates the number of bytes immediately readable out of the socket
* For TLS connections this references the number of bytes which are
* immediately available for reading from the current fully-read record
*/
std::size_t available(boost::system::error_code &ec)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().available(ec);
}

return d_socket->next_layer().available(ec);
if (d_secured) {
return (d_smallBufferSet ? 1 : 0) +
SSL_pending(d_socket->native_handle());
}
else {
return d_socket->next_layer().available(ec);
}
}

template <typename ConnectHandler>
Expand Down Expand Up @@ -247,35 +269,90 @@ class MaybeSecureSocketAdaptor {

template <typename MutableBufferSequence>
std::size_t read_some(const MutableBufferSequence &buffers,
boost::system::error_code & ec)
boost::system::error_code &ec)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().read_some(buffers, ec);
}

if (isSecure()) {
// Ensure we read the small-buffer-workaround if it has been used
if (d_smallBufferSet && buffers.size() >= 1) {
((char *)buffers.data())[0] = d_smallBuffer;
d_smallBufferSet = false;

MutableBufferSequence replacement(buffers);
replacement += 1;

size_t result = d_socket->read_some(replacement, ec);

if (ec && result == 0) {
ec = boost::system::error_code();
// Pretend read_some succeeded this time around because
// there's one byte left over.
return 1;
}

return 1 + result;
}

return d_socket->read_some(buffers, ec);
}
else {
return d_socket->next_layer().read_some(buffers, ec);
}
}

template <typename MutableBufferSequence, typename ReadHandler>
/**
* This async_read_some specialisation is required due to
* https://github.com/chriskohlhoff/asio/issues/1015
*
* For TLS sockets we need to ensure we call this method with a buffer size
* of at least one byte. This is handled by passing in a small buffer (1
* byte). This byte is then passed back via `read_some`. The presense of
* this byte is also represented in the return value of `available`.
*/
template <typename ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler,
void(boost::system::error_code, std::size_t))
async_read_some(const MutableBufferSequence &buffers,
async_read_some(const boost::asio::null_buffers &null_buffer,
BOOST_ASIO_MOVE_ARG(ReadHandler) handler)
{
if (BOOST_UNLIKELY(d_intercept.has_value())) {
return d_intercept.value().get().async_read_some(buffers, handler);
return d_intercept.value().get().async_read_some(null_buffer,
handler);
}

if (isSecure()) {
return d_socket->async_read_some(buffers, handler);
if (d_smallBufferSet) {
// The reader missed a byte - invoke ssl
// async_read_some(zero-sized-buffer) so the handler is
// immediately invoked to collect this missing byte. This
// codepath wasn't hit during testing, but it's left here for
// completeness
LOG_DEBUG << "Invoked async_read_some again before reading "
"data. Immediately invoking handler";

return d_socket->async_read_some(
boost::asio::buffer(&d_smallBuffer, 0), handler);
}

// async_read_some with a one byte buffer to ensure we are only
// called with useful progress
return d_socket->async_read_some(
boost::asio::buffer(&d_smallBuffer, sizeof(d_smallBuffer)),
[this, handler](boost::system::error_code ec,
std::size_t length) {
if (length != 0) {
d_smallBufferSet = true;
}

handler(ec, length);
});
}
else {
return d_socket->next_layer().async_read_some(buffers, handler);
return d_socket->next_layer().async_read_some(null_buffer,
handler);
}
}
};
Expand Down
25 changes: 15 additions & 10 deletions libamqpprox/amqpprox_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ namespace amqpprox {
using namespace boost::asio::ip;
using namespace boost::system;

Session::Session(boost::asio::io_service & ioservice,
MaybeSecureSocketAdaptor && serverSocket,
MaybeSecureSocketAdaptor && clientSocket,
ConnectionSelector * connectionSelector,
EventSource * eventSource,
BufferPool * bufferPool,
DNSResolver * dnsResolver,
Session::Session(boost::asio::io_service &ioservice,
MaybeSecureSocketAdaptor &&serverSocket,
MaybeSecureSocketAdaptor &&clientSocket,
ConnectionSelector *connectionSelector,
EventSource *eventSource,
BufferPool *bufferPool,
DNSResolver *dnsResolver,
const std::shared_ptr<HostnameMapper> &hostnameMapper,
std::string_view localHostname,
const std::shared_ptr<AuthInterceptInterface> &authIntercept)
Expand Down Expand Up @@ -184,7 +184,7 @@ void Session::attemptConnection(
using endpointType = boost::asio::ip::tcp::endpoint;
auto self(shared_from_this());
auto callback = [this, self, connectionManager](
const error_code & ec,
const error_code &ec,
std::vector<endpointType> endpoints) {
BOOST_LOG_SCOPED_THREAD_ATTR(
"Vhost",
Expand Down Expand Up @@ -682,6 +682,11 @@ void Session::readData(FlowType direction)
readData(direction);
}
else {
if (readAmount > 0) {
LOG_TRACE << "read_some returned data and error. Data "
"discarded from "
<< direction << " to close sockets";
}
handleSessionError("read_some", direction, ec);
return;
}
Expand Down Expand Up @@ -754,7 +759,7 @@ void Session::handleData(FlowType direction)
}
}

void Session::handleSessionError(const char * action,
void Session::handleSessionError(const char *action,
FlowType direction,
boost::system::error_code ec)
{
Expand Down Expand Up @@ -818,7 +823,7 @@ void Session::handleSessionError(const char * action,
}

void Session::handleConnectionError(
const char * action,
const char *action,
boost::system::error_code ec,
const std::shared_ptr<ConnectionManager> &connectionManager)
{
Expand Down