From 55605167a0e8cf1739158038530ec9d8e62fadb9 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 17 Jun 2021 15:10:46 +0000 Subject: [PATCH 1/3] Adding timeouts to message endpoints --- include/faabric/transport/MessageEndpoint.h | 15 +++++++ src/transport/MessageEndpoint.cpp | 30 ++++++++++++++ .../test_message_endpoint_client.cpp | 39 +++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/include/faabric/transport/MessageEndpoint.h b/include/faabric/transport/MessageEndpoint.h index 4e463e07a..51f6c5bfd 100644 --- a/include/faabric/transport/MessageEndpoint.h +++ b/include/faabric/transport/MessageEndpoint.h @@ -13,6 +13,11 @@ #define ANY_HOST "0.0.0.0" +// These timeouts should be long enough to permit sending and receiving large +// messages, but short enough not to hang around when something has gone wrong. +#define DEFAULT_RECV_TIMEOUT_MS 20000 +#define DEFAULT_SEND_TIMEOUT_MS 20000 + namespace faabric::transport { enum class SocketType { @@ -59,11 +64,21 @@ class MessageEndpoint int getPort(); + void setRecvTimeoutMs(int value); + + void setSendTimeoutMs(int value); + protected: const std::string host; const int port; std::thread::id tid; int id; + + private: + int recvTimeoutMs = DEFAULT_RECV_TIMEOUT_MS; + int sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS; + + void validateTimeout(int value); }; /* Send and Recv Message Endpoints */ diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index 1cb71a871..33dd61890 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -55,6 +55,7 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context, e.what()); throw; } + break; default: throw std::runtime_error("Unrecognized socket type"); @@ -78,6 +79,10 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context, throw; } } + + // Set socket options + this->socket->setsockopt(ZMQ_RCVTIMEO, recvTimeoutMs); + this->socket->setsockopt(ZMQ_SNDTIMEO, sendTimeoutMs); } void MessageEndpoint::send(uint8_t* serialisedMsg, size_t msgSize, bool more) @@ -253,6 +258,31 @@ int MessageEndpoint::getPort() return port; } +void MessageEndpoint::validateTimeout(int value) +{ + if (value <= 0) { + SPDLOG_ERROR("Setting invalid timeout of {}", value); + throw std::runtime_error("Setting invalid timeout"); + } + + if (socket != nullptr) { + SPDLOG_ERROR("Setting timeout of {} after socket created", value); + throw std::runtime_error("Setting timeout after socket created"); + } +} + +void MessageEndpoint::setRecvTimeoutMs(int value) +{ + validateTimeout(value); + recvTimeoutMs = value; +} + +void MessageEndpoint::setSendTimeoutMs(int value) +{ + validateTimeout(value); + sendTimeoutMs = value; +} + /* Send and Recv Message Endpoints */ SendMessageEndpoint::SendMessageEndpoint(const std::string& hostIn, int portIn) diff --git a/tests/test/transport/test_message_endpoint_client.cpp b/tests/test/transport/test_message_endpoint_client.cpp index 7119602ab..ea41b3585 100644 --- a/tests/test/transport/test_message_endpoint_client.cpp +++ b/tests/test/transport/test_message_endpoint_client.cpp @@ -202,4 +202,43 @@ TEST_CASE_METHOD(MessageContextFixture, // Close the destination endpoint dst.close(); } + +TEST_CASE_METHOD(MessageContextFixture, + "Test can't set invalid send/recv timeouts", + "[transport]") +{ + MessageEndpoint cli(thisHost, testPort); + + SECTION("Sanity check valid timeout") + { + REQUIRE_NOTHROW(cli.setRecvTimeoutMs(100)); + REQUIRE_NOTHROW(cli.setSendTimeoutMs(100)); + } + + SECTION("Recv zero timeout") { REQUIRE_THROWS(cli.setRecvTimeoutMs(0)); } + + SECTION("Send zero timeout") { REQUIRE_THROWS(cli.setSendTimeoutMs(0)); } + + SECTION("Recv negative timeout") + { + REQUIRE_THROWS(cli.setRecvTimeoutMs(-1)); + } + + SECTION("Send negative timeout") + { + REQUIRE_THROWS(cli.setSendTimeoutMs(-1)); + } + + SECTION("Recv, socket already initialised") + { + cli.open(context, SocketType::PULL, false); + REQUIRE_THROWS(cli.setRecvTimeoutMs(100)); + } + + SECTION("Send, socket already initialised") + { + cli.open(context, SocketType::PULL, false); + REQUIRE_THROWS(cli.setSendTimeoutMs(100)); + } +} } From 1cfd4168b327a5b6b42b000fd245fe67d92ff308 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 17 Jun 2021 15:56:36 +0000 Subject: [PATCH 2/3] Add tests for timeouts --- include/faabric/transport/MessageEndpoint.h | 10 ++- src/transport/MessageEndpoint.cpp | 17 +++-- src/transport/MessageEndpointClient.cpp | 5 ++ tests/test/transport/test_message_server.cpp | 75 ++++++++++++++++++++ 4 files changed, 101 insertions(+), 6 deletions(-) diff --git a/include/faabric/transport/MessageEndpoint.h b/include/faabric/transport/MessageEndpoint.h index 51f6c5bfd..c1852f89d 100644 --- a/include/faabric/transport/MessageEndpoint.h +++ b/include/faabric/transport/MessageEndpoint.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -74,7 +75,6 @@ class MessageEndpoint std::thread::id tid; int id; - private: int recvTimeoutMs = DEFAULT_RECV_TIMEOUT_MS; int sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS; @@ -102,4 +102,12 @@ class RecvMessageEndpoint : public MessageEndpoint void close(); }; + +class MessageTimeoutException : public faabric::util::FaabricException +{ + public: + explicit MessageTimeoutException(std::string message) + : FaabricException(std::move(message)) + {} +}; } diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index 33dd61890..b59b4d1ec 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -137,6 +137,12 @@ Message MessageEndpoint::recv(int size) try { auto res = this->socket->recv(zmq::buffer(msg.udata(), msg.size())); + + if (!res.has_value()) { + SPDLOG_ERROR("Timed out receiving message of size {}", size); + throw MessageTimeoutException("Timed out receiving message"); + } + if (res.has_value() && (res->size != res->untruncated_size)) { SPDLOG_ERROR("Received more bytes than buffer can hold. " "Received: {}, capacity {}", @@ -149,10 +155,11 @@ Message MessageEndpoint::recv(int size) // Return empty message to signify termination SPDLOG_TRACE("Shutting endpoint down after receiving ETERM"); return Message(); - } else { - SPDLOG_ERROR("Error receiving message: {}", e.what()); - throw; } + + // Print default message and rethrow + SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what()); + throw; } return msg; @@ -163,8 +170,8 @@ Message MessageEndpoint::recv(int size) try { auto res = this->socket->recv(msg); if (!res.has_value()) { - SPDLOG_ERROR("Error receiving message: EAGAIN"); - throw std::runtime_error("Error receiving message"); + SPDLOG_ERROR("Timed out receiving message with no size"); + throw MessageTimeoutException("Timed out receiving message"); } } catch (zmq::error_t& e) { if (e.num() == ZMQ_ETERM) { diff --git a/src/transport/MessageEndpointClient.cpp b/src/transport/MessageEndpointClient.cpp index 2cd751bc7..cdd2461f9 100644 --- a/src/transport/MessageEndpointClient.cpp +++ b/src/transport/MessageEndpointClient.cpp @@ -11,6 +11,11 @@ Message MessageEndpointClient::awaitResponse(int port) // Wait for the response, open a temporary endpoint for it // Note - we use a different host/port not to clash with existing server RecvMessageEndpoint endpoint(port); + + // Inherit timeouts on temporary endpoint + endpoint.setRecvTimeoutMs(recvTimeoutMs); + endpoint.setSendTimeoutMs(sendTimeoutMs); + endpoint.open(faabric::transport::getGlobalMessageContext()); Message receivedMessage = endpoint.recv(); endpoint.close(); diff --git a/tests/test/transport/test_message_server.cpp b/tests/test/transport/test_message_server.cpp index 0e59fa1d5..79bbaf65b 100644 --- a/tests/test/transport/test_message_server.cpp +++ b/tests/test/transport/test_message_server.cpp @@ -42,6 +42,29 @@ class DummyServer final : public MessageEndpointServer } }; +class SlowServer final : public MessageEndpointServer +{ + public: + int delayMs = 1000; + + SlowServer() + : MessageEndpointServer(testPort) + {} + + void sendResponse(uint8_t* serialisedMsg, + int size, + const std::string& returnHost, + int returnPort) + { + usleep(delayMs * 1000); + } + + private: + void doRecv(faabric::transport::Message& header, + faabric::transport::Message& body) override + {} +}; + namespace tests { TEST_CASE("Test start/stop server", "[transport]") { @@ -163,4 +186,56 @@ TEST_CASE("Test multiple clients talking to one server", "[transport]") server.stop(); } + +TEST_CASE("Test client timeout on requests to valid server", "[transport]") +{ + // Start the server in the background + std::thread t([] { + SlowServer server; + server.start(); + + int threadSleep = server.delayMs + 500; + usleep(threadSleep * 1000); + + server.stop(); + }); + + // Wait for the server to start up + usleep(500 * 1000); + + // Set up the client + auto& context = getGlobalMessageContext(); + MessageEndpointClient cli(thisHost, testPort); + + int clientTimeout; + bool expectFailure; + SECTION("Long timeout no failure") + { + clientTimeout = 20000; + expectFailure = false; + } + + SECTION("Short timeout failure") + { + clientTimeout = 100; + expectFailure = true; + } + + cli.setRecvTimeoutMs(clientTimeout); + cli.open(context); + + // Check for failure accordingly + if (expectFailure) { + REQUIRE_THROWS_AS(cli.awaitResponse(testPort + REPLY_PORT_OFFSET), + MessageTimeoutException); + } else { + REQUIRE_NOTHROW(cli.awaitResponse(testPort + REPLY_PORT_OFFSET)); + } + + cli.close(); + + if (t.joinable()) { + t.join(); + } +} } From 03ed5719a799d7ea6fe84802ae6920876f7c804a Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 17 Jun 2021 15:58:21 +0000 Subject: [PATCH 3/3] Improve logging in other failure case --- src/transport/MessageEndpoint.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index b59b4d1ec..07059f87d 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -179,7 +179,7 @@ Message MessageEndpoint::recv(int size) SPDLOG_TRACE("Shutting endpoint down after receiving ETERM"); return Message(); } else { - SPDLOG_ERROR("Error receiving message: {}", e.what()); + SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what()); throw; } }