Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add send/ recv timeouts to message endpoints #119

Merged
merged 4 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <faabric/transport/Message.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/util/exception.h>

#include <thread>
#include <zmq.hpp>
Expand All @@ -13,6 +14,11 @@

#define ANY_HOST "0.0.0.0"

// These timeouts should be long enough to permit sending and receiving large
// messages, but short enough not to hang around when something has gone wrong.
#define DEFAULT_RECV_TIMEOUT_MS 20000
#define DEFAULT_SEND_TIMEOUT_MS 20000

namespace faabric::transport {
enum class SocketType
{
Expand Down Expand Up @@ -59,11 +65,20 @@ class MessageEndpoint

int getPort();

void setRecvTimeoutMs(int value);

void setSendTimeoutMs(int value);

protected:
const std::string host;
const int port;
std::thread::id tid;
int id;

int recvTimeoutMs = DEFAULT_RECV_TIMEOUT_MS;
int sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;

void validateTimeout(int value);
Copy link
Collaborator Author

@Shillaker Shillaker Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These have to be protected because of the subclassing that goes on (otherwise they'd be private).

};

/* Send and Recv Message Endpoints */
Expand All @@ -87,4 +102,12 @@ class RecvMessageEndpoint : public MessageEndpoint

void close();
};

class MessageTimeoutException : public faabric::util::FaabricException
{
public:
explicit MessageTimeoutException(std::string message)
: FaabricException(std::move(message))
{}
};
}
49 changes: 43 additions & 6 deletions src/transport/MessageEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context,
e.what());
throw;
}

break;
default:
throw std::runtime_error("Unrecognized socket type");
Expand All @@ -78,6 +79,10 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context,
throw;
}
}

// Set socket options
this->socket->setsockopt(ZMQ_RCVTIMEO, recvTimeoutMs);
this->socket->setsockopt(ZMQ_SNDTIMEO, sendTimeoutMs);
}

void MessageEndpoint::send(uint8_t* serialisedMsg, size_t msgSize, bool more)
Expand Down Expand Up @@ -132,6 +137,12 @@ Message MessageEndpoint::recv(int size)

try {
auto res = this->socket->recv(zmq::buffer(msg.udata(), msg.size()));

if (!res.has_value()) {
SPDLOG_ERROR("Timed out receiving message of size {}", size);
throw MessageTimeoutException("Timed out receiving message");
}

if (res.has_value() && (res->size != res->untruncated_size)) {
SPDLOG_ERROR("Received more bytes than buffer can hold. "
"Received: {}, capacity {}",
Expand All @@ -144,10 +155,11 @@ Message MessageEndpoint::recv(int size)
// Return empty message to signify termination
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM");
return Message();
} else {
SPDLOG_ERROR("Error receiving message: {}", e.what());
throw;
}

// Print default message and rethrow
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
throw;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message was missing the error code before.

}

return msg;
Expand All @@ -158,16 +170,16 @@ Message MessageEndpoint::recv(int size)
try {
auto res = this->socket->recv(msg);
if (!res.has_value()) {
SPDLOG_ERROR("Error receiving message: EAGAIN");
throw std::runtime_error("Error receiving message");
SPDLOG_ERROR("Timed out receiving message with no size");
throw MessageTimeoutException("Timed out receiving message");
}
} catch (zmq::error_t& e) {
if (e.num() == ZMQ_ETERM) {
// Return empty message to signify termination
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM");
return Message();
} else {
SPDLOG_ERROR("Error receiving message: {}", e.what());
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
throw;
}
}
Expand Down Expand Up @@ -253,6 +265,31 @@ int MessageEndpoint::getPort()
return port;
}

void MessageEndpoint::validateTimeout(int value)
{
if (value <= 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how would we specify an infinte timeout? Do we want to support that at all? I would expect a negative timeout to not timeout at all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not support infinite timeouts anywhere. I can't think of a case where it's justified.

SPDLOG_ERROR("Setting invalid timeout of {}", value);
throw std::runtime_error("Setting invalid timeout");
}

if (socket != nullptr) {
SPDLOG_ERROR("Setting timeout of {} after socket created", value);
throw std::runtime_error("Setting timeout after socket created");
}
}

void MessageEndpoint::setRecvTimeoutMs(int value)
{
validateTimeout(value);
recvTimeoutMs = value;
}

