diff --git a/3rdParty/fuerte/include/fuerte/FuerteLogger.h b/3rdParty/fuerte/include/fuerte/FuerteLogger.h index 84c217c5aeb1..59dad2cb7567 100644 --- a/3rdParty/fuerte/include/fuerte/FuerteLogger.h +++ b/3rdParty/fuerte/include/fuerte/FuerteLogger.h @@ -26,14 +26,15 @@ #if 0 #include #include +#include -extern void LogHackWriter(char const* p); +extern void LogHackWriter(std::string_view p); class LogHack { std::stringstream _s; public: LogHack() {}; - ~LogHack() { LogHackWriter(_s.str().c_str()); }; + ~LogHack() { LogHackWriter(_s.str()); }; template LogHack& operator<<(T const& o) { _s << o; return *this; } typedef std::basic_ostream > CoutType; typedef CoutType& (*StandardEndLine)(CoutType&); @@ -115,11 +116,4 @@ class LogHack { if (0) std::cout #endif -#if ENABLE_FUERTE_LOG_NODE > 0 -#define FUERTE_LOG_NODE std::cout -#else -#define FUERTE_LOG_NODE \ - if (0) std::cout -#endif - #endif diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index 0411436fdb57..d6e9345a4a56 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -175,6 +175,14 @@ class ConnectionBuilder { return *this; } +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned failConnectAttempts() const { return _conf._failConnectAttempts; } + ConnectionBuilder& failConnectAttempts(unsigned f) { + _conf._failConnectAttempts = f; + return *this; + } +#endif + // Set the authentication type of the connection AuthenticationType authenticationType() const { return _conf._authenticationType; diff --git a/3rdParty/fuerte/include/fuerte/loop.h b/3rdParty/fuerte/include/fuerte/loop.h index 07277ea28e7c..c0b7c6d29abf 100644 --- a/3rdParty/fuerte/include/fuerte/loop.h +++ b/3rdParty/fuerte/include/fuerte/loop.h @@ -27,9 +27,11 @@ #include +#include #include #include #include +#include // run / runWithWork / poll for Loop mapping to ioservice // free function run with threads / with thread group barrier and work diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index ea647889fe7b..391c0a2a7ab5 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -216,10 +216,13 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _connectTimeout(15000), + _connectTimeout(60000), _idleTimeout(300000), _connectRetryPause(1000), _maxConnectRetries(3), +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts(0), +#endif _useIdleTimeout(true), _authenticationType(AuthenticationType::None), _user(""), @@ -240,6 +243,9 @@ struct ConnectionConfiguration { std::chrono::milliseconds _idleTimeout; std::chrono::milliseconds _connectRetryPause; unsigned _maxConnectRetries; +#ifdef ARANGODB_USE_GOOGLE_TESTS + unsigned _failConnectAttempts; +#endif bool _useIdleTimeout; AuthenticationType _authenticationType; diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index e3cda967bd71..a8da80570146 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -30,12 +30,29 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace { -template +template void resolveConnect(detail::ConnectionConfiguration const& config, asio_ns::ip::tcp::resolver& resolver, SocketT& socket, - F&& done) { - auto cb = [&socket, done(std::forward(done))](auto ec, auto it) mutable { + F&& done, IsAbortedCb&& isAborted) { + auto cb = [&socket, +#ifdef ARANGODB_USE_GOOGLE_TESTS + fail = config._failConnectAttempts > 0, +#endif + done = std::forward(done), + isAborted = std::forward(isAborted)](auto ec, auto it) mutable { +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (fail) { + // use an error code != operation_aborted + ec = boost::system::errc::make_error_code(boost::system::errc::not_enough_memory); + } +#endif + + if (isAborted()) { + ec = asio_ns::error::operation_aborted; + } + if (ec) { // error in address resolver + FUERTE_LOG_DEBUG << "received error during address resolving: " << ec.message() << "\n"; done(ec); return; } @@ -44,7 +61,12 @@ void resolveConnect(detail::ConnectionConfiguration const& config, // A successful resolve operation is guaranteed to pass a // non-empty range to the handler. asio_ns::async_connect(socket, it, - [done(std::move(done))](auto ec, auto it) mutable { + [done](auto ec, auto it) mutable { + if (ec) { + FUERTE_LOG_DEBUG << "executing async connect callback, error: " << ec.message() << "\n"; + } else { + FUERTE_LOG_DEBUG << "executing async connect callback, no error\n"; + } std::forward(done)(ec); }); } catch (std::bad_alloc const&) { @@ -63,12 +85,18 @@ void resolveConnect(detail::ConnectionConfiguration const& config, auto it = resolver.resolve(config._host, config._port, ec); cb(ec, it); #else - // Resolve the host asynchronous into a series of endpoints + // Resolve the host asynchronously into a series of endpoints + FUERTE_LOG_DEBUG << "scheduled callback to resolve host " << config._host << ":" << config._port << "\n"; resolver.async_resolve(config._host, config._port, std::move(cb)); #endif } } // namespace +enum class ConnectTimerRole { + kConnect = 1, + kReconnect = 2, +}; + template struct Socket {}; @@ -77,14 +105,39 @@ struct Socket { Socket(EventLoopService&, asio_ns::io_context& ctx) : resolver(ctx), socket(ctx), timer(ctx) {} - ~Socket() { this->cancel(); } + ~Socket() { + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n"; + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { - resolveConnect(config, resolver, socket, std::forward(done)); + resolveConnect(config, resolver, socket, [this, done = std::forward(done)](asio_ns::error_code ec) mutable { + FUERTE_LOG_DEBUG << "executing tcp connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n"; + if (canceled) { + // cancel() was already called on this socket + FUERTE_ASSERT(socket.is_open() == false); + ec = asio_ns::error::operation_aborted; + } + done(ec); + }, [this]() { + return canceled; + }); + } + + bool isOpen() const { + return socket.is_open(); } + void rearm() { + canceled = false; + } + void cancel() { + canceled = true; try { timer.cancel(); resolver.cancel(); @@ -92,23 +145,28 @@ struct Socket { asio_ns::error_code ec; socket.close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during tcp socket cancelation: " << ex.what() << "\n"; } } template void shutdown(F&& cb) { - asio_ns::error_code ec; // prevents exceptions + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; try { -#ifndef _WIN32 - socket.cancel(ec); -#endif + timer.cancel(ec); if (socket.is_open()) { + socket.cancel(ec); socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + // an exception is unlikely to occur here, as we are using the error-code + // variants of cancel/shutdown/close above + FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n"; } std::forward(cb)(ec); } @@ -116,21 +174,36 @@ struct Socket { asio_ns::ip::tcp::resolver resolver; asio_ns::ip::tcp::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; + bool canceled = false; }; template <> struct Socket { Socket(EventLoopService& loop, asio_ns::io_context& ctx) - : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), cleanupDone(false) {} + : resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx), + sslContext(loop.sslContext()), cleanupDone(false) {} - ~Socket() { this->cancel(); } + ~Socket() { + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what() << "\n"; + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { bool verify = config._verifyHost; resolveConnect( config, resolver, socket.next_layer(), - [=, this, done(std::forward(done))](auto const& ec) mutable { + [=, this](asio_ns::error_code ec) mutable { + FUERTE_LOG_DEBUG << "executing ssl connect callback, ec: " << ec.message() << ", canceled: " << this->canceled << "\n"; + if (canceled) { + // cancel() was already called on this socket + FUERTE_ASSERT(socket.lowest_layer().is_open() == false); + ec = asio_ns::error::operation_aborted; + } if (ec) { done(ec); return; @@ -167,20 +240,33 @@ struct Socket { } socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); + }, [this]() { + return canceled; }); } + + bool isOpen() const { + return socket.lowest_layer().is_open(); + } + void rearm() { + // create a new socket and declare it ready + socket = asio_ns::ssl::stream(this->ctx, this->sslContext); + canceled = false; + } + void cancel() { + canceled = true; try { timer.cancel(); resolver.cancel(); if (socket.lowest_layer().is_open()) { // non-graceful shutdown asio_ns::error_code ec; socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.lowest_layer().close(ec); } - } catch (...) { + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during ssl socket cancelation: " << ex.what() << "\n"; } } @@ -192,45 +278,51 @@ struct Socket { // socket is a member. This means that the allocation of the connection and // this of the socket is kept until all asynchronous operations are completed // (or aborted). - asio_ns::error_code ec; // prevents exceptions - socket.lowest_layer().cancel(ec); + + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; if (!socket.lowest_layer().is_open()) { timer.cancel(ec); std::forward(cb)(ec); return; } + + socket.lowest_layer().cancel(ec); cleanupDone = false; - timer.expires_from_now(std::chrono::seconds(3)); - timer.async_wait([cb, this](asio_ns::error_code ec) { - // Copy in callback such that the connection object is kept alive long - // enough, please do not delete, although it is not used here! - if (!cleanupDone && !ec) { + // implicitly cancels any previous timers + timer.expires_after(std::chrono::seconds(3)); + + socket.async_shutdown([cb, this](asio_ns::error_code ec) { + timer.cancel(); + if (!cleanupDone) { socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - ec.clear(); socket.lowest_layer().close(ec); cleanupDone = true; } + cb(ec); }); - socket.async_shutdown([cb(std::forward(cb)), this](auto const& ec) { - timer.cancel(); -#ifndef _WIN32 - if (!cleanupDone && (!ec || ec == asio_ns::error::basic_errors::not_connected)) { - asio_ns::error_code ec2; - socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec2); - ec2.clear(); - socket.lowest_layer().close(ec2); + timer.async_wait([cb(std::forward(cb)), this](asio_ns::error_code ec) { + // Copy in callback such that the connection object is kept alive long + // enough, please do not delete, although it is not used here! + if (!ec && !cleanupDone) { + socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.lowest_layer().close(ec); cleanupDone = true; } -#endif - cb(ec); }); } asio_ns::ip::tcp::resolver resolver; asio_ns::ssl::stream socket; asio_ns::steady_timer timer; + asio_ns::io_context& ctx; + asio_ns::ssl::context& sslContext; std::atomic cleanupDone; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; + bool canceled = false; }; #ifdef ASIO_HAS_LOCAL_SOCKETS @@ -238,36 +330,74 @@ template <> struct Socket { Socket(EventLoopService&, asio_ns::io_context& ctx) : socket(ctx), timer(ctx) {} - ~Socket() { this->cancel(); } + + ~Socket() { + canceled = true; + try { + this->cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n"; + } + } template void connect(detail::ConnectionConfiguration const& config, F&& done) { + if (canceled) { + // cancel() was already called on this socket + done(asio_ns::error::operation_aborted); + return; + } + asio_ns::local::stream_protocol::endpoint ep(config._host); socket.async_connect(ep, std::forward(done)); } + + bool isOpen() const { + return socket.is_open(); + } + void rearm() { + canceled = false; + } + void cancel() { - timer.cancel(); - if (socket.is_open()) { // non-graceful shutdown - asio_ns::error_code ec; - socket.close(ec); + canceled = true; + try { + timer.cancel(); + if (socket.is_open()) { // non-graceful shutdown + asio_ns::error_code ec; + socket.close(ec); + } + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during unix socket cancelation: " << ex.what() << "\n"; } } template void shutdown(F&& cb) { - asio_ns::error_code ec; // prevents exceptions - timer.cancel(ec); - if (socket.is_open()) { - socket.cancel(ec); - socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); - socket.close(ec); + // ec is an out parameter here that is passed to the methods so they + // can fill in whatever error happened. we ignore it here anyway. we + // use the ec-variants of the methods here to prevent exceptions. + asio_ns::error_code ec; + try { + timer.cancel(ec); + if (socket.is_open()) { + socket.cancel(ec); + socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.close(ec); + } + } catch (std::exception const& ex) { + // an exception is unlikely to occur here, as we are using the error-code + // variants of cancel/shutdown/close above + FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n"; } std::forward(cb)(ec); } asio_ns::local::stream_protocol::socket socket; asio_ns::steady_timer timer; + ConnectTimerRole connectTimerRole = ConnectTimerRole::kConnect; + bool canceled = false; }; #endif // ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index a8efe29bab73..f28acb50719e 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -74,6 +74,10 @@ #include "AsioSockets.h" #include "debugging.h" +#include +#include +#include + namespace arangodb { namespace fuerte { using Clock = std::chrono::steady_clock; @@ -86,7 +90,11 @@ class GeneralConnection : public fuerte::Connection { detail::ConnectionConfiguration const& config) : Connection(config), _io_context(loop.nextIOContext()), - _proto(loop, *_io_context) {} + _proto(loop, *_io_context) { +#ifdef ARANGODB_USE_GOOGLE_TESTS + _failConnectAttempts = config._failConnectAttempts; +#endif + } virtual ~GeneralConnection() noexcept { _state.store(Connection::State::Closed); @@ -150,10 +158,10 @@ class GeneralConnection : public fuerte::Connection { /// @brief cancel the connection, unusable afterwards void cancel() override { FUERTE_LOG_DEBUG << "cancel: this=" << this << "\n"; - asio_ns::post(*_io_context, [self(weak_from_this()), this] { + asio_ns::post(*_io_context, [self(weak_from_this())] { auto s = self.lock(); if (s) { - shutdownConnection(Error::ConnectionCanceled); + static_cast&>(*s).shutdownConnection(Error::ConnectionCanceled); } }); } @@ -168,12 +176,11 @@ class GeneralConnection : public fuerte::Connection { if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; FUERTE_ASSERT(_config._maxConnectRetries > 0); - tryConnect(_config._maxConnectRetries, Clock::now(), - asio_ns::error_code()); + tryConnect(_config._maxConnectRetries); } else { FUERTE_LOG_DEBUG << "startConnection: this=" << this << " found unexpected state " << static_cast(exp) - << " not equal to 'Created'"; + << " not equal to 'Created'\n"; FUERTE_ASSERT(false); } } @@ -196,9 +203,10 @@ class GeneralConnection : public fuerte::Connection { abortRequests(err, /*now*/ Clock::time_point::max()); - _proto.shutdown([=, this, self(shared_from_this())](auto const& ec) { - terminateActivity(err); - onFailure(err, msg); + _proto.shutdown([=, self = shared_from_this()](asio_ns::error_code ec) { + auto& me = static_cast&>(*self); + me.terminateActivity(err); + me.onFailure(err, msg); }); // Close socket } @@ -262,7 +270,7 @@ class GeneralConnection : public fuerte::Connection { /// The following is called when the connection is permanently failed. It is /// used to shut down any activity in the derived classes in a way that avoids /// sleeping barbers - void terminateActivity(const fuerte::Error err) { + void terminateActivity(fuerte::Error err) { // Usually, we are `active == true` when we get here, except for the // following case: If we are inactive but the connection is still open and // then the idle timeout goes off, then we shutdownConnection and in the TLS @@ -281,6 +289,14 @@ class GeneralConnection : public fuerte::Connection { } } + void cancelTimer() noexcept { + try { + this->_proto.timer.cancel(); + } catch (std::exception const& ex) { + FUERTE_LOG_ERROR << "caught exception during timer cancelation: " << ex.what() << "\n"; + } + } + protected: virtual void finishConnect() = 0; @@ -299,58 +315,93 @@ class GeneralConnection : public fuerte::Connection { RequestCallback&& cb) = 0; private: - // Connect with a given number of retries - void tryConnect(unsigned retries, Clock::time_point start, - asio_ns::error_code const& ec) { - if (_state.load() != Connection::State::Connecting) { - return; - } + // Try to connect with a given number of retries + void tryConnect(unsigned retries) { + FUERTE_ASSERT(retries > 0); - if (retries == 0) { - std::string msg("connecting failed: '"); - msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() - : "timeout"); - msg.push_back('\''); - shutdownConnection(Error::CouldNotConnect, msg); + if (_state.load() != Connection::State::Connecting) { return; } FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; auto self = Connection::shared_from_this(); - _proto.timer.expires_at(start + _config._connectTimeout); - _proto.timer.async_wait([self](asio_ns::error_code const& ec) { - if (!ec && self->state() == Connection::State::Connecting) { - // the connect handler below gets 'operation_aborted' error - static_cast&>(*self)._proto.cancel(); - } - }); + _proto.connectTimerRole = ConnectTimerRole::kConnect; + _proto.timer.expires_after(_config._connectTimeout); - _proto.connect(_config, [self, start, retries](auto const& ec) mutable { + _proto.connect(_config, [self, retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); - me._proto.timer.cancel(); - // Note that is is possible that the alarm has already gone off, in which - // case its closure might already be queued right after ourselves! - // However, we now quickly set the state to `Connected` in which case the - // closure will no longer shut down the socket and ruin our success. + me.cancelTimer(); + // Note that is is possible that the alarm has already gone off. In this + // case its closure could have already executed, or it may be queued right + // after ourselves! In the first case the socket has already been closed, + // so we need to check that. For the latter case we now set the state to + // `Connected`, so the closure will simply do nothing. + + if (!ec && !me._proto.isOpen()) { + // timer went off earlier and has already closed the socket. + ec = asio_ns::error::operation_aborted; + } if (!ec) { + FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") established connection this=" << self.get() << "\n"; me.finishConnect(); return; } - FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n"; - if (retries > 0 && ec != asio_ns::error::operation_aborted) { - auto end = std::min(Clock::now() + me._config._connectRetryPause, - start + me._config._connectTimeout); - me._proto.timer.expires_at(end); + +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (me._failConnectAttempts > 0) { + --me._failConnectAttempts; + } +#endif + + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n"; + if (retries > 1 && ec != asio_ns::error::operation_aborted) { + FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), scheduling retry operation. this=" << self.get() << "\n"; + me._proto.connectTimerRole = ConnectTimerRole::kReconnect; + me._proto.timer.expires_after(me._config._connectRetryPause); me._proto.timer.async_wait( - [self(std::move(self)), start, retries](auto ec) mutable { + [self = std::move(self), retries](asio_ns::error_code ec) mutable { auto& me = static_cast&>(*self); - me.tryConnect(!ec ? retries - 1 : 0, start, ec); + if (ec) { + FUERTE_LOG_DEBUG << "tryConnect, retry timer canceled. this=" << self.get() << "\n"; + me.shutdownConnection(Error::CouldNotConnect, "connecting failed: retry timer canceled"); + return; + } + // rearm socket so that we can use it again + FUERTE_LOG_DEBUG << "tryConnect, rearming connection this=" << self.get() << "\n"; + me._proto.rearm(); + me.tryConnect(retries - 1); }); } else { - me.tryConnect(0, start, ec); // <- handles errors + std::string msg("connecting failed: "); + msg.append((ec != asio_ns::error::operation_aborted) ? ec.message() + : "timeout"); + FUERTE_LOG_DEBUG << "tryConnect, calling shutdownConnection: " << msg << " this=" << self.get() << "\n"; + me.shutdownConnection(Error::CouldNotConnect, msg); } }); + + // only if we are still in the connect phase, we want to schedule a timer + // for the connect timeout. if the connect already failed and scheduled a + // timer for the reconnect timeout, we do not want to mess with the timer here. + if (_proto.connectTimerRole == ConnectTimerRole::kConnect) { + _proto.timer.async_wait([self = std::move(self)](asio_ns::error_code ec) { + if (!ec && self->state() == Connection::State::Connecting) { + // note: if the timer fires successfully, ec is empty here. + // the connect handler below gets 'operation_aborted' error + auto& me = static_cast&>(*self); + // cancel the socket operations only if we are still in the connect + // phase. + // otherwise, we are in the reconnect phase already, and we + // do not want to cancel the socket. + if (me._proto.connectTimerRole == ConnectTimerRole::kConnect) { + FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n"; + me._proto.cancel(); + } + } + }); + } + } protected: @@ -373,6 +424,12 @@ class GeneralConnection : public fuerte::Connection { std::atomic _active{false}; +#ifdef ARANGODB_USE_GOOGLE_TESTS + // if this member is > 0, then this many connection attempts will fail + // in this connection + unsigned _failConnectAttempts = 0; +#endif + bool _reading = false; // set to true while an async_read is ongoing bool _writing = false; // set between starting an asyncWrite operation and // executing the completion handler}; @@ -425,8 +482,7 @@ struct MultiConnection : public GeneralConnection { void setTimeout(bool setIOBegin) { const bool wasIdle = _streams.empty(); if (wasIdle && !this->_config._useIdleTimeout) { - asio_ns::error_code ec; - this->_proto.timer.cancel(ec); + this->cancelTimer(); return; } diff --git a/3rdParty/fuerte/src/H1Connection.cpp b/3rdParty/fuerte/src/H1Connection.cpp index 15ef4ab9f136..ab542095a316 100644 --- a/3rdParty/fuerte/src/H1Connection.cpp +++ b/3rdParty/fuerte/src/H1Connection.cpp @@ -363,7 +363,7 @@ void H1Connection::asyncWriteCallback(asio_ns::error_code const& ec, FUERTE_ASSERT(this->_state == Connection::State::Connected || this->_state == Connection::State::Closed); this->_writing = false; // indicate that no async write is ongoing any more - this->_proto.timer.cancel(); // cancel alarm for timeout + this->cancelTimer(); // cancel alarm for timeout auto const now = Clock::now(); if (ec || _item == nullptr || _item->expires < now) { @@ -403,7 +403,7 @@ template void H1Connection::asyncReadCallback(asio_ns::error_code const& ec) { // Do not cancel timeout now, because we might be going on to read! if (_item == nullptr) { // could happen on aborts - this->_proto.timer.cancel(); + this->cancelTimer(); this->shutdownConnection(Error::CloseRequested); return; } @@ -443,7 +443,8 @@ void H1Connection::asyncReadCallback(asio_ns::error_code const& ec) { FUERTE_ASSERT(_response != nullptr); _messageComplete = false; // prevent entering branch on EOF - this->_proto.timer.cancel(); // got response in time + this->cancelTimer(); // got response in time + if (!_responseBuffer.empty()) { _response->setPayload(std::move(_responseBuffer), 0); } @@ -502,8 +503,7 @@ template void H1Connection::setIOTimeout() { const bool isIdle = _item == nullptr; if (isIdle && !this->_config._useIdleTimeout) { - asio_ns::error_code ec; - this->_proto.timer.cancel(ec); + this->cancelTimer(); return; } diff --git a/3rdParty/fuerte/src/H2Connection.cpp b/3rdParty/fuerte/src/H2Connection.cpp index 30911511ac45..10fb761f7eff 100644 --- a/3rdParty/fuerte/src/H2Connection.cpp +++ b/3rdParty/fuerte/src/H2Connection.cpp @@ -350,7 +350,7 @@ void H2Connection::readSwitchingProtocolsResponse() { this->_proto.socket, this->_receiveBuffer, "\r\n\r\n", [self](asio_ns::error_code const& ec, size_t nread) { auto& me = static_cast&>(*self); - me._proto.timer.cancel(); + me.cancelTimer(); if (ec) { me.shutdownConnection(Error::ReadError, "error reading upgrade response"); diff --git a/CHANGELOG b/CHANGELOG index 3c23920fab3e..a53d105516a9 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,13 @@ v3.11.9 (XXXX-XX-XX) -------------------- +* Fix connection retry attempts for cluster-internal TLS connections that ran + into the 15 seconds timeout during the connection establishing attempt. + In this case, the low-level socket was repurposed, but not reset properly. + This could leave the connection in an improper state and lead to callbacks + for some requests to not being called as expected. + The connection timeout was also increased from 15 seconds to 60 seconds. + * FE-448: auto-repair collection document JSON on save. * Retry cluster query shutdown in case no connection can be made to the @@ -10,9 +17,6 @@ v3.11.9 (XXXX-XX-XX) hundreds of collections. This also improves times for indexes and dropping of collections. -* Prioritize requests for commiting or aborting streaming transactions on - leaders and followers, because they can unblock other operations. - * Prioritize requests for commiting or aborting streaming transactions on leaders and followers, because they can unblock other operations. Also prioritize requests in already started streaming transactions and AQL queries diff --git a/js/client/modules/@arangodb/testsuites/fuerte.js b/js/client/modules/@arangodb/testsuites/fuerte.js index 0cc4492f51d7..b95544282d9a 100644 --- a/js/client/modules/@arangodb/testsuites/fuerte.js +++ b/js/client/modules/@arangodb/testsuites/fuerte.js @@ -81,11 +81,10 @@ function gtestRunner(options) { } // start server - print('Starting server...'); + print('Starting ' + options.protocol + ' server...'); - let instanceManager = new im.instanceManager('tcp', options, { + let instanceManager = new im.instanceManager(options.protocol, options, { "http.keep-alive-timeout": "10", - "log.level": "requests=TRACE" }, 'fuerte'); instanceManager.prepareInstance(); instanceManager.launchTcpDump(""); diff --git a/lib/Logger/LoggerFeature.cpp b/lib/Logger/LoggerFeature.cpp index 27e353e662a8..a03df065032f 100644 --- a/lib/Logger/LoggerFeature.cpp +++ b/lib/Logger/LoggerFeature.cpp @@ -60,9 +60,7 @@ using namespace arangodb::options; // Please leave this code in for the next time we have to debug fuerte. #if 0 -void LogHackWriter(char const* p) { - LOG_DEVEL << p; -} +void LogHackWriter(std::string_view msg) { LOG_DEVEL << msg; } #endif namespace arangodb { diff --git a/tests/Fuerte/ConnectionFailuresTest.cpp b/tests/Fuerte/ConnectionFailuresTest.cpp index 4896f07d0508..3c304a1b0beb 100644 --- a/tests/Fuerte/ConnectionFailuresTest.cpp +++ b/tests/Fuerte/ConnectionFailuresTest.cpp @@ -26,21 +26,85 @@ #include "gtest/gtest.h" +#include +#include +#include + +#include "Logger/LogMacros.h" + namespace f = ::arangodb::fuerte; +extern std::string myEndpoint; + +// for testing connection failures, we need a free port that is not used by +// another service. because we can't be sure about high port numbers, we are +// using some from the very range. according to +// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?&page=2 +// port 60 is unassigned and thus likely not taken by any service on the test +// host. previously we used port 8629 for testing connection failures, but this +// was flawed because it was possible that some service was running on port +// 8629, which then made the connection failure tests fail. +constexpr std::string_view urls[] = { + "http://localhost:60", "h2://localhost:60", "vst://localhost:60", + "ssl://localhost:60", "h2s://localhost:60", +}; + +static std::pair runTimeoutTest(f::ConnectionBuilder& cbuilder, + int n) { + cbuilder.verifyHost(false); + + f::WaitGroup wg; + f::EventLoopService loop; + + std::atomic callbacksCalled = 0; + std::atomic failureCallbacksCalled = 0; + + cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { + ASSERT_EQ(errorCode, f::Error::CouldNotConnect); + wg.done(); + ++failureCallbacksCalled; + }); + + for (int i = 0; i < n; ++i) { + wg.add(); + auto connection = cbuilder.connect(loop); + // Send a first request. (HTTP connection is only started upon first + // request) + auto request = f::createRequest(f::RestVerb::Get, "/_api/version"); + connection->sendRequest(std::move(request), + [&](f::Error error, std::unique_ptr, + std::unique_ptr) { + ++callbacksCalled; + if (error != f::Error::CouldNotConnect) { + wg.done(); + } + }); + } + auto success = wg.wait_for(std::chrono::seconds(60)); + EXPECT_TRUE(success); + + return {callbacksCalled.load(), failureCallbacksCalled.load()}; +} + // tryToConnectExpectFailure tries to make a connection to a host with given // url. This is expected to fail. static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, - const std::string& url) { + std::string_view url, bool useRetries) { f::WaitGroup wg; wg.add(); f::ConnectionBuilder cbuilder; cbuilder.connectTimeout(std::chrono::milliseconds(250)); cbuilder.connectRetryPause(std::chrono::milliseconds(100)); - cbuilder.endpoint(url); +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (useRetries) { + cbuilder.failConnectAttempts(2); + } + cbuilder.maxConnectRetries(3); +#endif + cbuilder.endpoint(std::string{url}); - cbuilder.onFailure([&](f::Error errorCode, const std::string& errorMessage) { + cbuilder.onFailure([&](f::Error errorCode, std::string const& errorMessage) { ASSERT_EQ(errorCode, f::Error::CouldNotConnect); wg.done(); }); @@ -59,29 +123,80 @@ static void tryToConnectExpectFailure(f::EventLoopService& eventLoopService, // that cannot be resolved. TEST(ConnectionFailureTest, CannotResolveHttp) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "http://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "http://thishostmustnotexist.arangodb.com:8529", false); } TEST(ConnectionFailureTest, CannotResolveVst) { f::EventLoopService loop; - tryToConnectExpectFailure(loop, - "vst://thishostmustnotexist.arangodb.com:8529"); + tryToConnectExpectFailure( + loop, "vst://thishostmustnotexist.arangodb.com:8529", false); } // CannotConnect tests try to make a connection to a host with a valid name // but a wrong port. -TEST(ConnectionFailureTest, CannotConnectHttp) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "http://localhost:8629"); +TEST(ConnectionFailureTest, CannotConnect) { + for (auto const& url : urls) { + f::EventLoopService loop; + tryToConnectExpectFailure(loop, url, /*useRetries*/ false); + } } -TEST(ConnectionFailureTest, CannotConnectHttp2) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "h2://localhost:8629"); +TEST(ConnectionFailureTest, CannotConnectForceRetries) { + for (auto const& url : urls) { + f::EventLoopService loop; + tryToConnectExpectFailure(loop, url, /*useRetries*/ true); + } } -TEST(ConnectionFailureTest, CannotConnectVst) { - f::EventLoopService loop; - tryToConnectExpectFailure(loop, "vst://localhost:8629"); +TEST(ConnectionFailureTest, LowTimeouts) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(1)); + cbuilder.connectRetryPause(std::chrono::milliseconds(1)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint("ssl://localhost:60"); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); +} + +TEST(ConnectionFailureTest, LowTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(1)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); +} + +TEST(ConnectionFailureTest, BorderlineTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(5)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_LE(failureCallbacksCalled, n); +} + +TEST(ConnectionFailureTest, HighEnoughTimeoutsActualBackend) { + f::ConnectionBuilder cbuilder; + cbuilder.connectTimeout(std::chrono::milliseconds(60000)); + cbuilder.connectRetryPause(std::chrono::milliseconds(5)); + cbuilder.maxConnectRetries(15); + cbuilder.endpoint(myEndpoint); + + int n = 100; + auto [callbacksCalled, failureCallbacksCalled] = runTimeoutTest(cbuilder, n); + ASSERT_EQ(n, callbacksCalled); + ASSERT_EQ(0, failureCallbacksCalled); } diff --git a/tests/Fuerte/ConnectionTest.h b/tests/Fuerte/ConnectionTest.h index b275bac07306..c0d46663168a 100644 --- a/tests/Fuerte/ConnectionTest.h +++ b/tests/Fuerte/ConnectionTest.h @@ -68,6 +68,7 @@ class ConnectionTestF : public ::testing::TestWithParam { std::shared_ptr createConnection() { // Set connection parameters fu::ConnectionBuilder cbuilder; + cbuilder.verifyHost(false); setupEndpointFromEnv(cbuilder); cbuilder.protocolType(GetParam()._protocol); setupAuthenticationFromEnv(cbuilder); diff --git a/tests/Fuerte/main.cpp b/tests/Fuerte/main.cpp index b8adc1891514..3718ba9832b1 100644 --- a/tests/Fuerte/main.cpp +++ b/tests/Fuerte/main.cpp @@ -10,7 +10,7 @@ std::string myEndpoint = "tcp://localhost:8529"; std::string myAuthentication = "basic:root:"; -std::vector parse_args(int const& argc, char const* const* argv) { +std::vector parse_args(int argc, char const* const* argv) { std::vector rv; for (int i = 0; i < argc; i++) { rv.emplace_back(argv[i]); @@ -24,12 +24,12 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); // removes google test parameters auto arguments = parse_args(argc, argv); // init √global Schmutz - const std::string endpointArg = "--endpoint="; - const std::string authArg = "--authentication="; + std::string endpointArg = "--endpoint="; + std::string authArg = "--authentication="; for (auto const& arg : arguments) { - if (arg.substr(0, endpointArg.size()) == endpointArg) { + if (arg.starts_with(endpointArg)) { myEndpoint = arg.substr(endpointArg.size()); - } else if (arg.substr(0, authArg.size()) == authArg) { + } else if (arg.starts_with(authArg)) { myAuthentication = arg.substr(authArg.size()); } } diff --git a/tests/test-definitions.txt b/tests/test-definitions.txt index 8c1082779840..1eb1001668c6 100644 --- a/tests/test-definitions.txt +++ b/tests/test-definitions.txt @@ -62,7 +62,8 @@ gtest_iresearch priority=1000 parallelity=3 size=medium single gtest_arangodb priority=1000 parallelity=3 size=medium single # Fuerte tests are executed in single env -fuerte priority=500 single +fuerte priority=500 single suffix=http +fuerte priority=500 single suffix=ssl-http -- --protocol ssl rta_makedata single rta_makedata single suffix=afo -- --activeFailover true