Skip to content

Commit

Permalink
Add send/recv timeouts to message endpoints (#119)
Browse files Browse the repository at this point in the history
* Adding timeouts to message endpoints

* Add tests for timeouts

* Improve logging in other failure case
  • Loading branch information
Shillaker committed Jun 18, 2021
1 parent 646dd85 commit 12796a1
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 6 deletions.
23 changes: 23 additions & 0 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <faabric/transport/Message.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/util/exception.h>

#include <thread>
#include <zmq.hpp>
Expand All @@ -13,6 +14,11 @@

#define ANY_HOST "0.0.0.0"

// These timeouts should be long enough to permit sending and receiving large
// messages, but short enough not to hang around when something has gone wrong.
#define DEFAULT_RECV_TIMEOUT_MS 20000
#define DEFAULT_SEND_TIMEOUT_MS 20000

namespace faabric::transport {
enum class SocketType
{
Expand Down Expand Up @@ -59,11 +65,20 @@ class MessageEndpoint

int getPort();

void setRecvTimeoutMs(int value);

void setSendTimeoutMs(int value);

protected:
const std::string host;
const int port;
std::thread::id tid;
int id;

int recvTimeoutMs = DEFAULT_RECV_TIMEOUT_MS;
int sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;

void validateTimeout(int value);
};

/* Send and Recv Message Endpoints */
Expand All @@ -87,4 +102,12 @@ class RecvMessageEndpoint : public MessageEndpoint

void close();
};

// Exception type thrown by MessageEndpoint::recv when the underlying socket
// returns no data, which the endpoint logs and reports as a receive timeout.
// It only forwards its message to the FaabricException base class.
class MessageTimeoutException : public faabric::util::FaabricException
{
public:
// Takes the message by value and moves it into the base class
explicit MessageTimeoutException(std::string message)
: FaabricException(std::move(message))
{}
};
}
49 changes: 43 additions & 6 deletions src/transport/MessageEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context,
e.what());
throw;
}

break;
default:
throw std::runtime_error("Unrecognized socket type");
Expand All @@ -78,6 +79,10 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context,
throw;
}
}

// Set socket options
this->socket->setsockopt(ZMQ_RCVTIMEO, recvTimeoutMs);
this->socket->setsockopt(ZMQ_SNDTIMEO, sendTimeoutMs);
}

void MessageEndpoint::send(uint8_t* serialisedMsg, size_t msgSize, bool more)
Expand Down Expand Up @@ -132,6 +137,12 @@ Message MessageEndpoint::recv(int size)

try {
auto res = this->socket->recv(zmq::buffer(msg.udata(), msg.size()));

if (!res.has_value()) {
SPDLOG_ERROR("Timed out receiving message of size {}", size);
throw MessageTimeoutException("Timed out receiving message");
}

if (res.has_value() && (res->size != res->untruncated_size)) {
SPDLOG_ERROR("Received more bytes than buffer can hold. "
"Received: {}, capacity {}",
Expand All @@ -144,10 +155,11 @@ Message MessageEndpoint::recv(int size)
// Return empty message to signify termination
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM");
return Message();
} else {
SPDLOG_ERROR("Error receiving message: {}", e.what());
throw;
}

// Print default message and rethrow
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
throw;
}

return msg;
Expand All @@ -158,16 +170,16 @@ Message MessageEndpoint::recv(int size)
try {
auto res = this->socket->recv(msg);
if (!res.has_value()) {
SPDLOG_ERROR("Error receiving message: EAGAIN");
throw std::runtime_error("Error receiving message");
SPDLOG_ERROR("Timed out receiving message with no size");
throw MessageTimeoutException("Timed out receiving message");
}
} catch (zmq::error_t& e) {
if (e.num() == ZMQ_ETERM) {
// Return empty message to signify termination
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM");
return Message();
} else {
SPDLOG_ERROR("Error receiving message: {}", e.what());
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
throw;
}
}
Expand Down Expand Up @@ -253,6 +265,31 @@ int MessageEndpoint::getPort()
return port;
}

// Shared precondition check for the timeout setters. Throws
// std::runtime_error if the requested timeout is not strictly positive, or
// if the socket already exists (the timeouts are applied to the socket as
// options when the endpoint is opened).
void MessageEndpoint::validateTimeout(int value)
{
    // Zero and negative timeouts are rejected
    const bool isPositive = value > 0;
    if (!isPositive) {
        SPDLOG_ERROR("Setting invalid timeout of {}", value);
        throw std::runtime_error("Setting invalid timeout");
    }

    // Timeouts cannot be changed once a socket is present
    const bool socketExists = (socket != nullptr);
    if (socketExists) {
        SPDLOG_ERROR("Setting timeout of {} after socket created", value);
        throw std::runtime_error("Setting timeout after socket created");
    }
}

void MessageEndpoint::setRecvTimeoutMs(int value)
{
validateTimeout(value);
recvTimeoutMs = value;
}

void MessageEndpoint::setSendTimeoutMs(int value)
{
validateTimeout(value);
sendTimeoutMs = value;
}

/* Send and Recv Message Endpoints */

SendMessageEndpoint::SendMessageEndpoint(const std::string& hostIn, int portIn)
Expand Down
5 changes: 5 additions & 0 deletions src/transport/MessageEndpointClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ Message MessageEndpointClient::awaitResponse(int port)
// Wait for the response, open a temporary endpoint for it
// Note - we use a different host/port not to clash with existing server
RecvMessageEndpoint endpoint(port);

