Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fixes for socket shutdown #20877

Merged
merged 33 commits into 3.11 from bug-fix-3.11/socket-shutdown
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2e9a736
Fix SSL connection retry attempts for cluster-internal connections
jsteemann Apr 30, 2024
a219cf6
apply some review comments
jsteemann Apr 30, 2024
e3f7279
check if connection was aborted
jsteemann Apr 30, 2024
71e2b5e
revert change to cmakelists for tests
jsteemann Apr 30, 2024
92fefa4
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann Apr 30, 2024
b5b50ef
disable socket rearming for now
jsteemann May 2, 2024
4b506ff
simplify PR
jsteemann May 3, 2024
2b4c53d
Merge branch 'bug-fix-3.11/socket-shutdown' of github.com:arangodb/ar…
jsteemann May 3, 2024
11928cd
remove retry entirely
jsteemann May 3, 2024
70f9050
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann May 3, 2024
0b5c67c
fix arangosh build
jsteemann May 3, 2024
178c583
Merge branch 'bug-fix-3.11/socket-shutdown' of github.com:arangodb/ar…
jsteemann May 3, 2024
243c291
Merge branch '3.11' into bug-fix-3.11/socket-shutdown
KVS85 May 3, 2024
11c2730
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann May 6, 2024
43bf56b
further fixes for socket shutdown
jsteemann May 6, 2024
8f7911b
fix ownership issue
jsteemann May 7, 2024
0bc3207
fix compiler warning
jsteemann May 7, 2024
7a31330
Merge branch '3.11' into bug-fix-3.11/socket-shutdown
jsteemann May 7, 2024
3463a1d
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann May 7, 2024
a2706fd
simplify PR
jsteemann May 7, 2024
75333f6
Merge branch 'bug-fix-3.11/socket-shutdown' of github.com:arangodb/ar…
jsteemann May 7, 2024
7039fa6
reset timer earlier
jsteemann May 7, 2024
0981766
make sure callback is always called
jsteemann May 7, 2024
a3381d3
fix timer cancelation race
jsteemann May 7, 2024
3cf96ff
fixes for socket shutdown
jsteemann May 8, 2024
5d1da43
remove invalid assertion
jsteemann May 8, 2024
27c383e
Fix: check if socket is open in connect callback
mpoeter May 8, 2024
cf2ad97
fix compilation
jsteemann May 8, 2024
52a3c69
add tests
jsteemann May 8, 2024
25cbf60
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann May 10, 2024
b8d7d33
increase connection timeout
jsteemann May 10, 2024
23bc61d
Merge branch '3.11' of github.com:arangodb/arangodb into bug-fix-3.11…
jsteemann May 10, 2024
9112003
fix spurious test failure
jsteemann May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions 3rdParty/fuerte/include/fuerte/FuerteLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
#if 0
#include <iostream>
#include <sstream>
#include <string_view>

extern void LogHackWriter(char const* p);
extern void LogHackWriter(std::string_view p);

