From 2e9a7360978c07b415379361f4b6ffe91b604867 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 30 Apr 2024 10:38:47 +0200 Subject: [PATCH 01/22] Fix SSL connection retry attempts for cluster-internal connections * Fix connection retry attempts for cluster-internal TLS connections that ran into the 15 seconds timeout during the connection establishing attempt. In this case, the low-level socket was repurposed, but not reset properly. This could leave the connection in an improper state and lead to callbacks for some requests to not being called as expected. --- 3rdParty/fuerte/include/fuerte/FuerteLogger.h | 12 +- 3rdParty/fuerte/include/fuerte/connection.h | 8 ++ 3rdParty/fuerte/include/fuerte/loop.h | 2 + 3rdParty/fuerte/include/fuerte/types.h | 6 + 3rdParty/fuerte/src/AsioSockets.h | 136 ++++++++++++++---- 3rdParty/fuerte/src/GeneralConnection.h | 106 +++++++++----- 3rdParty/fuerte/src/H1Connection.cpp | 10 +- 3rdParty/fuerte/src/H2Connection.cpp | 2 +- CHANGELOG | 6 + lib/Logger/LoggerFeature.cpp | 4 +- tests/Fuerte/CMakeLists.txt | 1 + tests/Fuerte/ConnectionFailuresTest.cpp | 56 +++++--- 12 files changed, 251 insertions(+), 98 deletions(-) diff --git a/3rdParty/fuerte/include/fuerte/FuerteLogger.h b/3rdParty/fuerte/include/fuerte/FuerteLogger.h index 84c217c5aeb1..59dad2cb7567 100644 --- a/3rdParty/fuerte/include/fuerte/FuerteLogger.h +++ b/3rdParty/fuerte/include/fuerte/FuerteLogger.h @@ -26,14 +26,15 @@ #if 0 #include #include +#include -extern void LogHackWriter(char const* p); +extern void LogHackWriter(std::string_view p); class LogHack { std::stringstream _s; public: LogHack() {}; - ~LogHack() { LogHackWriter(_s.str().c_str()); }; + ~LogHack() { LogHackWriter(_s.str()); }; template LogHack& operator<<(T const& o) { _s << o; return *this; } typedef std::basic_ostream > CoutType; typedef CoutType& (*StandardEndLine)(CoutType&); @@ -115,11 +116,4 @@ class LogHack { if (0) std::cout #endif -#if ENABLE_FUERTE_LOG_NODE > 0 -#define FUERTE_LOG_NODE std::cout -#else -#define FUERTE_LOG_NODE \ - if (0) std::cout -#endif - #endif diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index 0411436fdb57..d6e9345a4a56 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -175,6 +175,14 @@ class ConnectionBuilder { return *this; } +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned failConnectAttempts() const { return _conf._failConnectAttempts; } + ConnectionBuilder& failConnectAttempts(unsigned f) { + _conf._failConnectAttempts = f; + return *this; + } +#endif + // Set the authentication type of the connection AuthenticationType authenticationType() const { return _conf._authenticationType; diff --git a/3rdParty/fuerte/include/fuerte/loop.h b/3rdParty/fuerte/include/fuerte/loop.h index 07277ea28e7c..c0b7c6d29abf 100644 --- a/3rdParty/fuerte/include/fuerte/loop.h +++ b/3rdParty/fuerte/include/fuerte/loop.h @@ -27,9 +27,11 @@ #include +#include #include #include #include +#include // run / runWithWork / poll for Loop mapping to ioservice // free function run with threads / with thread group barrier and work diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index ea647889fe7b..d94056e36bab 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -220,6 +220,9 @@ struct ConnectionConfiguration { _idleTimeout(300000), _connectRetryPause(1000), _maxConnectRetries(3), +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts(0), +#endif _useIdleTimeout(true), _authenticationType(AuthenticationType::None), _user(""), @@ -240,6 +243,9 @@ struct ConnectionConfiguration { std::chrono::milliseconds _idleTimeout; std::chrono::milliseconds _connectRetryPause; unsigned _maxConnectRetries; +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned _failConnectAttempts; +#endif bool _useIdleTimeout; AuthenticationType _authenticationType; diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index e3cda967bd71..3b9cc1d0d44e 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -34,8 +34,16 @@ template void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, F&& done) { - auto cb = [&socket, done(std::forward(done))](auto ec, auto it) mutable { + auto cb = [&socket, done = std::forward(done), fail = config._failConnectAttempts > 0](auto ec, auto it) mutable { +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (fail) { + // use an error code != operation_aborted + ec = boost::system::errc::make_error_code(boost::system::errc::not_enough_memory); + } +#endif + if (ec) { // error in address resolver + FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message(); done(ec); return; } @@ -44,7 +52,12 @@ void resolveConnect(detail::ConnectionConfiguration const& config, // A successful resolve operation is guaranteed to pass a // non-empty range to the handler. asio_ns::async_connect(socket, it, - [done(std::move(done))](auto ec, auto it) mutable { + [done](auto ec, auto it) mutable { + if (ec) { + FUERTE_LOG_DEBUG << "executing async connect callback, error: " << ec.message(); + } else { + FUERTE_LOG_DEBUG << "executing async connect callback, no error"; + } std::forward(done)(ec); }); } catch (std::bad_alloc const&) { @@ -63,7 +76,8 @@ void resolveConnect(detail::ConnectionConfiguration const& config, auto it = resolver.resolve(config._host, config._port, ec); cb(ec, it); #else - // Resolve the host asynchronous into a series of endpoints + // Resolve the host asynchronously into a series of endpoints + FUERTE_LOG_DEBUG << "scheduled callback to resolve host " << config._host << ":" << config._port; resolver.async_resolve(config._host, config._port, std::move(cb)); #endif } @@ -77,14 +91,32 @@ struct Socket { Socket(EventLoopService&, asio_ns::io_context& ctx) : resolver(ctx), socket(ctx), timer(ctx) {} - ~Socket() { this->cancel(); } + ~Socket() { + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what(); + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { - resolveConnect(config, resolver, socket, std::forward(done)); + resolveConnect(config, resolver, socket, [this, done = std::forward(done)](asio_ns::error_code ec) mutable { + FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; + if (canceled) { + // cancel() was already called on this socket + ec = asio_ns::error::operation_aborted; + } + done(ec); + }); + } + + void rearm() { + canceled = false; } void cancel() { + canceled = true; try { timer.cancel(); resolver.cancel(); @@ -92,7 +124,8 @@ struct Socket { asio_ns::error_code ec; socket.close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during tcp socket cancelation: " << ex.what(); } } @@ -108,7 +141,10 @@ struct Socket { ec.clear(); socket.close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + // an exception is unlikely to occur here, as we are using the error-code + // variants of cancel/shutdown/close above + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what(); } std::forward(cb)(ec); } @@ -116,21 +152,33 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ip::tcp::socket socket; asio_ns::steady_timer timer; + bool canceled = false; }; template <> struct Socket { Socket(EventLoopService& loop, asio_ns::io_context& ctx) - : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), cleanupDone(false) {} + : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx), sslContext(loop.sslContext()), cleanupDone(false) {} - ~Socket() { this->cancel(); } + ~Socket() { + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what(); + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { bool verify = config._verifyHost; resolveConnect( config, resolver, socket.next_layer(), - [=, this, done(std::forward(done))](auto const& ec) mutable { + [=, this](asio_ns::error_code ec) mutable { + FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; + if (canceled) { + // cancel() was already called on this socket + ec = asio_ns::error::operation_aborted; + } if (ec) { done(ec); return; @@ -169,8 +217,14 @@ struct Socket { std::move(done)); }); } + + void rearm() { + socket = asio_ns::ssl::stream(this->ctx, this->sslContext); + canceled = false; + } void cancel() { + canceled = true; try { timer.cancel(); resolver.cancel(); @@ -180,7 +234,8 @@ struct Socket { ec.clear(); socket.lowest_layer().close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during ssl socket cancelation: " << ex.what(); } } @@ -201,17 +256,6 @@ struct Socket { return; } cleanupDone = false; - timer.expires_from_now(std::chrono::seconds(3)); - timer.async_wait([cb, this](asio_ns::error_code ec) { - // Copy in callback such that the connection object is kept alive long - // enough, please do not delete, although it is not used here! - if (!cleanupDone && !ec) { - socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); - socket.lowest_layer().close(ec); - cleanupDone = true; - } - }); socket.async_shutdown([cb(std::forward(cb)), this](auto const& ec) { timer.cancel(); #ifndef _WIN32 @@ -225,12 +269,26 @@ struct Socket { #endif cb(ec); }); + timer.expires_from_now(std::chrono::seconds(3)); + timer.async_wait([cb, this](asio_ns::error_code ec) { + // Copy in callback such that the connection object is kept alive long + // enough, please do not delete, although it is not used here! + if (!cleanupDone && !ec) { + socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + ec.clear(); + socket.lowest_layer().close(ec); + cleanupDone = true; + } + }); } asio_ns::ip::tcp::resolver resolver; asio_ns::ssl::stream socket; asio_ns::steady_timer timer; + asio_ns::io_context& ctx; + asio_ns::ssl::context& sslContext; std::atomic cleanupDone; + bool canceled = false; }; #ifdef ASIO_HAS_LOCAL_SOCKETS @@ -238,19 +296,42 @@ template <> struct Socket { Socket(EventLoopService&, asio_ns::io_context& ctx) : socket(ctx), timer(ctx) {} - ~Socket() { this->cancel(); } + + ~Socket() { + canceled = true; + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what(); + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { + if (canceled) { + // cancel() was already called on this socket + done(asio_ns::error::operation_aborted); + return; + } + asio_ns::local::stream_protocol::endpoint ep(config._host); socket.async_connect(ep, std::forward(done)); } + + void rearm() { + canceled = false; + } void cancel() { - timer.cancel(); - if (socket.is_open()) { // non-graceful shutdown - asio_ns::error_code ec; - socket.close(ec); + canceled = true; + try { + timer.cancel(); + if (socket.is_open()) { // non-graceful shutdown + asio_ns::error_code ec; + socket.close(ec); + } + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during unix socket cancelation: " << ex.what(); } } @@ -268,6 +349,7 @@ struct Socket { asio_ns::local::stream_protocol::socket socket; asio_ns::steady_timer timer; + bool canceled = false; }; #endif // ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index a8efe29bab73..68a2efd7be37 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -74,6 +74,10 @@ #include "AsioSockets.h" #include "debugging.h" +#include +#include +#include + namespace arangodb { namespace fuerte { using Clock = std::chrono::steady_clock; @@ -86,7 +90,11 @@ class GeneralConnection : public fuerte::Connection { detail::ConnectionConfiguration const& config) : Connection(config), _io_context(loop.nextIOContext()), - _proto(loop, *_io_context) {} + _proto(loop, *_io_context) { +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts = config._failConnectAttempts; +#endif + } virtual ~GeneralConnection() noexcept { _state.store(Connection::State::Closed); @@ -168,8 +176,7 @@ class GeneralConnection : public fuerte::Connection { if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; FUERTE_ASSERT(_config._maxConnectRetries > 0); - tryConnect(_config._maxConnectRetries, Clock::now(), - asio_ns::error_code()); + tryConnect(_config._maxConnectRetries); } else { FUERTE_LOG_DEBUG << "startConnection: this=" << this << " found unexpected state " << static_cast(exp) @@ -196,7 +203,7 @@ class GeneralConnection : public fuerte::Connection { abortRequests(err, /*now*/ Clock::time_point::max()); - _proto.shutdown([=, this, self(shared_from_this())](auto const& ec) { + _proto.shutdown([=, this, self = shared_from_this()](asio_ns::error_code ec) { terminateActivity(err); onFailure(err, msg); }); // Close socket @@ -262,7 +269,7 @@ class GeneralConnection : public fuerte::Connection { /// The following is called when the connection is permanently failed. It is /// used to shut down any activity in the derived classes in a way that avoids /// sleeping barbers - void terminateActivity(const fuerte::Error err) { + void terminateActivity(fuerte::Error err) { // Usually, we are `active == true` when we get here, except for the // following case: If we are inactive but the connection is still open and // then the idle timeout goes off, then we shutdownConnection and in the TLS @@ -281,6 +288,14 @@ class GeneralConnection : public fuerte::Connection { } } + void cancelTimer() noexcept { + try { + this->_proto.timer.cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during timer cancelation: " << ex.what(); + } + } + protected: virtual void finishConnect() = 0; @@ -300,57 +315,73 @@ class GeneralConnection : public fuerte::Connection { private: // Connect with a given number of retries - void tryConnect(unsigned retries, Clock::time_point start, - asio_ns::error_code const& ec) { - if (_state.load() != Connection::State::Connecting) { - return; - } + void tryConnect(unsigned retries) { + FUERTE_ASSERT(retries > 0); - if (retries == 0) { - std::string msg("connecting failed: '"); - msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() - : "timeout"); - msg.push_back('\''); - shutdownConnection(Error::CouldNotConnect, msg); + if (_state.load() != Connection::State::Connecting) { return; } FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - _proto.timer.expires_at(start + _config._connectTimeout); - _proto.timer.async_wait([self](asio_ns::error_code const& ec) { - if (!ec && self->state() == Connection::State::Connecting) { - // the connect handler below gets 'operation_aborted' error - static_cast&>(*self)._proto.cancel(); - } - }); - - _proto.connect(_config, [self, start, retries](auto const& ec) mutable { + _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); - me._proto.timer.cancel(); + me.cancelTimer(); // Note that is is possible that the alarm has already gone off, in which // case its closure might already be queued right after ourselves! // However, we now quickly set the state to `Connected` in which case the // closure will no longer shut down the socket and ruin our success. if (!ec) { + FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") established connection this=" << self.get() << "\n"; me.finishConnect(); return; } - FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n"; - if (retries > 0 && ec != asio_ns::error::operation_aborted) { - auto end = std::min(Clock::now() + me._config._connectRetryPause, - start + me._config._connectTimeout); +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (me._failConnectAttempts > 0) { + --me._failConnectAttempts; + } +#endif + + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; + if (retries > 1 && ec != asio_ns::error::operation_aborted) { + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; + auto end = Clock::now() + me._config._connectRetryPause; me._proto.timer.expires_at(end); me._proto.timer.async_wait( - [self(std::move(self)), start, retries](auto ec) mutable { + [self = std::move(self), retries](asio_ns::error_code ec) mutable { + if (ec) { + FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; + // timer canceled. + return; + } auto& me = static_cast&>(*self); - me.tryConnect(!ec ? retries - 1 : 0, start, ec); + // rearm socket so that we can use it again + FUERTE_LOG_DEBUG << "tryConnect, rearming connection this=" << self.get() << "\n"; + me._proto.rearm(); + me.tryConnect(retries - 1); }); } else { - me.tryConnect(0, start, ec); // <- handles errors + std::string msg("connecting failed: "); + msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() + : "timeout"); + FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection(2): " << msg << " this=" << self.get(); + me.shutdownConnection(Error::CouldNotConnect, msg); } }); + + auto start = Clock::now(); + _proto.timer.expires_at(start + _config._connectTimeout); + _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { + if (!ec && self->state() == Connection::State::Connecting) { + // note: if the timer fires successfully, ec is empty here. + // the connect handler below gets 'operation_aborted' error + auto& me = static_cast&>(*self); + me._proto.cancel(); + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get(); + } + }); + } protected: @@ -373,6 +404,12 @@ class GeneralConnection : public fuerte::Connection { std::atomic _active{false}; +#ifdef ARANGODB_USE_GOOGLE_TESTS + // if this member is > 0, then this many connection attempts will fail + // in this connection + unsigned _failConnectAttempts = 0; +#endif + bool _reading = false; // set to true while an async_read is ongoing bool _writing = false; // set between starting an asyncWrite operation and // executing the completion handler}; @@ -425,8 +462,7 @@ struct MultiConnection : public GeneralConnection { void setTimeout(bool setIOBegin) { const bool wasIdle = _streams.empty(); if (wasIdle && !this->_config._useIdleTimeout) { - asio_ns::error_code ec; - this->_proto.timer.cancel(ec); + this->cancelTimer(); return; } diff --git a/3rdParty/fuerte/src/H1Connection.cpp b/3rdParty/fuerte/src/H1Connection.cpp index 15ef4ab9f136..ab542095a316 100644 --- a/3rdParty/fuerte/src/H1Connection.cpp +++ b/3rdParty/fuerte/src/H1Connection.cpp @@ -363,7 +363,7 @@ void H1Connection::asyncWriteCallback(asio_ns::error_code const& ec, FUERTE_ASSERT(this->_state == Connection::State::Connected || this->_state == Connection::State::Closed); this->_writing = false; // indicate that no async write is ongoing any more - this->_proto.timer.cancel(); // cancel alarm for timeout + this->cancelTimer(); // cancel alarm for timeout auto const now = Clock::now(); if (ec || _item == nullptr || _item->expires < now) { @@ -403,7 +403,7 @@ template void H1Connection::asyncReadCallback(asio_ns::error_code const& ec) { // Do not cancel timeout now, because we might be going on to read! if (_item == nullptr) { // could happen on aborts - this->_proto.timer.cancel(); + this->cancelTimer(); this->shutdownConnection(Error::CloseRequested); return; } @@ -443,7 +443,8 @@ void H1Connection::asyncReadCallback(asio_ns::error_code const& ec) { FUERTE_ASSERT(_response != nullptr); _messageComplete = false; // prevent entering branch on EOF - this->_proto.timer.cancel(); // got response in time + this->cancelTimer(); // got response in time + if (!_responseBuffer.empty()) { _response->setPayload(std::move(_responseBuffer), 0); } @@ -502,8 +503,7 @@ template void H1Connection::setIOTimeout() { const bool isIdle = _item == nullptr; if (isIdle && !this->_config._useIdleTimeout) { - asio_ns::error_code ec; - this->_proto.timer.cancel(ec); + this->cancelTimer(); return; } diff --git a/3rdParty/fuerte/src/H2Connection.cpp b/3rdParty/fuerte/src/H2Connection.cpp index 30911511ac45..10fb761f7eff 100644 --- a/3rdParty/fuerte/src/H2Connection.cpp +++ b/3rdParty/fuerte/src/H2Connection.cpp @@ -350,7 +350,7 @@ void H2Connection::readSwitchingProtocolsResponse() { this->_proto.socket, this->_receiveBuffer, "\r\n\r\n", [self](asio_ns::error_code const& ec, size_t nread) { auto& me = static_cast&>(*self); - me._proto.timer.cancel(); + me.cancelTimer(); if (ec) { me.shutdownConnection(Error::ReadError, "error reading upgrade response"); diff --git a/CHANGELOG b/CHANGELOG index 62d08d57a6f3..90f82f71c789 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,12 @@ v3.11.9 (XXXX-XX-XX) -------------------- +* Fix connection retry attempts for cluster-internal TLS connections that ran + into the 15 seconds timeout during the connection establishing attempt. + In this case, the low-level socket was repurposed, but not reset properly. + This could leave the connection in an improper state and lead to callbacks + for some requests to not being called as expected. + * Updated ArangoDB Starter to v0.18.5. * Detach threads in getResponsibleServers if they wait for more than 1s. diff --git a/lib/Logger/LoggerFeature.cpp b/lib/Logger/LoggerFeature.cpp index 27e353e662a8..a03df065032f 100644 --- a/lib/Logger/LoggerFeature.cpp +++ b/lib/Logger/LoggerFeature.cpp @@ -60,9 +60,7 @@ using namespace arangodb::options; // Please leave this code in for the next time we have to debug fuerte. #if 0 -void LogHackWriter(char const* p) { - LOG_DEVEL << p; -} +void LogHackWriter(std::string_view msg) { LOG_DEVEL << msg; } #endif namespace arangodb { diff --git a/tests/Fuerte/CMakeLists.txt b/tests/Fuerte/CMakeLists.txt index 15d1aa548d72..802ec59990bc 100644 --- a/tests/Fuerte/CMakeLists.txt +++ b/tests/Fuerte/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(fuertetest ) target_link_libraries(fuertetest + arango fuerte gtest boost_boost diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index 4896f07d0508..688b13816e73 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -26,21 +26,42 @@ #include "gtest/gtest.h" +#include + namespace f = ::arangodb::fuerte; +// for testing connection failures, we need a free port that is not used by +// another service. because we can't be sure about high port numbers, we are +// using some from the very range. according to +// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?&page=2 +// port 60 is unassigned and thus likely not taken by any service on the test +// host. previously we used port 8629 for testing connection failures, but this +// was flawed because it was possible that some service was running on port +// 8629, which then made the connection failure tests fail. +constexpr std::string_view urls[] = { + "http://localhost:60", "h2://localhost:60", "vst://localhost:60", + "ssl://localhost:60", "h2s://localhost:60", +}; + // tryToConnectExpectFailure tries to make a connection to a host with given // url. This is expected to fail. static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, - const std::string& url) { + std::string_view url, bool useRetries) { f::WaitGroup wg; wg.add(); f::ConnectionBuilder cbuilder; cbuilder.connectTimeout(std::chrono::milliseconds(250)); cbuilder.connectRetryPause(std::chrono::milliseconds(100)); - cbuilder.endpoint(url); +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (useRetries) { + cbuilder.failConnectAttempts(2); + } + cbuilder.maxConnectRetries(3); +#endif + cbuilder.endpoint(std::string{url}); - cbuilder.onFailure([&](f::Error errorCode, const std::string& errorMessage) { + cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { ASSERT_EQ(errorCode, f::Error::CouldNotConnect); wg.done(); }); @@ -59,29 +80,28 @@ static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, // that cannot be resolved. TEST(ConnectionFailureTest, CannotResolveHttp) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "http://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "http://thishostmustnotexist.arangodb.com:8529", false); } TEST(ConnectionFailureTest, CannotResolveVst) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "vst://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "vst://thishostmustnotexist.arangodb.com:8529", false); } // CannotConnect tests try to make a connection to a host with a valid name // but a wrong port. -TEST(ConnectionFailureTest, CannotConnectHttp) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "http://localhost:8629"); +TEST(ConnectionFailureTest, CannotConnect) { + for (auto const& url : urls) { + f::EventLoopService loop; + tryToConnectExpectFailure(loop, url, /*useRetries*/ false); + } } -TEST(ConnectionFailureTest, CannotConnectHttp2) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "h2://localhost:8629"); -} - -TEST(ConnectionFailureTest, CannotConnectVst) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "vst://localhost:8629"); +TEST(ConnectionFailureTest, CannotConnectForceRetries) { + for (auto const& url : urls) { + f::EventLoopService loop; + tryToConnectExpectFailure(loop, url, /*useRetries*/ true); + } } From a219cf687286e6487edf6ad14b0b15369a673ed0 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 30 Apr 2024 10:51:02 +0200 Subject: [PATCH 02/22] apply some review comments --- 3rdParty/fuerte/src/AsioSockets.h | 8 +++++++- 3rdParty/fuerte/src/GeneralConnection.h | 6 ++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 3b9cc1d0d44e..a8ac0078103a 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -34,7 +34,11 @@ template void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, F&& done) { - auto cb = [&socket, done = std::forward(done), fail = config._failConnectAttempts > 0](auto ec, auto it) mutable { + auto cb = [&socket, +#ifdef ARANGODB_USE_GOOGLE_TESTS + fail = config._failConnectAttempts > 0, +#endif + done = std::forward(done)](auto ec, auto it) mutable { #ifdef ARANGODB_USE_GOOGLE_TESTS if (fail) { // use an error code != operation_aborted @@ -105,6 +109,7 @@ struct Socket { FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; if (canceled) { // cancel() was already called on this socket + FUERTE_ASSERT(socket.is_open() == false); ec = asio_ns::error::operation_aborted; } done(ec); @@ -177,6 +182,7 @@ struct Socket { FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; if (canceled) { // cancel() was already called on this socket + FUERTE_ASSERT(socket.lowest_layer().is_open() == false); ec = asio_ns::error::operation_aborted; } if (ec) { diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 68a2efd7be37..31ed49d508c4 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -346,8 +346,7 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; if (retries > 1 && ec != asio_ns::error::operation_aborted) { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; - auto end = Clock::now() + me._config._connectRetryPause; - me._proto.timer.expires_at(end); + me._proto.timer.expires_after(me._config._connectRetryPause); me._proto.timer.async_wait( [self = std::move(self), retries](asio_ns::error_code ec) mutable { if (ec) { @@ -370,8 +369,7 @@ class GeneralConnection : public fuerte::Connection { } }); - auto start = Clock::now(); - _proto.timer.expires_at(start + _config._connectTimeout); + _proto.timer.expires_after(_config._connectTimeout); _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { if (!ec && self->state() == Connection::State::Connecting) { // note: if the timer fires successfully, ec is empty here. From e3f7279fc2c1e87e1f73b6465556285790384847 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 30 Apr 2024 11:11:11 +0200 Subject: [PATCH 03/22] check if connection was aborted --- 3rdParty/fuerte/src/AsioSockets.h | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index a8ac0078103a..e141f776e0ef 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -30,15 +30,16 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace { -template +template void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, - F&& done) { + F&& done, IsAbortedCb&& isAborted) { auto cb = [&socket, #ifdef ARANGODB_USE_GOOGLE_TESTS fail = config._failConnectAttempts > 0, #endif - done = std::forward(done)](auto ec, auto it) mutable { + done = std::forward(done), + isAborted = std::forward(isAborted)](auto ec, auto it) mutable { #ifdef ARANGODB_USE_GOOGLE_TESTS if (fail) { // use an error code != operation_aborted @@ -46,6 +47,10 @@ void resolveConnect(detail::ConnectionConfiguration const& config, } #endif + if (isAborted()) { + ec = asio_ns::error::operation_aborted; + } + if (ec) { // error in address resolver FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message(); done(ec); @@ -113,6 +118,8 @@ struct Socket { ec = asio_ns::error::operation_aborted; } done(ec); + }, [this]() { + return canceled; }); } @@ -181,9 +188,9 @@ struct Socket { [=, this](asio_ns::error_code ec) mutable { FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; if (canceled) { - // cancel() was already called on this socket - FUERTE_ASSERT(socket.lowest_layer().is_open() == false); - ec = asio_ns::error::operation_aborted; + // cancel() was already called on this socket + FUERTE_ASSERT(socket.lowest_layer().is_open() == false); + ec = asio_ns::error::operation_aborted; } if (ec) { done(ec); @@ -221,6 +228,8 @@ struct Socket { } socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); + }, [this]() { + return canceled; }); } From 71e2b5e0e80cccaf6f5e641b5240830cdd57ed4c Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 30 Apr 2024 11:34:29 +0200 Subject: [PATCH 04/22] revert change to cmakelists for tests --- tests/Fuerte/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Fuerte/CMakeLists.txt b/tests/Fuerte/CMakeLists.txt index 802ec59990bc..15d1aa548d72 100644 --- a/tests/Fuerte/CMakeLists.txt +++ b/tests/Fuerte/CMakeLists.txt @@ -9,7 +9,6 @@ add_executable(fuertetest ) target_link_libraries(fuertetest - arango fuerte gtest boost_boost From b5b50ef49630e2366ecf21d9678167f61a119436 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Thu, 2 May 2024 17:57:48 +0200 Subject: [PATCH 05/22] disable socket rearming for now --- 3rdParty/fuerte/src/AsioSockets.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index e141f776e0ef..f2eb83329dcc 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -234,7 +234,7 @@ struct Socket { } void rearm() { - socket = asio_ns::ssl::stream(this->ctx, this->sslContext); + // socket = asio_ns::ssl::stream(this->ctx, this->sslContext); canceled = false; } From 4b506ffb0493b2d5667aab1dc623b04329f00867 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 3 May 2024 11:29:48 +0200 Subject: [PATCH 06/22] simplify PR --- 3rdParty/fuerte/src/AsioSockets.h | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index e141f776e0ef..00b52a71525c 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -170,7 +170,7 @@ struct Socket { template <> struct Socket { Socket(EventLoopService& loop, asio_ns::io_context& ctx) - : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx), sslContext(loop.sslContext()), cleanupDone(false) {} + : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), cleanupDone(false) {} ~Socket() { try { @@ -234,7 +234,6 @@ struct Socket { } void rearm() { - socket = asio_ns::ssl::stream(this->ctx, this->sslContext); canceled = false; } @@ -300,8 +299,6 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ssl::stream socket; asio_ns::steady_timer timer; - asio_ns::io_context& ctx; - asio_ns::ssl::context& sslContext; std::atomic cleanupDone; bool canceled = false; }; From 11928cddbb58c921d703df7b0e06019afabafb1b Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 3 May 2024 12:35:21 +0200 Subject: [PATCH 07/22] remove retry entirely --- 3rdParty/fuerte/include/fuerte/connection.h | 26 --------- 3rdParty/fuerte/include/fuerte/types.h | 12 +--- 3rdParty/fuerte/src/AsioSockets.h | 22 -------- 3rdParty/fuerte/src/GeneralConnection.h | 61 ++++----------------- CHANGELOG | 2 + tests/Fuerte/ConnectionFailuresTest.cpp | 26 ++------- 6 files changed, 21 insertions(+), 128 deletions(-) diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index d6e9345a4a56..c0271a7d8e30 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -157,32 +157,6 @@ class ConnectionBuilder { return *this; } - /// @brief connect retry pause (1s default) - std::chrono::milliseconds connectRetryPause() const { - return _conf._connectRetryPause; - } - /// @brief set the connect retry pause (1s default) - ConnectionBuilder& connectRetryPause(std::chrono::milliseconds p) { - _conf._connectRetryPause = p; - return *this; - } - - /// @brief connect retries (3 default) - unsigned maxConnectRetries() const { return _conf._maxConnectRetries; } - /// @brief set the max connect retries (3 default) - ConnectionBuilder& maxConnectRetries(unsigned r) { - _conf._maxConnectRetries = r; - return *this; - } - -#ifdef ARANGODB_USE_GOOGLE_TESTS - unsigned failConnectAttempts() const { return _conf._failConnectAttempts; } - ConnectionBuilder& failConnectAttempts(unsigned f) { - _conf._failConnectAttempts = f; - return *this; - } -#endif - // Set the authentication type of the connection AuthenticationType authenticationType() const { return _conf._authenticationType; diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index d94056e36bab..5da4cc441984 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -216,13 +216,8 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _connectTimeout(15000), + _connectTimeout(60000), _idleTimeout(300000), - _connectRetryPause(1000), - _maxConnectRetries(3), -#ifdef ARANGODB_USE_GOOGLE_TESTS - _failConnectAttempts(0), -#endif _useIdleTimeout(true), _authenticationType(AuthenticationType::None), _user(""), @@ -241,11 +236,6 @@ struct ConnectionConfiguration { std::chrono::milliseconds _connectTimeout; std::chrono::milliseconds _idleTimeout; - std::chrono::milliseconds _connectRetryPause; - unsigned _maxConnectRetries; -#ifdef ARANGODB_USE_GOOGLE_TESTS - unsigned _failConnectAttempts; -#endif bool _useIdleTimeout; AuthenticationType _authenticationType; diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 00b52a71525c..48711cc701fa 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -35,18 +35,8 @@ void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, F&& done, IsAbortedCb&& isAborted) { auto cb = [&socket, -#ifdef ARANGODB_USE_GOOGLE_TESTS - fail = config._failConnectAttempts > 0, -#endif done = std::forward(done), isAborted = std::forward(isAborted)](auto ec, auto it) mutable { -#ifdef ARANGODB_USE_GOOGLE_TESTS - if (fail) { - // use an error code != operation_aborted - ec = boost::system::errc::make_error_code(boost::system::errc::not_enough_memory); - } -#endif - if (isAborted()) { ec = asio_ns::error::operation_aborted; } @@ -123,10 +113,6 @@ struct Socket { }); } - void rearm() { - canceled = false; - } - void cancel() { canceled = true; try { @@ -233,10 +219,6 @@ struct Socket { }); } - void rearm() { - canceled = false; - } - void cancel() { canceled = true; try { @@ -330,10 +312,6 @@ struct Socket { socket.async_connect(ep, std::forward(done)); } - void rearm() { - canceled = false; - } - void cancel() { canceled = true; try { diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 31ed49d508c4..2f18b77a8ad6 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -90,11 +90,7 @@ class GeneralConnection : public fuerte::Connection { detail::ConnectionConfiguration const& config) : Connection(config), _io_context(loop.nextIOContext()), - _proto(loop, *_io_context) { -#ifdef ARANGODB_USE_GOOGLE_TESTS - _failConnectAttempts = config._failConnectAttempts; -#endif - } + _proto(loop, *_io_context) {} virtual ~GeneralConnection() noexcept { _state.store(Connection::State::Closed); @@ -175,8 +171,7 @@ class GeneralConnection : public fuerte::Connection { Connection::State exp = Connection::State::Created; if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; - FUERTE_ASSERT(_config._maxConnectRetries > 0); - tryConnect(_config._maxConnectRetries); + tryConnect(); } else { FUERTE_LOG_DEBUG << "startConnection: this=" << this << " found unexpected state " << static_cast(exp) @@ -314,18 +309,16 @@ class GeneralConnection : public fuerte::Connection { RequestCallback&& cb) = 0; private: - // Connect with a given number of retries - void tryConnect(unsigned retries) { - FUERTE_ASSERT(retries > 0); - + // Try to connect + void tryConnect() { if (_state.load() != Connection::State::Connecting) { return; } - FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; + FUERTE_LOG_DEBUG << "tryConnect this=" << this << "\n"; auto self = Connection::shared_from_this(); - _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { + _proto.connect(_config, [self](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); me.cancelTimer(); // Note that is is possible that the alarm has already gone off, in which @@ -333,40 +326,16 @@ class GeneralConnection : public fuerte::Connection { // However, we now quickly set the state to `Connected` in which case the // closure will no longer shut down the socket and ruin our success. if (!ec) { - FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") established connection this=" << self.get() << "\n"; + FUERTE_LOG_DEBUG << "tryConnect established connection this=" << self.get() << "\n"; me.finishConnect(); return; } -#ifdef ARANGODB_USE_GOOGLE_TESTS - if (me._failConnectAttempts > 0) { - --me._failConnectAttempts; - } -#endif - - FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; - if (retries > 1 && ec != asio_ns::error::operation_aborted) { - FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; - me._proto.timer.expires_after(me._config._connectRetryPause); - me._proto.timer.async_wait( - [self = std::move(self), retries](asio_ns::error_code ec) mutable { - if (ec) { - FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; - // timer canceled. - return; - } - auto& me = static_cast&>(*self); - // rearm socket so that we can use it again - FUERTE_LOG_DEBUG << "tryConnect, rearming connection this=" << self.get() << "\n"; - me._proto.rearm(); - me.tryConnect(retries - 1); - }); - } else { - std::string msg("connecting failed: "); - msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() + + std::string msg("connecting failed: "); + msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() : "timeout"); - FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection(2): " << msg << " this=" << self.get(); - me.shutdownConnection(Error::CouldNotConnect, msg); - } + FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection: " << msg << " this=" << self.get(); + me.shutdownConnection(Error::CouldNotConnect, msg); }); _proto.timer.expires_after(_config._connectTimeout); @@ -402,12 +371,6 @@ class GeneralConnection : public fuerte::Connection { std::atomic _active{false}; -#ifdef ARANGODB_USE_GOOGLE_TESTS - // if this member is > 0, then this many connection attempts will fail - // in this connection - unsigned _failConnectAttempts = 0; -#endif - bool _reading = false; // set to true while an async_read is ongoing bool _writing = false; // set between starting an asyncWrite operation and // executing the completion handler}; diff --git a/CHANGELOG b/CHANGELOG index 7de710c09efd..056e150a11e4 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,6 +6,8 @@ v3.11.9 (XXXX-XX-XX) In this case, the low-level socket was repurposed, but not reset properly. This could leave the connection in an improper state and lead to callbacks for some requests to not being called as expected. + The fix is to remove the retry logic, and instead prolong the connection + timeout to a multiple of the previous timeout value. * Prioritize requests for commiting or aborting streaming transactions on leaders and followers, because they can unblock other operations. diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index 688b13816e73..bd464afb2cbe 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -46,19 +46,12 @@ constexpr std::string_view urls[] = { // tryToConnectExpectFailure tries to make a connection to a host with given // url. This is expected to fail. static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, - std::string_view url, bool useRetries) { + std::string_view url) { f::WaitGroup wg; wg.add(); f::ConnectionBuilder cbuilder; cbuilder.connectTimeout(std::chrono::milliseconds(250)); - cbuilder.connectRetryPause(std::chrono::milliseconds(100)); -#ifdef ARANGODB_USE_GOOGLE_TESTS - if (useRetries) { - cbuilder.failConnectAttempts(2); - } - cbuilder.maxConnectRetries(3); -#endif cbuilder.endpoint(std::string{url}); cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { @@ -80,14 +73,14 @@ static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, // that cannot be resolved. TEST(ConnectionFailureTest, CannotResolveHttp) { f::EventLoopService loop; - tryToConnectExpectFailure( - loop, "http://thishostmustnotexist.arangodb.com:8529", false); + tryToConnectExpectFailure(loop, + "http://thishostmustnotexist.arangodb.com:8529"); } TEST(ConnectionFailureTest, CannotResolveVst) { f::EventLoopService loop; - tryToConnectExpectFailure( - loop, "vst://thishostmustnotexist.arangodb.com:8529", false); + tryToConnectExpectFailure(loop, + "vst://thishostmustnotexist.arangodb.com:8529"); } // CannotConnect tests try to make a connection to a host with a valid name @@ -95,13 +88,6 @@ TEST(ConnectionFailureTest, CannotResolveVst) { TEST(ConnectionFailureTest, CannotConnect) { for (auto const& url : urls) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, url, /*useRetries*/ false); - } -} - -TEST(ConnectionFailureTest, CannotConnectForceRetries) { - for (auto const& url : urls) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, url, /*useRetries*/ true); + tryToConnectExpectFailure(loop, url); } } From 0b5c67ce013654f2c90eb71ee95727a621c950ed Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 3 May 2024 13:14:38 +0200 Subject: [PATCH 08/22] fix arangosh build --- client-tools/Shell/V8ClientConnection.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/client-tools/Shell/V8ClientConnection.cpp b/client-tools/Shell/V8ClientConnection.cpp index 89476baeb8da..557bf1049226 100644 --- a/client-tools/Shell/V8ClientConnection.cpp +++ b/client-tools/Shell/V8ClientConnection.cpp @@ -107,8 +107,6 @@ V8ClientConnection::V8ClientConnection(ArangoshServer& server, _vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedArrays = true; - _builder.maxConnectRetries(3); - _builder.connectRetryPause(std::chrono::milliseconds(100)); _builder.connectTimeout(std::chrono::milliseconds( static_cast(1000.0 * _client.connectionTimeout()))); _builder.onFailure([this](fu::Error err, std::string const& msg) { From 43bf56b641b7baa9e5b977ac4cc040d95291d6e9 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 6 May 2024 19:46:05 +0200 Subject: [PATCH 09/22] further fixes for socket shutdown --- 3rdParty/fuerte/include/fuerte/connection.h | 26 +++++++ 3rdParty/fuerte/include/fuerte/types.h | 12 ++- 3rdParty/fuerte/src/AsioSockets.h | 67 +++++++++++++---- 3rdParty/fuerte/src/GeneralConnection.h | 81 +++++++++++++++------ CHANGELOG | 2 - client-tools/Shell/V8ClientConnection.cpp | 2 + tests/Fuerte/ConnectionFailuresTest.cpp | 28 +++++-- 7 files changed, 171 insertions(+), 47 deletions(-) diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index c0271a7d8e30..d6e9345a4a56 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -157,6 +157,32 @@ class ConnectionBuilder { return *this; } + /// @brief connect retry pause (1s default) + std::chrono::milliseconds connectRetryPause() const { + return _conf._connectRetryPause; + } + /// @brief set the connect retry pause (1s default) + ConnectionBuilder& connectRetryPause(std::chrono::milliseconds p) { + _conf._connectRetryPause = p; + return *this; + } + + /// @brief connect retries (3 default) + unsigned maxConnectRetries() const { return _conf._maxConnectRetries; } + /// @brief set the max connect retries (3 default) + ConnectionBuilder& maxConnectRetries(unsigned r) { + _conf._maxConnectRetries = r; + return *this; + } + +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned failConnectAttempts() const { return _conf._failConnectAttempts; } + ConnectionBuilder& failConnectAttempts(unsigned f) { + _conf._failConnectAttempts = f; + return *this; + } +#endif + // Set the authentication type of the connection AuthenticationType authenticationType() const { return _conf._authenticationType; diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index 5da4cc441984..daf4fe785d5d 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -216,8 +216,13 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _connectTimeout(60000), + _connectTimeout(15000), _idleTimeout(300000), + _connectRetryPause(1000), + _maxConnectRetries(3), +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts(0), +#endif _useIdleTimeout(true), _authenticationType(AuthenticationType::None), _user(""), @@ -236,6 +241,11 @@ struct ConnectionConfiguration { std::chrono::milliseconds _connectTimeout; std::chrono::milliseconds _idleTimeout; + std::chrono::milliseconds _connectRetryPause; + unsigned _maxConnectRetries; +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned _failConnectAttempts; +#endif bool _useIdleTimeout; AuthenticationType _authenticationType; diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 48711cc701fa..a46435b2b205 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -27,6 +27,8 @@ #include #include "debugging.h" +#include + namespace arangodb { namespace fuerte { inline namespace v1 { namespace { @@ -35,14 +37,24 @@ void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, F&& done, IsAbortedCb&& isAborted) { auto cb = [&socket, +#ifdef ARANGODB_USE_GOOGLE_TESTS + fail = config._failConnectAttempts > 0, +#endif done = std::forward(done), isAborted = std::forward(isAborted)](auto ec, auto it) mutable { +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (fail) { + // use an error code != operation_aborted + ec = boost::system::errc::make_error_code(boost::system::errc::not_enough_memory); + } +#endif + if (isAborted()) { ec = asio_ns::error::operation_aborted; } if (ec) { // error in address resolver - FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message(); + FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message() << "\n"; done(ec); return; } @@ -53,9 +65,9 @@ void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::async_connect(socket, it, [done](auto ec, auto it) mutable { if (ec) { - FUERTE_LOG_DEBUG << "executing async connect callback, error: " << ec.message(); + FUERTE_LOG_DEBUG << "executing async connect callback, error: " << ec.message() << "\n"; } else { - FUERTE_LOG_DEBUG << "executing async connect callback, no error"; + FUERTE_LOG_DEBUG << "executing async connect callback, no error\n"; } std::forward(done)(ec); }); @@ -76,7 +88,7 @@ void resolveConnect(detail::ConnectionConfiguration const& config, cb(ec, it); #else // Resolve the host asynchronously into a series of endpoints - FUERTE_LOG_DEBUG << "scheduled callback to resolve host " << config._host << ":" << config._port; + FUERTE_LOG_DEBUG << "scheduled callback to resolve host " << config._host << ":" << config._port << "\n"; resolver.async_resolve(config._host, config._port, std::move(cb)); #endif } @@ -94,14 +106,14 @@ struct Socket { try { this->cancel(); } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n"; } } template void connect(detail::ConnectionConfiguration const& config, F&& done) { resolveConnect(config, resolver, socket, [this, done = std::forward(done)](asio_ns::error_code ec) mutable { - FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; + FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n"; if (canceled) { // cancel() was already called on this socket FUERTE_ASSERT(socket.is_open() == false); @@ -113,6 +125,10 @@ struct Socket { }); } + void rearm() { + canceled = false; + } + void cancel() { canceled = true; try { @@ -123,7 +139,7 @@ struct Socket { socket.close(ec); } } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during tcp socket cancelation: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during tcp socket cancelation: " << ex.what() << "\n"; } } @@ -142,7 +158,7 @@ struct Socket { } catch (std::exception const& ex) { // an exception is unlikely to occur here, as we are using the error-code // variants of cancel/shutdown/close above - FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n"; } std::forward(cb)(ec); } @@ -156,13 +172,17 @@ struct Socket { template <> struct Socket { Socket(EventLoopService& loop, asio_ns::io_context& ctx) - : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), cleanupDone(false) {} + : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx), + sslContext(loop.sslContext()), cleanupDone(false) { + // at least 3 retries + deadSockets.reserve(3); + } ~Socket() { try { this->cancel(); } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what() << "\n"; } } @@ -172,7 +192,7 @@ struct Socket { resolveConnect( config, resolver, socket.next_layer(), [=, this](asio_ns::error_code ec) mutable { - FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled; + FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n"; if (canceled) { // cancel() was already called on this socket FUERTE_ASSERT(socket.lowest_layer().is_open() == false); @@ -218,6 +238,16 @@ struct Socket { return canceled; }); } + + void rearm() { + // move away old socket, in case it is still in use by some background operation + FUERTE_ASSERT(deadSockets.capacity() > deadSockets.size()); + deadSockets.push_back(std::move(socket)); + + // create a new socket instead and declare it ready + socket = asio_ns::ssl::stream(this->ctx, this->sslContext); + canceled = false; + } void cancel() { canceled = true; @@ -231,7 +261,7 @@ struct Socket { socket.lowest_layer().close(ec); } } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during ssl socket cancelation: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during ssl socket cancelation: " << ex.what() << "\n"; } } @@ -269,7 +299,7 @@ struct Socket { timer.async_wait([cb, this](asio_ns::error_code ec) { // Copy in callback such that the connection object is kept alive long // enough, please do not delete, although it is not used here! - if (!cleanupDone && !ec) { + if (!ec && !cleanupDone) { socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); ec.clear(); socket.lowest_layer().close(ec); @@ -281,6 +311,9 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ssl::stream socket; asio_ns::steady_timer timer; + asio_ns::io_context& ctx; + asio_ns::ssl::context& sslContext; + std::vector> deadSockets; std::atomic cleanupDone; bool canceled = false; }; @@ -296,7 +329,7 @@ struct Socket { try { this->cancel(); } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n"; } } @@ -311,6 +344,10 @@ struct Socket { asio_ns::local::stream_protocol::endpoint ep(config._host); socket.async_connect(ep, std::forward(done)); } + + void rearm() { + canceled = false; + } void cancel() { canceled = true; @@ -321,7 +358,7 @@ struct Socket { socket.close(ec); } } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during unix socket cancelation: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during unix socket cancelation: " << ex.what() << "\n"; } } diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 2f18b77a8ad6..df5ed4153d78 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -90,7 +90,11 @@ class GeneralConnection : public fuerte::Connection { detail::ConnectionConfiguration const& config) : Connection(config), _io_context(loop.nextIOContext()), - _proto(loop, *_io_context) {} + _proto(loop, *_io_context) { +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts = config._failConnectAttempts; +#endif + } virtual ~GeneralConnection() noexcept { _state.store(Connection::State::Closed); @@ -154,10 +158,10 @@ class GeneralConnection : public fuerte::Connection { /// @brief cancel the connection, unusable afterwards void cancel() override { FUERTE_LOG_DEBUG << "cancel: this=" << this << "\n"; - asio_ns::post(*_io_context, [self(weak_from_this()), this] { + asio_ns::post(*_io_context, [self(weak_from_this())] { auto s = self.lock(); if (s) { - shutdownConnection(Error::ConnectionCanceled); + static_cast&>(*s).shutdownConnection(Error::ConnectionCanceled); } }); } @@ -171,11 +175,12 @@ class GeneralConnection : public fuerte::Connection { Connection::State exp = Connection::State::Created; if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; - tryConnect(); + FUERTE_ASSERT(_config._maxConnectRetries > 0); + tryConnect(_config._maxConnectRetries); } else { FUERTE_LOG_DEBUG << "startConnection: this=" << this << " found unexpected state " << static_cast(exp) - << " not equal to 'Created'"; + << " not equal to 'Created'\n"; FUERTE_ASSERT(false); } } @@ -198,9 +203,10 @@ class GeneralConnection : public fuerte::Connection { abortRequests(err, /*now*/ Clock::time_point::max()); - _proto.shutdown([=, this, self = shared_from_this()](asio_ns::error_code ec) { - terminateActivity(err); - onFailure(err, msg); + _proto.shutdown([=, self = shared_from_this()](asio_ns::error_code ec) { + auto& me = static_cast&>(*self); + me.terminateActivity(err); + me.onFailure(err, msg); }); // Close socket } @@ -287,7 +293,7 @@ class GeneralConnection : public fuerte::Connection { try { this->_proto.timer.cancel(); } catch (std::exception const& ex) { - FUERTE_LOG_ERROR << "caught exception during timer cancelation: " << ex.what(); + FUERTE_LOG_ERROR << "caught exception during timer cancelation: " << ex.what() << "\n"; } } @@ -309,16 +315,18 @@ class GeneralConnection : public fuerte::Connection { RequestCallback&& cb) = 0; private: - // Try to connect - void tryConnect() { + // Try to connect with a given number of retries + void tryConnect(unsigned retries) { + FUERTE_ASSERT(retries > 0); + if (_state.load() != Connection::State::Connecting) { return; } - FUERTE_LOG_DEBUG << "tryConnect this=" << this << "\n"; + FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - _proto.connect(_config, [self](asio_ns::error_code ec) mutable { + _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); me.cancelTimer(); // Note that is is possible that the alarm has already gone off, in which @@ -326,16 +334,41 @@ class GeneralConnection : public fuerte::Connection { // However, we now quickly set the state to `Connected` in which case the // closure will no longer shut down the socket and ruin our success. if (!ec) { - FUERTE_LOG_DEBUG << "tryConnect established connection this=" << self.get() << "\n"; + FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") established connection this=" << self.get() << "\n"; me.finishConnect(); return; } - - std::string msg("connecting failed: "); - msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() + +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (me._failConnectAttempts > 0) { + --me._failConnectAttempts; + } +#endif + + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; + if (retries > 1 && ec != asio_ns::error::operation_aborted) { + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; + me._proto.timer.expires_after(me._config._connectRetryPause); + me._proto.timer.async_wait( + [self = std::move(self), retries](asio_ns::error_code ec) mutable { + if (ec) { + FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; + // timer canceled. + return; + } + auto& me = static_cast&>(*self); + // rearm socket so that we can use it again + FUERTE_LOG_DEBUG << "tryConnect, rearming connection this=" << self.get() << "\n"; + me._proto.rearm(); + me.tryConnect(retries - 1); + }); + } else { + std::string msg("connecting failed: "); + msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() : "timeout"); - FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection: " << msg << " this=" << self.get(); - me.shutdownConnection(Error::CouldNotConnect, msg); + FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection: " << msg << " this=" << self.get() << "\n"; + me.shutdownConnection(Error::CouldNotConnect, msg); + } }); _proto.timer.expires_after(_config._connectTimeout); @@ -345,7 +378,7 @@ class GeneralConnection : public fuerte::Connection { // the connect handler below gets 'operation_aborted' error auto& me = static_cast&>(*self); me._proto.cancel(); - FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get(); + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; } }); @@ -371,6 +404,12 @@ class GeneralConnection : public fuerte::Connection { std::atomic _active{false}; +#ifdef ARANGODB_USE_GOOGLE_TESTS + // if this member is > 0, then this many connection attempts will fail + // in this connection + unsigned _failConnectAttempts = 0; +#endif + bool _reading = false; // set to true while an async_read is ongoing bool _writing = false; // set between starting an asyncWrite operation and // executing the completion handler}; @@ -453,7 +492,7 @@ struct MultiConnection : public GeneralConnection { // expires_after cancels pending ops this->_proto.timer.expires_at(tp); this->_proto.timer.async_wait( - [=, this, self = Connection::weak_from_this()](auto const& ec) { + [=, self = Connection::weak_from_this()](auto const& ec) { std::shared_ptr s; if (ec || !(s = self.lock())) { // was canceled / deallocated return; diff --git a/CHANGELOG b/CHANGELOG index 921f59dd3f23..3bcd9d7b4242 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,8 +6,6 @@ v3.11.9 (XXXX-XX-XX) In this case, the low-level socket was repurposed, but not reset properly. This could leave the connection in an improper state and lead to callbacks for some requests to not being called as expected. - The fix is to remove the retry logic, and instead prolong the connection - timeout to a multiple of the previous timeout value. * Improved the time required to create a new Collection in a database with hundreds of collections. This also improves times for indexes and dropping diff --git a/client-tools/Shell/V8ClientConnection.cpp b/client-tools/Shell/V8ClientConnection.cpp index 557bf1049226..89476baeb8da 100644 --- a/client-tools/Shell/V8ClientConnection.cpp +++ b/client-tools/Shell/V8ClientConnection.cpp @@ -107,6 +107,8 @@ V8ClientConnection::V8ClientConnection(ArangoshServer& server, _vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedArrays = true; + _builder.maxConnectRetries(3); + _builder.connectRetryPause(std::chrono::milliseconds(100)); _builder.connectTimeout(std::chrono::milliseconds( static_cast(1000.0 * _client.connectionTimeout()))); _builder.onFailure([this](fu::Error err, std::string const& msg) { diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index bd464afb2cbe..02a12e3ac7e9 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -26,8 +26,6 @@ #include "gtest/gtest.h" -#include - namespace f = ::arangodb::fuerte; // for testing connection failures, we need a free port that is not used by @@ -46,12 +44,19 @@ constexpr std::string_view urls[] = { // tryToConnectExpectFailure tries to make a connection to a host with given // url. This is expected to fail. static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, - std::string_view url) { + std::string_view url, bool useRetries) { f::WaitGroup wg; wg.add(); f::ConnectionBuilder cbuilder; cbuilder.connectTimeout(std::chrono::milliseconds(250)); + cbuilder.connectRetryPause(std::chrono::milliseconds(100)); +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (useRetries) { + cbuilder.failConnectAttempts(2); + } + cbuilder.maxConnectRetries(3); +#endif cbuilder.endpoint(std::string{url}); cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { @@ -73,14 +78,14 @@ static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, // that cannot be resolved. TEST(ConnectionFailureTest, CannotResolveHttp) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "http://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "http://thishostmustnotexist.arangodb.com:8529", false); } TEST(ConnectionFailureTest, CannotResolveVst) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "vst://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "vst://thishostmustnotexist.arangodb.com:8529", false); } // CannotConnect tests try to make a connection to a host with a valid name @@ -88,6 +93,13 @@ TEST(ConnectionFailureTest, CannotResolveVst) { TEST(ConnectionFailureTest, CannotConnect) { for (auto const& url : urls) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, url); + tryToConnectExpectFailure(loop, url, /*useRetries*/ false); + } +} + +TEST(ConnectionFailureTest, CannotConnectForceRetries) { + for (auto const& url : urls) { + f::EventLoopService loop; + tryToConnectExpectFailure(loop, url, /*useRetries*/ true); } } From 8f7911b3272e69c31525a0d2293341dff32f967f Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 10:28:58 +0200 Subject: [PATCH 10/22] fix ownership issue --- 3rdParty/fuerte/src/AsioSockets.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index a46435b2b205..f41396671778 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -282,7 +282,7 @@ struct Socket { return; } cleanupDone = false; - socket.async_shutdown([cb(std::forward(cb)), this](auto const& ec) { + socket.async_shutdown([cb, this](auto const& ec) { timer.cancel(); #ifndef _WIN32 if (!cleanupDone && (!ec || ec == asio_ns::error::basic_errors::not_connected)) { @@ -296,7 +296,7 @@ struct Socket { cb(ec); }); timer.expires_from_now(std::chrono::seconds(3)); - timer.async_wait([cb, this](asio_ns::error_code ec) { + timer.async_wait([cb(std::forward(cb)), this](asio_ns::error_code ec) { // Copy in callback such that the connection object is kept alive long // enough, please do not delete, although it is not used here! if (!ec && !cleanupDone) { From 0bc3207a8e386d63a7f354416b7d0044fcf29a8a Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 10:36:53 +0200 Subject: [PATCH 11/22] fix compiler warning --- 3rdParty/fuerte/src/GeneralConnection.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index df5ed4153d78..c7c68bb87f1e 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -492,7 +492,7 @@ struct MultiConnection : public GeneralConnection { // expires_after cancels pending ops this->_proto.timer.expires_at(tp); this->_proto.timer.async_wait( - [=, self = Connection::weak_from_this()](auto const& ec) { + [=, this, self = Connection::weak_from_this()](auto const& ec) { std::shared_ptr s; if (ec || !(s = self.lock())) { // was canceled / deallocated return; From a2706fdbce3d68b8887a1005a05a6b973bf7e540 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 11:38:51 +0200 Subject: [PATCH 12/22] simplify PR --- 3rdParty/fuerte/src/AsioSockets.h | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index f41396671778..5d269ab1b75e 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -27,8 +27,6 @@ #include #include "debugging.h" -#include - namespace arangodb { namespace fuerte { inline namespace v1 { namespace { @@ -173,10 +171,7 @@ template <> struct Socket { Socket(EventLoopService& loop, asio_ns::io_context& ctx) : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx), - sslContext(loop.sslContext()), cleanupDone(false) { - // at least 3 retries - deadSockets.reserve(3); - } + sslContext(loop.sslContext()), cleanupDone(false) {} ~Socket() { try { @@ -240,11 +235,7 @@ struct Socket { } void rearm() { - // move away old socket, in case it is still in use by some background operation - FUERTE_ASSERT(deadSockets.capacity() > deadSockets.size()); - deadSockets.push_back(std::move(socket)); - - // create a new socket instead and declare it ready + // create a new socket and declare it ready socket = asio_ns::ssl::stream(this->ctx, this->sslContext); canceled = false; } @@ -313,7 +304,6 @@ struct Socket { asio_ns::steady_timer timer; asio_ns::io_context& ctx; asio_ns::ssl::context& sslContext; - std::vector> deadSockets; std::atomic cleanupDone; bool canceled = false; }; From 7039fa6bbc3c6e0e08b8d76f54de1906f0261793 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 17:22:41 +0200 Subject: [PATCH 13/22] reset timer earlier --- 3rdParty/fuerte/src/AsioSockets.h | 2 +- 3rdParty/fuerte/src/GeneralConnection.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 5d269ab1b75e..34d5cd74ccdb 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -273,6 +273,7 @@ struct Socket { return; } cleanupDone = false; + timer.expires_from_now(std::chrono::seconds(3)); socket.async_shutdown([cb, this](auto const& ec) { timer.cancel(); #ifndef _WIN32 @@ -286,7 +287,6 @@ struct Socket { #endif cb(ec); }); - timer.expires_from_now(std::chrono::seconds(3)); timer.async_wait([cb(std::forward(cb)), this](asio_ns::error_code ec) { // Copy in callback such that the connection object is kept alive long // enough, please do not delete, although it is not used here! diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index c7c68bb87f1e..105a7954c10e 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -325,6 +325,8 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); + + _proto.timer.expires_after(_config._connectTimeout); _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); @@ -371,7 +373,6 @@ class GeneralConnection : public fuerte::Connection { } }); - _proto.timer.expires_after(_config._connectTimeout); _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { if (!ec && self->state() == Connection::State::Connecting) { // note: if the timer fires successfully, ec is empty here. From 09817661297f1f8fa3f4b45581c127f47a894933 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 18:26:32 +0200 Subject: [PATCH 14/22] make sure callback is always called --- 3rdParty/fuerte/src/AsioSockets.h | 8 ++++++++ 3rdParty/fuerte/src/GeneralConnection.h | 14 +++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 34d5cd74ccdb..0b94ce7404f4 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -92,6 +92,11 @@ void resolveConnect(detail::ConnectionConfiguration const& config, } } // namespace +enum class ConnectTimerRole { + kConnect = 1, + kReconnect = 2, +}; + template struct Socket {}; @@ -164,6 +169,7 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ip::tcp::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -305,6 +311,7 @@ struct Socket { asio_ns::io_context& ctx; asio_ns::ssl::context& sslContext; std::atomic cleanupDone; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -366,6 +373,7 @@ struct Socket { asio_ns::local::stream_protocol::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; #endif // ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 105a7954c10e..1db4dd83409c 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -325,7 +325,8 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - + + _proto.connectTimerRole = ConnectTimerRole::kConnect; _proto.timer.expires_after(_config._connectTimeout); _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { @@ -350,6 +351,7 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; if (retries > 1 && ec != asio_ns::error::operation_aborted) { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; + me._proto.connectTimerRole = ConnectTimerRole::kReconnect; me._proto.timer.expires_after(me._config._connectRetryPause); me._proto.timer.async_wait( [self = std::move(self), retries](asio_ns::error_code ec) mutable { @@ -378,8 +380,14 @@ class GeneralConnection : public fuerte::Connection { // note: if the timer fires successfully, ec is empty here. // the connect handler below gets 'operation_aborted' error auto& me = static_cast&>(*self); - me._proto.cancel(); - FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; + if (me._proto.connectTimerRole == ConnectTimerRole::kReconnect) { + std::string msg("connecting failed: timeout"); + FUERTE_LOG_DEBUG << "tryConnect timer, calling shutdownConnection: " << msg << " this=" << self.get() << "\n"; + me.shutdownConnection(Error::CouldNotConnect, msg); + } else { + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; + me._proto.cancel(); + } } }); From a3381d3238e91fe67753568cba5058dd55f3a133 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 7 May 2024 19:11:55 +0200 Subject: [PATCH 15/22] fix timer cancelation race --- 3rdParty/fuerte/src/AsioSockets.h | 8 -------- 3rdParty/fuerte/src/GeneralConnection.h | 23 ++++++++++------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 0b94ce7404f4..34d5cd74ccdb 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -92,11 +92,6 @@ void resolveConnect(detail::ConnectionConfiguration const& config, } } // namespace -enum class ConnectTimerRole { - kConnect = 1, - kReconnect = 2, -}; - template struct Socket {}; @@ -169,7 +164,6 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ip::tcp::socket socket; asio_ns::steady_timer timer; - ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -311,7 +305,6 @@ struct Socket { asio_ns::io_context& ctx; asio_ns::ssl::context& sslContext; std::atomic cleanupDone; - ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -373,7 +366,6 @@ struct Socket { asio_ns::local::stream_protocol::socket socket; asio_ns::steady_timer timer; - ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; #endif // ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 1db4dd83409c..7ad2db6202d6 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -325,8 +325,7 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - - _proto.connectTimerRole = ConnectTimerRole::kConnect; + _proto.timer.expires_after(_config._connectTimeout); _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { @@ -351,16 +350,20 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; if (retries > 1 && ec != asio_ns::error::operation_aborted) { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; - me._proto.connectTimerRole = ConnectTimerRole::kReconnect; me._proto.timer.expires_after(me._config._connectRetryPause); me._proto.timer.async_wait( [self = std::move(self), retries](asio_ns::error_code ec) mutable { + auto& me = static_cast&>(*self); if (ec) { - FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; // timer canceled. + FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; + // this can happen when the timeout callback was already queued - in this + // case the connect callback will not be called with operation_aborted, + // so we must shutdown here + me.shutdownConnection(Error::CouldNotConnect, + "connecting failed: timeout"); return; } - auto& me = static_cast&>(*self); // rearm socket so that we can use it again FUERTE_LOG_DEBUG << "tryConnect, rearming connection this=" << self.get() << "\n"; me._proto.rearm(); @@ -380,14 +383,8 @@ class GeneralConnection : public fuerte::Connection { // note: if the timer fires successfully, ec is empty here. // the connect handler below gets 'operation_aborted' error auto& me = static_cast&>(*self); - if (me._proto.connectTimerRole == ConnectTimerRole::kReconnect) { - std::string msg("connecting failed: timeout"); - FUERTE_LOG_DEBUG << "tryConnect timer, calling shutdownConnection: " << msg << " this=" << self.get() << "\n"; - me.shutdownConnection(Error::CouldNotConnect, msg); - } else { - FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; - me._proto.cancel(); - } + me._proto.cancel(); + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; } }); From 3cf96ffe2a7f923651dc1a92d4ce71bff50bdf00 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 8 May 2024 11:00:26 +0200 Subject: [PATCH 16/22] fixes for socket shutdown --- 3rdParty/fuerte/src/AsioSockets.h | 67 ++++++++++++++++--------- 3rdParty/fuerte/src/GeneralConnection.h | 45 ++++++++++------- 2 files changed, 71 insertions(+), 41 deletions(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 34d5cd74ccdb..551e40a61090 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -92,6 +92,11 @@ void resolveConnect(detail::ConnectionConfiguration const& config, } } // namespace +enum class ConnectTimerRole { + kConnect = 1, + kReconnect = 2, +}; + template struct Socket {}; @@ -143,14 +148,15 @@ struct Socket { template void shutdown(F&& cb) { - asio_ns::error_code ec; // prevents exceptions + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; try { -#ifndef _WIN32 - socket.cancel(ec); -#endif + timer.cancel(ec); if (socket.is_open()) { + socket.cancel(ec); socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.close(ec); } } catch (std::exception const& ex) { @@ -164,6 +170,7 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ip::tcp::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -248,7 +255,6 @@ struct Socket { if (socket.lowest_layer().is_open()) { // non-graceful shutdown asio_ns::error_code ec; socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.lowest_layer().close(ec); } } catch (std::exception const& ex) { @@ -264,27 +270,30 @@ struct Socket { // socket is a member. This means that the allocation of the connection and // this of the socket is kept until all asynchronous operations are completed // (or aborted). - asio_ns::error_code ec; // prevents exceptions - socket.lowest_layer().cancel(ec); + + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; if (!socket.lowest_layer().is_open()) { timer.cancel(ec); std::forward(cb)(ec); return; } + + socket.lowest_layer().cancel(ec); cleanupDone = false; - timer.expires_from_now(std::chrono::seconds(3)); - socket.async_shutdown([cb, this](auto const& ec) { + // implicitly cancels any previous timers + timer.expires_after(std::chrono::seconds(3)); + + socket.async_shutdown([cb, this](asio_ns::error_code ec) { timer.cancel(); -#ifndef _WIN32 - if (!cleanupDone && (!ec || ec == asio_ns::error::basic_errors::not_connected)) { - asio_ns::error_code ec2; - socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec2); - ec2.clear(); - socket.lowest_layer().close(ec2); + if (!cleanupDone) { + socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.lowest_layer().close(ec); cleanupDone = true; } -#endif cb(ec); }); timer.async_wait([cb(std::forward(cb)), this](asio_ns::error_code ec) { @@ -292,7 +301,6 @@ struct Socket { // enough, please do not delete, although it is not used here! if (!ec && !cleanupDone) { socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.lowest_layer().close(ec); cleanupDone = true; } @@ -305,6 +313,7 @@ struct Socket { asio_ns::io_context& ctx; asio_ns::ssl::context& sslContext; std::atomic cleanupDone; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; @@ -354,18 +363,28 @@ struct Socket { template void shutdown(F&& cb) { - asio_ns::error_code ec; // prevents exceptions - timer.cancel(ec); - if (socket.is_open()) { - socket.cancel(ec); - socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - socket.close(ec); + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; + try { + timer.cancel(ec); + if (socket.is_open()) { + socket.cancel(ec); + socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.close(ec); + } + } catch (std::exception const& ex) { + // an exception is unlikely to occur here, as we are using the error-code + // variants of cancel/shutdown/close above + FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n"; } std::forward(cb)(ec); } asio_ns::local::stream_protocol::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; bool canceled = false; }; #endif // ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 7ad2db6202d6..9f297ec03bf8 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -325,7 +325,8 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - + + _proto.connectTimerRole = ConnectTimerRole::kConnect; _proto.timer.expires_after(_config._connectTimeout); _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { @@ -350,18 +351,17 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; if (retries > 1 && ec != asio_ns::error::operation_aborted) { FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; + me._proto.connectTimerRole = ConnectTimerRole::kReconnect; me._proto.timer.expires_after(me._config._connectRetryPause); me._proto.timer.async_wait( [self = std::move(self), retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); if (ec) { - // timer canceled. FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; - // this can happen when the timeout callback was already queued - in this - // case the connect callback will not be called with operation_aborted, - // so we must shutdown here - me.shutdownConnection(Error::CouldNotConnect, - "connecting failed: timeout"); + // we should not get here, because it is totally unexpected that the + // retry timer gets canceled + FUERTE_ASSERT(false && "retry timer should not have been canceled"); + me.shutdownConnection(Error::CouldNotConnect, "connecting failed: retry timer canceled"); return; } // rearm socket so that we can use it again @@ -377,16 +377,27 @@ class GeneralConnection : public fuerte::Connection { me.shutdownConnection(Error::CouldNotConnect, msg); } }); - - _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { - if (!ec && self->state() == Connection::State::Connecting) { - // note: if the timer fires successfully, ec is empty here. - // the connect handler below gets 'operation_aborted' error - auto& me = static_cast&>(*self); - me._proto.cancel(); - FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; - } - }); + + // only if we are still in the connect phase, we want to schedule a timer + // for the connect timeout. if the connect already failed and scheduled a + // timer for the reconnect timeout, we do not want to mess with the timer here. + if (_proto.connectTimerRole == ConnectTimerRole::kConnect) { + _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { + if (!ec && self->state() == Connection::State::Connecting) { + // note: if the timer fires successfully, ec is empty here. + // the connect handler below gets 'operation_aborted' error + auto& me = static_cast&>(*self); + // cancel the socket operations only if we are still in the connect + // phase. + // otherwise, we are in the reconnect phase already, and we + // do not want to cancel the socket. + if (me._proto.connectTimerRole == ConnectTimerRole::kConnect) { + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; + me._proto.cancel(); + } + } + }); + } } From 5d1da43fa9215fcc7f37b70b387ff192ed392189 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 8 May 2024 11:34:19 +0200 Subject: [PATCH 17/22] remove invalid assertion --- 3rdParty/fuerte/src/GeneralConnection.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index 9f297ec03bf8..b20f59e6ec9c 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -358,9 +358,6 @@ class GeneralConnection : public fuerte::Connection { auto& me = static_cast&>(*self); if (ec) { FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; - // we should not get here, because it is totally unexpected that the - // retry timer gets canceled - FUERTE_ASSERT(false && "retry timer should not have been canceled"); me.shutdownConnection(Error::CouldNotConnect, "connecting failed: retry timer canceled"); return; } From 27c383eacb43735406c6a070db34340c2672ffb6 Mon Sep 17 00:00:00 2001 From: mpoeter Date: Wed, 8 May 2024 16:00:29 +0200 Subject: [PATCH 18/22] Fix: check if socket is open in connect callback It is possible that the timeout went off right before and has closed the socket before the callback was executed with a success code. --- 3rdParty/fuerte/src/GeneralConnection.h | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index b20f59e6ec9c..cd64df1c2b7f 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -325,17 +325,23 @@ class GeneralConnection : public fuerte::Connection { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - - _proto.connectTimerRole = ConnectTimerRole::kConnect; + + _proto.connectTimerRole = ConnectTimerRole::kConnect; _proto.timer.expires_after(_config._connectTimeout); _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); me.cancelTimer(); - // Note that is is possible that the alarm has already gone off, in which - // case its closure might already be queued right after ourselves! - // However, we now quickly set the state to `Connected` in which case the - // closure will no longer shut down the socket and ruin our success. + // Note that is is possible that the alarm has already gone off. In this + // case its closure could have already executed, or it may be queued right + // after ourselves! In the first case the socket has already been closed, + // so we need to check that. For the latter case we now set the state to + // `Connected`, so the closure will simply do nothing. + + if (!ec && !me._proto.socket.is_open()) { + // timer went off earlier and has already closed the socket. + ec = asio_ns::error::operation_aborted; + } if (!ec) { FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") established connection this=" << self.get() << "\n"; me.finishConnect(); From cf2ad97a83dc82a6a5fba733f90833855c27ddc0 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 8 May 2024 16:07:17 +0200 Subject: [PATCH 19/22] fix compilation --- 3rdParty/fuerte/src/AsioSockets.h | 12 ++++++++++++ 3rdParty/fuerte/src/GeneralConnection.h | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 551e40a61090..a8da80570146 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -128,6 +128,10 @@ struct Socket { }); } + bool isOpen() const { + return socket.is_open(); + } + void rearm() { canceled = false; } @@ -240,6 +244,10 @@ struct Socket { return canceled; }); } + + bool isOpen() const { + return socket.lowest_layer().is_open(); + } void rearm() { // create a new socket and declare it ready @@ -343,6 +351,10 @@ struct Socket { asio_ns::local::stream_protocol::endpoint ep(config._host); socket.async_connect(ep, std::forward(done)); } + + bool isOpen() const { + return socket.is_open(); + } void rearm() { canceled = false; diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index cd64df1c2b7f..f28acb50719e 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -338,7 +338,7 @@ class GeneralConnection : public fuerte::Connection { // so we need to check that. For the latter case we now set the state to // `Connected`, so the closure will simply do nothing. - if (!ec && !me._proto.socket.is_open()) { + if (!ec && !me._proto.isOpen()) { // timer went off earlier and has already closed the socket. ec = asio_ns::error::operation_aborted; } From 52a3c69bb3bd248a5c6b1c1cd68e49fa0766033d Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 8 May 2024 16:09:39 +0200 Subject: [PATCH 20/22] add tests --- .../modules/@arangodb/testsuites/fuerte.js | 5 +- tests/Fuerte/ConnectionFailuresTest.cpp | 97 +++++++++++++++++++ tests/Fuerte/ConnectionTest.h | 1 + tests/Fuerte/main.cpp | 10 +- tests/test-definitions.txt | 3 +- 5 files changed, 107 insertions(+), 9 deletions(-) diff --git a/js/client/modules/@arangodb/testsuites/fuerte.js b/js/client/modules/@arangodb/testsuites/fuerte.js index 0cc4492f51d7..b95544282d9a 100644 --- a/js/client/modules/@arangodb/testsuites/fuerte.js +++ b/js/client/modules/@arangodb/testsuites/fuerte.js @@ -81,11 +81,10 @@ function gtestRunner(options) { } // start server - print('Starting server...'); + print('Starting ' + options.protocol + ' server...'); - let instanceManager = new im.instanceManager('tcp', options, { + let instanceManager = new im.instanceManager(options.protocol, options, { "http.keep-alive-timeout": "10", - "log.level": "requests=TRACE" }, 'fuerte'); instanceManager.prepareInstance(); instanceManager.launchTcpDump(""); diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index 02a12e3ac7e9..fcf0b756e01f 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -26,8 +26,16 @@ #include "gtest/gtest.h" +#include +#include +#include + +#include "Logger/LogMacros.h" + namespace f = ::arangodb::fuerte; +extern std::string myEndpoint; + // for testing connection failures, we need a free port that is not used by // another service. because we can't be sure about high port numbers, we are // using some from the very range. according to @@ -41,6 +49,43 @@ constexpr std::string_view urls[] = { "ssl://localhost:60", "h2s://localhost:60", }; +static std::pair runTimeoutTest(f::ConnectionBuilder& cbuilder, + int n) { + cbuilder.verifyHost(false); + + f::WaitGroup wg; + f::EventLoopService loop; + + std::atomic callbacksCalled = 0; + std::atomic failureCallbacksCalled = 0; + + cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { + ASSERT_EQ(errorCode, f::Error::CouldNotConnect); + wg.done(); + ++failureCallbacksCalled; + }); + + for (int i = 0; i < n; ++i) { + wg.add(); + auto connection = cbuilder.connect(loop); + // Send a first request. (HTTP connection is only started upon first + // request) + auto request = f::createRequest(f::RestVerb::Get, "/_api/version"); + connection->sendRequest(std::move(request), + [&](f::Error error, std::unique_ptr, + std::unique_ptr) { + ++callbacksCalled; + if (error != f::Error::CouldNotConnect) { + wg.done(); + } + }); + } + auto success = wg.wait_for(std::chrono::seconds(60)); + EXPECT_TRUE(success); + + return {callbacksCalled.load(), failureCallbacksCalled.load()}; +} + // tryToConnectExpectFailure tries to make a connection to a host with given // url. This is expected to fail. static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, @@ -103,3 +148,55 @@ TEST(ConnectionFailureTest, CannotConnectForceRetries) { tryToConnectExpectFailure(loop, url, /*useRetries*/ true); } } + +TEST(ConnectionFailureTest, LowTimeouts) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(1)); + cbuilder.connectRetryPause(std::chrono::milliseconds(1)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint("ssl://localhost:60"); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(n, failureCallbacksCalled); +} + +TEST(ConnectionFailureTest, LowTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(1)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); +} + +TEST(ConnectionFailureTest, BorderlineTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(5)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); +} + +TEST(ConnectionFailureTest, HighEnoughTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(60000)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_EQ(0, failureCallbacksCalled); +} diff --git a/tests/Fuerte/ConnectionTest.h b/tests/Fuerte/ConnectionTest.h index b275bac07306..c0d46663168a 100644 --- a/tests/Fuerte/ConnectionTest.h +++ b/tests/Fuerte/ConnectionTest.h @@ -68,6 +68,7 @@ class ConnectionTestF : public ::testing::TestWithParam { std::shared_ptr createConnection() { // Set connection parameters fu::ConnectionBuilder cbuilder; + cbuilder.verifyHost(false); setupEndpointFromEnv(cbuilder); cbuilder.protocolType(GetParam()._protocol); setupAuthenticationFromEnv(cbuilder); diff --git a/tests/Fuerte/main.cpp b/tests/Fuerte/main.cpp index b8adc1891514..3718ba9832b1 100644 --- a/tests/Fuerte/main.cpp +++ b/tests/Fuerte/main.cpp @@ -10,7 +10,7 @@ std::string myEndpoint = "tcp://localhost:8529"; std::string myAuthentication = "basic:root:"; -std::vector parse_args(int const& argc, char const* const* argv) { +std::vector parse_args(int argc, char const* const* argv) { std::vector rv; for (int i = 0; i < argc; i++) { rv.emplace_back(argv[i]); @@ -24,12 +24,12 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); // removes google test parameters auto arguments = parse_args(argc, argv); // init √global Schmutz - const std::string endpointArg = "--endpoint="; - const std::string authArg = "--authentication="; + std::string endpointArg = "--endpoint="; + std::string authArg = "--authentication="; for (auto const& arg : arguments) { - if (arg.substr(0, endpointArg.size()) == endpointArg) { + if (arg.starts_with(endpointArg)) { myEndpoint = arg.substr(endpointArg.size()); - } else if (arg.substr(0, authArg.size()) == authArg) { + } else if (arg.starts_with(authArg)) { myAuthentication = arg.substr(authArg.size()); } } diff --git a/tests/test-definitions.txt b/tests/test-definitions.txt index 8c1082779840..1eb1001668c6 100644 --- a/tests/test-definitions.txt +++ b/tests/test-definitions.txt @@ -62,7 +62,8 @@ gtest_iresearch priority=1000 parallelity=3 size=medium single gtest_arangodb priority=1000 parallelity=3 size=medium single # Fuerte tests are executed in single env -fuerte priority=500 single +fuerte priority=500 single suffix=http +fuerte priority=500 single suffix=ssl-http -- --protocol ssl rta_makedata single rta_makedata single suffix=afo -- --activeFailover true From b8d7d332b2d4ea3c9cd726310fa34c18619e341a Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 10 May 2024 10:22:02 +0200 Subject: [PATCH 21/22] increase connection timeout --- 3rdParty/fuerte/include/fuerte/types.h | 2 +- CHANGELOG | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index daf4fe785d5d..391c0a2a7ab5 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -216,7 +216,7 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _connectTimeout(15000), + _connectTimeout(60000), _idleTimeout(300000), _connectRetryPause(1000), _maxConnectRetries(3), diff --git a/CHANGELOG b/CHANGELOG index 3bcd9d7b4242..97f249dc298d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,6 +6,7 @@ v3.11.9 (XXXX-XX-XX) In this case, the low-level socket was repurposed, but not reset properly. This could leave the connection in an improper state and lead to callbacks for some requests to not being called as expected. + The connection timeout was also increased from 15 seconds to 60 seconds. * Improved the time required to create a new Collection in a database with hundreds of collections. This also improves times for indexes and dropping From 911200311012945435a350d91e4b472a80e6001d Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 13 May 2024 10:51:37 +0200 Subject: [PATCH 22/22] fix spurious test failure --- tests/Fuerte/ConnectionFailuresTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index fcf0b756e01f..3c304a1b0beb 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -159,7 +159,7 @@ TEST(ConnectionFailureTest, LowTimeouts) { int n = 100; auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); ASSERT_EQ(n, callbacksCalled); - ASSERT_LE(n, failureCallbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); } TEST(ConnectionFailureTest, LowTimeoutsActualBackend) {