void MessageEndpoint::setSendTimeoutMs(int value)
{
validateTimeout(value);
sendTimeoutMs = value;
}

/* Send and Recv Message Endpoints */

SendMessageEndpoint::SendMessageEndpoint(const std::string& hostIn, int portIn)
Expand Down
5 changes: 5 additions & 0 deletions src/transport/MessageEndpointClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ Message MessageEndpointClient::awaitResponse(int port)
// Wait for the response, open a temporary endpoint for it
// Note - we use a different host/port not to clash with existing server
RecvMessageEndpoint endpoint(port);

// Inherit timeouts on temporary endpoint
endpoint.setRecvTimeoutMs(recvTimeoutMs);
endpoint.setSendTimeoutMs(sendTimeoutMs);

endpoint.open(faabric::transport::getGlobalMessageContext());
Message receivedMessage = endpoint.recv();
endpoint.close();
Expand Down
39 changes: 39 additions & 0 deletions tests/test/transport/test_message_endpoint_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,43 @@ TEST_CASE_METHOD(MessageContextFixture,
// Close the destination endpoint
dst.close();
}

TEST_CASE_METHOD(MessageContextFixture,
"Test can't set invalid send/recv timeouts",
"[transport]")
{
MessageEndpoint cli(thisHost, testPort);

SECTION("Sanity check valid timeout")
{
REQUIRE_NOTHROW(cli.setRecvTimeoutMs(100));
REQUIRE_NOTHROW(cli.setSendTimeoutMs(100));
}

SECTION("Recv zero timeout") { REQUIRE_THROWS(cli.setRecvTimeoutMs(0)); }

SECTION("Send zero timeout") { REQUIRE_THROWS(cli.setSendTimeoutMs(0)); }

SECTION("Recv negative timeout")
{
REQUIRE_THROWS(cli.setRecvTimeoutMs(-1));
}

SECTION("Send negative timeout")
{
REQUIRE_THROWS(cli.setSendTimeoutMs(-1));
}

SECTION("Recv, socket already initialised")
{
cli.open(context, SocketType::PULL, false);
REQUIRE_THROWS(cli.setRecvTimeoutMs(100));
}

SECTION("Send, socket already initialised")
{
cli.open(context, SocketType::PULL, false);
REQUIRE_THROWS(cli.setSendTimeoutMs(100));
}
}
}
75 changes: 75 additions & 0 deletions tests/test/transport/test_message_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ class DummyServer final : public MessageEndpointServer
}
};

class SlowServer final : public MessageEndpointServer
{
public:
int delayMs = 1000;

SlowServer()
: MessageEndpointServer(testPort)
{}

void sendResponse(uint8_t* serialisedMsg,
int size,
const std::string& returnHost,
int returnPort)
{
usleep(delayMs * 1000);
}

private:
void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override
{}
};

namespace tests {
TEST_CASE("Test start/stop server", "[transport]")
{
Expand Down Expand Up @@ -163,4 +186,56 @@ TEST_CASE("Test multiple clients talking to one server", "[transport]")

server.stop();
}

TEST_CASE("Test client timeout on requests to valid server", "[transport]")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the right place to put this test. It requires a custom server, but is really testing functionality in the client.

{
// Start the server in the background
std::thread t([] {
SlowServer server;
server.start();

int threadSleep = server.delayMs + 500;
usleep(threadSleep * 1000);

server.stop();
});

// Wait for the server to start up
usleep(500 * 1000);

// Set up the client
auto& context = getGlobalMessageContext();
MessageEndpointClient cli(thisHost, testPort);

int clientTimeout;
bool expectFailure;
SECTION("Long timeout no failure")
{
clientTimeout = 20000;
expectFailure = false;
}

SECTION("Short timeout failure")
{
clientTimeout = 100;
expectFailure = true;
}

cli.setRecvTimeoutMs(clientTimeout);
cli.open(context);

// Check for failure accordingly
if (expectFailure) {
REQUIRE_THROWS_AS(cli.awaitResponse(testPort + REPLY_PORT_OFFSET),
MessageTimeoutException);
} else {
REQUIRE_NOTHROW(cli.awaitResponse(testPort + REPLY_PORT_OFFSET));
}

cli.close();

if (t.joinable()) {
t.join();
}
}
}