// Inherit timeouts on temporary endpoint
endpoint.setRecvTimeoutMs(recvTimeoutMs);
endpoint.setSendTimeoutMs(sendTimeoutMs);

endpoint.open(faabric::transport::getGlobalMessageContext());
Message receivedMessage = endpoint.recv();
endpoint.close();
Expand Down
39 changes: 39 additions & 0 deletions tests/test/transport/test_message_endpoint_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,43 @@ TEST_CASE_METHOD(MessageContextFixture,
// Close the destination endpoint
dst.close();
}

// Checks the argument and state validation performed by the timeout setters
// on MessageEndpoint: positive values are accepted before the socket exists,
// while zero, negative values, and any call after open() must throw.
TEST_CASE_METHOD(MessageContextFixture,
"Test can't set invalid send/recv timeouts",
"[transport]")
{
MessageEndpoint cli(thisHost, testPort);

// A positive timeout on an unopened endpoint is accepted
SECTION("Sanity check valid timeout")
{
REQUIRE_NOTHROW(cli.setRecvTimeoutMs(100));
REQUIRE_NOTHROW(cli.setSendTimeoutMs(100));
}

// Zero is rejected by the setters
SECTION("Recv zero timeout") { REQUIRE_THROWS(cli.setRecvTimeoutMs(0)); }

SECTION("Send zero timeout") { REQUIRE_THROWS(cli.setSendTimeoutMs(0)); }

// Negative values are rejected by the setters
SECTION("Recv negative timeout")
{
REQUIRE_THROWS(cli.setRecvTimeoutMs(-1));
}

SECTION("Send negative timeout")
{
REQUIRE_THROWS(cli.setSendTimeoutMs(-1));
}

// Once open() has been called, even a valid timeout value is rejected
// NOTE(review): the endpoint opened in the next two sections is never
// explicitly closed in this test - confirm cleanup happens elsewhere
SECTION("Recv, socket already initialised")
{
cli.open(context, SocketType::PULL, false);
REQUIRE_THROWS(cli.setRecvTimeoutMs(100));
}

SECTION("Send, socket already initialised")
{
cli.open(context, SocketType::PULL, false);
REQUIRE_THROWS(cli.setSendTimeoutMs(100));
}
}
}
75 changes: 75 additions & 0 deletions tests/test/transport/test_message_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ class DummyServer final : public MessageEndpointServer
}
};

// Test server, bound to testPort, whose sendResponse only sleeps for delayMs
// milliseconds and whose doRecv does nothing with incoming messages.
class SlowServer final : public MessageEndpointServer
{
public:
// Length of the sleep in sendResponse, in milliseconds
int delayMs = 1000;

SlowServer()
: MessageEndpointServer(testPort)
{}

// Ignores all of its arguments and just sleeps for delayMs milliseconds
// NOTE(review): declared without `override` - confirm whether this is meant
// to replace a virtual method on MessageEndpointServer
void sendResponse(uint8_t* serialisedMsg,
int size,
const std::string& returnHost,
int returnPort)
{
// usleep takes microseconds, hence the conversion from milliseconds
usleep(delayMs * 1000);
}

private:
// Incoming header and body are deliberately discarded
void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override
{}
};

namespace tests {
TEST_CASE("Test start/stop server", "[transport]")
{
Expand Down Expand Up @@ -163,4 +186,56 @@ TEST_CASE("Test multiple clients talking to one server", "[transport]")

server.stop();
}

// Runs a SlowServer on a background thread and checks how a client's
// awaitResponse behaves for two receive timeouts: a long one (expected not to
// throw) and a short one (expected to throw MessageTimeoutException).
TEST_CASE("Test client timeout on requests to valid server", "[transport]")
{
    // Start the server in the background. It is kept alive for slightly
    // longer than its own delay and then stopped.
    std::thread t([] {
        SlowServer server;
        server.start();

        int threadSleep = server.delayMs + 500;
        usleep(threadSleep * 1000);

        server.stop();
    });

    // Make sure the server thread is joined on every exit path. A failing
    // REQUIRE (or a throwing setup call) unwinds this scope, and destroying a
    // still-joinable std::thread calls std::terminate, which would abort the
    // whole test binary instead of reporting the failure.
    struct ThreadJoiner
    {
        std::thread& thread;

        ~ThreadJoiner()
        {
            if (thread.joinable()) {
                thread.join();
            }
        }
    } joiner{ t };

    // Wait for the server to start up
    usleep(500 * 1000);

    // Set up the client
    auto& context = getGlobalMessageContext();
    MessageEndpointClient cli(thisHost, testPort);

    // Both values are overwritten by whichever SECTION runs; they are
    // initialised so that they never hold indeterminate values.
    int clientTimeout = 0;
    bool expectFailure = false;

    SECTION("Long timeout no failure")
    {
        clientTimeout = 20000;
        expectFailure = false;
    }

    SECTION("Short timeout failure")
    {
        clientTimeout = 100;
        expectFailure = true;
    }

    // The timeout has to be set before the endpoint is opened
    cli.setRecvTimeoutMs(clientTimeout);
    cli.open(context);

    // Check for failure accordingly
    if (expectFailure) {
        REQUIRE_THROWS_AS(cli.awaitResponse(testPort + REPLY_PORT_OFFSET),
                          MessageTimeoutException);
    } else {
        REQUIRE_NOTHROW(cli.awaitResponse(testPort + REPLY_PORT_OFFSET));
    }

    cli.close();

    // Normal path: join here, as before. The guard above then has nothing
    // left to do.
    if (t.joinable()) {
        t.join();
    }
}
}

0 comments on commit 12796a1

Please sign in to comment.