class LogHack {
std::stringstream _s;
public:
LogHack() {};
~LogHack() { LogHackWriter(_s.str().c_str()); };
~LogHack() { LogHackWriter(_s.str()); };
template<typename T> LogHack& operator<<(T const& o) { _s << o; return *this; }
typedef std::basic_ostream<char, std::char_traits<char> > CoutType;
typedef CoutType& (*StandardEndLine)(CoutType&);
Expand Down Expand Up @@ -115,11 +116,4 @@ class LogHack {
if (0) std::cout
#endif

#if ENABLE_FUERTE_LOG_NODE > 0
#define FUERTE_LOG_NODE std::cout
#else
#define FUERTE_LOG_NODE \
if (0) std::cout
#endif

#endif
8 changes: 8 additions & 0 deletions 3rdParty/fuerte/include/fuerte/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class ConnectionBuilder {
return *this;
}

#ifdef ARANGODB_USE_GOOGLE_TESTS
// Test-only accessors; they are compiled only when ARANGODB_USE_GOOGLE_TESTS
// is defined.
// NOTE(review): in the visible part of resolveConnect any value > 0 forces an
// artificial (non-aborted) error in the resolve callback; whether the value is
// counted down anywhere is not visible here - confirm.

// Returns the value currently stored in _conf._failConnectAttempts.
unsigned failConnectAttempts() const { return _conf._failConnectAttempts; }
// Stores f in _conf._failConnectAttempts and returns *this so that builder
// calls can be chained.
ConnectionBuilder& failConnectAttempts(unsigned f) {
_conf._failConnectAttempts = f;
return *this;
}
#endif

// Set the authentication type of the connection
AuthenticationType authenticationType() const {
return _conf._authenticationType;
Expand Down
2 changes: 2 additions & 0 deletions 3rdParty/fuerte/include/fuerte/loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@

#include <fuerte/asio_ns.h>

#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// run / runWithWork / poll for Loop mapping to ioservice
// free function run with threads / with thread group barrier and work
Expand Down
6 changes: 6 additions & 0 deletions 3rdParty/fuerte/include/fuerte/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ struct ConnectionConfiguration {
_idleTimeout(300000),
_connectRetryPause(1000),
_maxConnectRetries(3),
#ifdef ARANGODB_USE_GOOGLE_TESTS
_failConnectAttempts(0),
#endif
_useIdleTimeout(true),
_authenticationType(AuthenticationType::None),
_user(""),
Expand All @@ -240,6 +243,9 @@ struct ConnectionConfiguration {
std::chrono::milliseconds _idleTimeout;
std::chrono::milliseconds _connectRetryPause;
unsigned _maxConnectRetries;
#ifdef ARANGODB_USE_GOOGLE_TESTS
unsigned _failConnectAttempts;
#endif
bool _useIdleTimeout;

AuthenticationType _authenticationType;
Expand Down
159 changes: 129 additions & 30 deletions 3rdParty/fuerte/src/AsioSockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@
namespace arangodb { namespace fuerte { inline namespace v1 {

namespace {
template <typename SocketT, typename F>
template <typename SocketT, typename F, typename IsAbortedCb>
void resolveConnect(detail::ConnectionConfiguration const& config,
asio_ns::ip::tcp::resolver& resolver, SocketT& socket,
F&& done) {
auto cb = [&socket, done(std::forward<F>(done))](auto ec, auto it) mutable {
F&& done, IsAbortedCb&& isAborted) {
auto cb = [&socket,
#ifdef ARANGODB_USE_GOOGLE_TESTS
fail = config._failConnectAttempts > 0,
#endif
done = std::forward<F>(done),
isAborted = std::forward<IsAbortedCb>(isAborted)](auto ec, auto it) mutable {
#ifdef ARANGODB_USE_GOOGLE_TESTS
if (fail) {
// use an error code != operation_aborted
ec = boost::system::errc::make_error_code(boost::system::errc::not_enough_memory);
}
#endif

if (isAborted()) {
ec = asio_ns::error::operation_aborted;
}

if (ec) { // error in address resolver
FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message() << "\n";
done(ec);
return;
}
Expand All @@ -44,7 +61,12 @@ void resolveConnect(detail::ConnectionConfiguration const& config,
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
asio_ns::async_connect(socket, it,
[done(std::move(done))](auto ec, auto it) mutable {
[done](auto ec, auto it) mutable {
if (ec) {
FUERTE_LOG_DEBUG << "executing async connect callback, error: " << ec.message() << "\n";
} else {
FUERTE_LOG_DEBUG << "executing async connect callback, no error\n";
}
std::forward<F>(done)(ec);
});
} catch (std::bad_alloc const&) {
Expand All @@ -63,7 +85,8 @@ void resolveConnect(detail::ConnectionConfiguration const& config,
auto it = resolver.resolve(config._host, config._port, ec);
cb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
// Resolve the host asynchronously into a series of endpoints
FUERTE_LOG_DEBUG << "scheduled callback to resolve host " << config._host << ":" << config._port << "\n";
resolver.async_resolve(config._host, config._port, std::move(cb));
#endif
}
Expand All @@ -77,22 +100,44 @@ struct Socket<SocketType::Tcp> {
Socket(EventLoopService&, asio_ns::io_context& ctx)
: resolver(ctx), socket(ctx), timer(ctx) {}

~Socket() { this->cancel(); }
// Destructor of the tcp socket wrapper: calls cancel(). A std::exception
// escaping cancel() is logged here so that it does not propagate out of the
// destructor.
~Socket() {
try {
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n";
}
}

template <typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) {
resolveConnect(config, resolver, socket, std::forward<F>(done));
resolveConnect(config, resolver, socket, [this, done = std::forward<F>(done)](asio_ns::error_code ec) mutable {
FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n";
if (canceled) {
// cancel() was already called on this socket
FUERTE_ASSERT(socket.is_open() == false);
ec = asio_ns::error::operation_aborted;
}
done(ec);
}, [this]() {
return canceled;
});
}

// Clears the canceled flag that cancel() sets, so that the connect callbacks
// of this socket no longer turn their result into operation_aborted.
void rearm() {
canceled = false;
}

void cancel() {
canceled = true;
try {
timer.cancel();
resolver.cancel();
if (socket.is_open()) { // non-graceful shutdown
asio_ns::error_code ec;
socket.close(ec);
}
} catch (...) {
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during tcp socket cancelation: " << ex.what() << "\n";
}
}

Expand All @@ -108,29 +153,46 @@ struct Socket<SocketType::Tcp> {
ec.clear();
socket.close(ec);
}
} catch (...) {
} catch (std::exception const& ex) {
// an exception is unlikely to occur here, as we are using the error-code
// variants of cancel/shutdown/close above
FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n";
}
std::forward<F>(cb)(ec);
}

asio_ns::ip::tcp::resolver resolver;
asio_ns::ip::tcp::socket socket;
asio_ns::steady_timer timer;
bool canceled = false;
};

template <>
struct Socket<fuerte::SocketType::Ssl> {
Socket(EventLoopService& loop, asio_ns::io_context& ctx)
: resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), cleanupDone(false) {}
: resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx),
sslContext(loop.sslContext()), cleanupDone(false) {}

~Socket() { this->cancel(); }
// Destructor of the ssl socket wrapper: calls cancel(). A std::exception
// escaping cancel() is logged here so that it does not propagate out of the
// destructor.
~Socket() {
try {
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what() << "\n";
}
}

template <typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) {
bool verify = config._verifyHost;
resolveConnect(
config, resolver, socket.next_layer(),
[=, this, done(std::forward<F>(done))](auto const& ec) mutable {
[=, this](asio_ns::error_code ec) mutable {
FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n";
if (canceled) {
// cancel() was already called on this socket
FUERTE_ASSERT(socket.lowest_layer().is_open() == false);
ec = asio_ns::error::operation_aborted;
}
if (ec) {
done(ec);
return;
Expand Down Expand Up @@ -167,10 +229,19 @@ struct Socket<fuerte::SocketType::Ssl> {
}
socket.async_handshake(asio_ns::ssl::stream_base::client,
std::move(done));
}, [this]() {
return canceled;
});
}

// Makes the ssl socket usable again after cancel(): the ssl stream is replaced
// by a freshly constructed one (built from the stored io_context and ssl
// context references), and the canceled flag is cleared afterwards.
void rearm() {
// create a new socket and declare it ready
socket = asio_ns::ssl::stream<asio_ns::ip::tcp::socket>(this->ctx, this->sslContext);
canceled = false;
}

void cancel() {
canceled = true;
try {
timer.cancel();
resolver.cancel();
Expand All @@ -180,7 +251,8 @@ struct Socket<fuerte::SocketType::Ssl> {
ec.clear();
socket.lowest_layer().close(ec);
}
} catch (...) {
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during ssl socket cancelation: " << ex.what() << "\n";
}
}

Expand All @@ -201,18 +273,7 @@ struct Socket<fuerte::SocketType::Ssl> {
return;
}
cleanupDone = false;
timer.expires_from_now(std::chrono::seconds(3));
timer.async_wait([cb, this](asio_ns::error_code ec) {
// Copy in callback such that the connection object is kept alive long
// enough, please do not delete, although it is not used here!
if (!cleanupDone && !ec) {
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
ec.clear();
socket.lowest_layer().close(ec);
cleanupDone = true;
}
});
socket.async_shutdown([cb(std::forward<F>(cb)), this](auto const& ec) {
socket.async_shutdown([cb, this](auto const& ec) {
timer.cancel();
#ifndef _WIN32
if (!cleanupDone && (!ec || ec == asio_ns::error::basic_errors::not_connected)) {
Expand All @@ -225,32 +286,69 @@ struct Socket<fuerte::SocketType::Ssl> {
#endif
cb(ec);
});
timer.expires_from_now(std::chrono::seconds(3));
timer.async_wait([cb(std::forward<F>(cb)), this](asio_ns::error_code ec) {
// Capture a copy of the callback so that the connection object is kept alive
// long enough. Please do not remove the capture, even though it is not used here!
if (!ec && !cleanupDone) {
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
ec.clear();
socket.lowest_layer().close(ec);
cleanupDone = true;
}
});
}

asio_ns::ip::tcp::resolver resolver;
asio_ns::ssl::stream<asio_ns::ip::tcp::socket> socket;
asio_ns::steady_timer timer;
asio_ns::io_context& ctx;
asio_ns::ssl::context& sslContext;
std::atomic<bool> cleanupDone;
bool canceled = false;
};

#ifdef ASIO_HAS_LOCAL_SOCKETS
template <>
struct Socket<fuerte::SocketType::Unix> {
Socket(EventLoopService&, asio_ns::io_context& ctx)
: socket(ctx), timer(ctx) {}
~Socket() { this->cancel(); }

// Destructor of the unix domain socket wrapper: marks the socket as canceled
// and calls cancel(). A std::exception escaping cancel() is logged here so
// that it does not propagate out of the destructor.
~Socket() {
// the new cancel() body visible in this view sets this flag as well
canceled = true;
try {
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n";
}
}

// Starts an asynchronous connect to the unix domain socket whose path is
// taken from config._host.
//
// config: connection configuration; only _host is read here.
// done:   completion handler. If the socket was already canceled, it is
//         invoked directly (synchronously, from within this call) with
//         operation_aborted; otherwise it is forwarded to async_connect.
template <typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) {
if (canceled) {
// cancel() was already called on this socket
done(asio_ns::error::operation_aborted);
return;
}

// for unix domain sockets the "host" is used as the endpoint path
asio_ns::local::stream_protocol::endpoint ep(config._host);
socket.async_connect(ep, std::forward<F>(done));
}

// Clears the canceled flag, so that connect() no longer short-circuits with
// operation_aborted.
void rearm() {
canceled = false;
}

void cancel() {
timer.cancel();
if (socket.is_open()) { // non-graceful shutdown
asio_ns::error_code ec;
socket.close(ec);
canceled = true;
try {
timer.cancel();
if (socket.is_open()) { // non-graceful shutdown
asio_ns::error_code ec;
socket.close(ec);
}
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during unix socket cancelation: " << ex.what() << "\n";
}
}

Expand All @@ -268,6 +366,7 @@ struct Socket<fuerte::SocketType::Unix> {

asio_ns::local::stream_protocol::socket socket;
asio_ns::steady_timer timer;
bool canceled = false;
};
#endif // ASIO_HAS_LOCAL_SOCKETS

Expand Down
Loading
Loading