-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add send/ recv timeouts to message endpoints #119
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context, | |
e.what()); | ||
throw; | ||
} | ||
|
||
break; | ||
default: | ||
throw std::runtime_error("Unrecognized socket type"); | ||
|
@@ -78,6 +79,10 @@ void MessageEndpoint::open(faabric::transport::MessageContext& context, | |
throw; | ||
} | ||
} | ||
|
||
// Set socket options | ||
this->socket->setsockopt(ZMQ_RCVTIMEO, recvTimeoutMs); | ||
this->socket->setsockopt(ZMQ_SNDTIMEO, sendTimeoutMs); | ||
} | ||
|
||
void MessageEndpoint::send(uint8_t* serialisedMsg, size_t msgSize, bool more) | ||
|
@@ -132,6 +137,12 @@ Message MessageEndpoint::recv(int size) | |
|
||
try { | ||
auto res = this->socket->recv(zmq::buffer(msg.udata(), msg.size())); | ||
|
||
if (!res.has_value()) { | ||
SPDLOG_ERROR("Timed out receiving message of size {}", size); | ||
throw MessageTimeoutException("Timed out receiving message"); | ||
} | ||
|
||
if (res.has_value() && (res->size != res->untruncated_size)) { | ||
SPDLOG_ERROR("Received more bytes than buffer can hold. " | ||
"Received: {}, capacity {}", | ||
|
@@ -144,10 +155,11 @@ Message MessageEndpoint::recv(int size) | |
// Return empty message to signify termination | ||
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM"); | ||
return Message(); | ||
} else { | ||
SPDLOG_ERROR("Error receiving message: {}", e.what()); | ||
throw; | ||
} | ||
|
||
// Print default message and rethrow | ||
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what()); | ||
throw; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This message was missing the error code before. |
||
} | ||
|
||
return msg; | ||
|
@@ -158,16 +170,16 @@ Message MessageEndpoint::recv(int size) | |
try { | ||
auto res = this->socket->recv(msg); | ||
if (!res.has_value()) { | ||
SPDLOG_ERROR("Error receiving message: EAGAIN"); | ||
throw std::runtime_error("Error receiving message"); | ||
SPDLOG_ERROR("Timed out receiving message with no size"); | ||
throw MessageTimeoutException("Timed out receiving message"); | ||
} | ||
} catch (zmq::error_t& e) { | ||
if (e.num() == ZMQ_ETERM) { | ||
// Return empty message to signify termination | ||
SPDLOG_TRACE("Shutting endpoint down after receiving ETERM"); | ||
return Message(); | ||
} else { | ||
SPDLOG_ERROR("Error receiving message: {}", e.what()); | ||
SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what()); | ||
throw; | ||
} | ||
} | ||
|
@@ -253,6 +265,31 @@ int MessageEndpoint::getPort() | |
return port; | ||
} | ||
|
||
void MessageEndpoint::validateTimeout(int value) | ||
{ | ||
if (value <= 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So how would we specify an infinte timeout? Do we want to support that at all? I would expect a negative timeout to not timeout at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should not support infinite timeouts anywhere. I can't think of a case where it's justified. |
||
SPDLOG_ERROR("Setting invalid timeout of {}", value); | ||
throw std::runtime_error("Setting invalid timeout"); | ||
} | ||
|
||
if (socket != nullptr) { | ||
SPDLOG_ERROR("Setting timeout of {} after socket created", value); | ||
throw std::runtime_error("Setting timeout after socket created"); | ||
} | ||
} | ||
|
||
void MessageEndpoint::setRecvTimeoutMs(int value) | ||
{ | ||
validateTimeout(value); | ||
recvTimeoutMs = value; | ||
} | ||
|
||
void MessageEndpoint::setSendTimeoutMs(int value) | ||
{ | ||
validateTimeout(value); | ||
sendTimeoutMs = value; | ||
} | ||
|
||
/* Send and Recv Message Endpoints */ | ||
|
||
SendMessageEndpoint::SendMessageEndpoint(const std::string& hostIn, int portIn) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,29 @@ class DummyServer final : public MessageEndpointServer | |
} | ||
}; | ||
|
||
class SlowServer final : public MessageEndpointServer | ||
{ | ||
public: | ||
int delayMs = 1000; | ||
|
||
SlowServer() | ||
: MessageEndpointServer(testPort) | ||
{} | ||
|
||
void sendResponse(uint8_t* serialisedMsg, | ||
int size, | ||
const std::string& returnHost, | ||
int returnPort) | ||
{ | ||
usleep(delayMs * 1000); | ||
} | ||
|
||
private: | ||
void doRecv(faabric::transport::Message& header, | ||
faabric::transport::Message& body) override | ||
{} | ||
}; | ||
|
||
namespace tests { | ||
TEST_CASE("Test start/stop server", "[transport]") | ||
{ | ||
|
@@ -163,4 +186,56 @@ TEST_CASE("Test multiple clients talking to one server", "[transport]") | |
|
||
server.stop(); | ||
} | ||
|
||
TEST_CASE("Test client timeout on requests to valid server", "[transport]") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this is the right place to put this test. It requires a custom server, but is really testing functionality in the client. |
||
{ | ||
// Start the server in the background | ||
std::thread t([] { | ||
SlowServer server; | ||
server.start(); | ||
|
||
int threadSleep = server.delayMs + 500; | ||
usleep(threadSleep * 1000); | ||
|
||
server.stop(); | ||
}); | ||
|
||
// Wait for the server to start up | ||
usleep(500 * 1000); | ||
|
||
// Set up the client | ||
auto& context = getGlobalMessageContext(); | ||
MessageEndpointClient cli(thisHost, testPort); | ||
|
||
int clientTimeout; | ||
bool expectFailure; | ||
SECTION("Long timeout no failure") | ||
{ | ||
clientTimeout = 20000; | ||
expectFailure = false; | ||
} | ||
|
||
SECTION("Short timeout failure") | ||
{ | ||
clientTimeout = 100; | ||
expectFailure = true; | ||
} | ||
|
||
cli.setRecvTimeoutMs(clientTimeout); | ||
cli.open(context); | ||
|
||
// Check for failure accordingly | ||
if (expectFailure) { | ||
REQUIRE_THROWS_AS(cli.awaitResponse(testPort + REPLY_PORT_OFFSET), | ||
MessageTimeoutException); | ||
} else { | ||
REQUIRE_NOTHROW(cli.awaitResponse(testPort + REPLY_PORT_OFFSET)); | ||
} | ||
|
||
cli.close(); | ||
|
||
if (t.joinable()) { | ||
t.join(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These have to be
protected
because of the subclassing that goes on (otherwise they'd be private).