Skip to content

Commit

Permalink
Merge pull request #1758
Browse files Browse the repository at this point in the history
Fix request timeout handling for the new mail API
  • Loading branch information
Neverlord committed Feb 16, 2024
2 parents 80ae89b + d7d05fc commit d28e576
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 56 deletions.
46 changes: 32 additions & 14 deletions libcaf_core/caf/event_based_mail.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class event_based_scheduled_mail_t
/// Sends the message to `receiver` as a request message and returns a handle
/// for processing the response.
/// @param receiver The actor that should receive the message.
/// @param relative_timeout The maximum time to wait for a response.
/// @param ref_tag Either `strong_ref` or `weak_ref`. When passing
/// `strong_ref`, the system will keep a strong reference to
/// the receiver until the message has been delivered.
Expand All @@ -42,27 +43,35 @@ class event_based_scheduled_mail_t
/// the meantime.
template <class Handle, class RefTag = strong_ref_t,
class SelfRefTag = strong_self_ref_t>
[[nodiscard]] auto request(const Handle& receiver, RefTag ref_tag = {},
[[nodiscard]] auto request(const Handle& receiver, timespan relative_timeout,
RefTag ref_tag = {},
SelfRefTag self_ref_tag = {}) && {
detail::send_type_check<typename Trait::signatures, Handle, Args...>();
using response_type = response_type_t<typename Handle::signatures, Args...>;
auto mid = self()->new_request_id(Priority);
disposable in_flight;
disposable in_flight_response;
disposable in_flight_timeout;
if (receiver) {
auto& clock = self()->clock();
in_flight = clock.schedule_message(actor_cast(super::self_, self_ref_tag),
actor_cast(receiver, ref_tag),
super::timeout_, mid,
std::move(super::content_));
if (relative_timeout != infinite) {
in_flight_timeout = clock.schedule_message(
nullptr, actor_cast(super::self_, weak_ref),
super::timeout_ + relative_timeout, mid.response_id(),
make_message(make_error(sec::request_timeout)));
}
in_flight_response
= clock.schedule_message(actor_cast(super::self_, self_ref_tag),
actor_cast(receiver, ref_tag), super::timeout_,
mid, std::move(super::content_));

} else {
self()->enqueue(make_mailbox_element(self()->ctrl(), mid.response_id(),
make_error(sec::invalid_request)),
self()->context());
}
auto hdl = event_based_response_handle<response_type>{self(),
mid.response_id()};
return std::pair{std::move(hdl), std::move(in_flight)};
using hdl_t = event_based_response_handle<response_type>;
auto hdl = hdl_t{self(), mid.response_id(), std::move(in_flight_timeout)};
return std::pair{std::move(hdl), std::move(in_flight_response)};
}

private:
Expand Down Expand Up @@ -98,19 +107,28 @@ class event_based_mail_t : public async_mail_base_t<Priority, Trait, Args...> {

/// Schedules the message for delivery with a relative timeout.
[[nodiscard]] auto delay(actor_clock::duration_type timeout) && {
using clock = actor_clock::clock_type;
using result_t = event_based_scheduled_mail_t<Priority, Trait, Args...>;
return result_t{self(), std::move(super::content_), clock::now() + timeout};
return result_t{self(), std::move(super::content_),
self()->clock().now() + timeout};
}

/// Sends the message to `receiver` as a request message and returns a handle
/// for processing the response.
template <class Handle>
[[nodiscard]] auto request(const Handle& receiver) && {
[[nodiscard]] auto
request(const Handle& receiver, timespan relative_timeout) && {
detail::send_type_check<typename Trait::signatures, Handle, Args...>();
using response_type = response_type_t<typename Handle::signatures, Args...>;
auto mid = self()->new_request_id(Priority);
disposable in_flight_timeout;
if (receiver) {
if (relative_timeout != infinite) {
auto& clock = self()->clock();
in_flight_timeout = clock.schedule_message(
nullptr, actor_cast(super::self_, weak_ref),
clock.now() + relative_timeout, mid.response_id(),
make_message(make_error(sec::request_timeout)));
}
auto* ptr = actor_cast<abstract_actor*>(receiver);
ptr->enqueue(make_mailbox_element(self()->ctrl(), mid,
std::move(super::content_)),
Expand All @@ -120,8 +138,8 @@ class event_based_mail_t : public async_mail_base_t<Priority, Trait, Args...> {
make_error(sec::invalid_request)),
self()->context());
}
return event_based_response_handle<response_type>{self(),
mid.response_id()};
using hdl_t = event_based_response_handle<response_type>;
return hdl_t{self(), mid.response_id(), std::move(in_flight_timeout)};
}

private:
Expand Down
130 changes: 115 additions & 15 deletions libcaf_core/caf/event_based_mail.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "caf/dynamically_typed.hpp"
#include "caf/event_based_actor.hpp"
#include "caf/timespan.hpp"
#include "caf/typed_event_based_actor.hpp"

// Note: functions inherited from async_mail are tested in async_mail.test.cpp.
Expand Down Expand Up @@ -36,7 +37,9 @@ TEST("send request message") {
});
auto result = std::make_shared<int>(0);
SECTION("regular message") {
self->mail(3).request(dummy).then([result](int x) { *result = x; });
self->mail(3)
.request(dummy, infinite) //
.then([result](int x) { *result = x; });
launch();
expect<int>()
.with(3)
Expand All @@ -53,7 +56,7 @@ TEST("send request message") {
SECTION("urgent message") {
self->mail(3)
.urgent()
.request(dummy) //
.request(dummy, infinite) //
.then([result](int x) { *result = x; });
launch();
expect<int>()
Expand All @@ -77,7 +80,7 @@ TEST("send request message") {
});
auto result = std::make_shared<error>();
self->mail(3)
.request(dummy) //
.request(dummy, infinite) //
.then([this](int value) { fail("expected a string, got: {}", value); },
[result](error& err) { *result = std::move(err); });
launch();
Expand All @@ -95,14 +98,44 @@ TEST("send request message") {
SECTION("invalid receiver") {
auto result = std::make_shared<error>();
self->mail(3)
.request(actor{}) //
.request(actor{}, 1s) //
.then([this](int value) { fail("expected a string, got: {}", value); },
[result](error& err) { *result = std::move(err); });
check_eq(mail_count(), 1u);
launch();
expect<error>().to(self_hdl);
check_eq(*result, make_error(sec::invalid_request));
}
SECTION("no response") {
auto result = std::make_shared<error>();
auto dummy = sys.spawn([](event_based_actor* self) -> behavior {
auto res = std::make_shared<response_promise>();
return {
[self, res](int) { *res = self->make_response_promise(); },
};
});
self->mail(3)
.request(dummy, 1s) //
.then([this](int) { fail("unexpected response"); },
[result](error& err) { *result = std::move(err); });
launch();
check_eq(mail_count(), 1u);
check_eq(num_timeouts(), 1u);
expect<int>()
.with(3)
.priority(message_priority::normal)
.from(self_hdl)
.to(dummy);
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 1u);
trigger_timeout();
expect<error>().with(make_error(sec::request_timeout)).to(self_hdl);
check_eq(*result, make_error(sec::request_timeout));
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 0u);
self->mail(exit_msg{nullptr, exit_reason::user_shutdown}).send(dummy);
expect<exit_msg>().to(dummy);
}
}
SECTION("using .await for the response") {
SECTION("valid response") {
Expand All @@ -113,7 +146,9 @@ TEST("send request message") {
});
auto result = std::make_shared<int>(0);
SECTION("regular message") {
self->mail(3).request(dummy).await([result](int x) { *result = x; });
self->mail(3)
.request(dummy, infinite) //
.await([result](int x) { *result = x; });
launch();
expect<int>()
.with(3)
Expand All @@ -130,7 +165,7 @@ TEST("send request message") {
SECTION("urgent message") {
self->mail(3)
.urgent()
.request(dummy) //
.request(dummy, infinite) //
.await([result](int x) { *result = x; });
launch();
expect<int>()
Expand All @@ -154,7 +189,7 @@ TEST("send request message") {
});
auto result = std::make_shared<error>();
self->mail(3)
.request(dummy) //
.request(dummy, infinite) //
.await([this](int value) { fail("expected a string, got: {}", value); },
[result](error& err) { *result = std::move(err); });
launch();
Expand All @@ -172,14 +207,40 @@ TEST("send request message") {
SECTION("invalid receiver") {
auto result = std::make_shared<error>();
self->mail(3)
.request(actor{}) //
.request(actor{}, 1s) //
.await([this](int value) { fail("expected a string, got: {}", value); },
[result](error& err) { *result = std::move(err); });
check_eq(mail_count(), 1u);
launch();
expect<error>().to(self_hdl);
check_eq(*result, make_error(sec::invalid_request));
}
SECTION("no response") {
auto result = std::make_shared<error>();
auto dummy = sys.spawn([](event_based_actor* self) -> behavior {
auto res = std::make_shared<response_promise>();
return {
[self, res](int) { *res = self->make_response_promise(); },
};
});
self->mail(3)
.request(dummy, 1s) //
.await([this](int) { fail("unexpected response"); },
[result](error& err) { *result = std::move(err); });
launch();
expect<int>()
.with(3)
.priority(message_priority::normal)
.from(self_hdl)
.to(dummy);
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 1u);
trigger_timeout();
expect<error>().with(make_error(sec::request_timeout)).to(self_hdl);
check_eq(*result, make_error(sec::request_timeout));
self->mail(exit_msg{nullptr, exit_reason::user_shutdown}).send(dummy);
expect<exit_msg>().to(dummy);
}
}
}

Expand All @@ -194,7 +255,10 @@ TEST("send delayed request message") {
auto result = std::make_shared<int>(0);
auto on_result = [result](int value) { *result = value; };
SECTION("strong receiver reference") {
self->mail(3).delay(1s).request(dummy, strong_ref).first.then(on_result);
self->mail(3)
.delay(1s)
.request(dummy, infinite, strong_ref)
.first.then(on_result);
launch();
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 1u);
Expand All @@ -214,7 +278,7 @@ TEST("send delayed request message") {
SECTION("weak receiver reference") {
self->mail(3)
.schedule(sys.clock().now() + 1s)
.request(dummy, weak_ref)
.request(dummy, infinite, weak_ref)
.first.then(on_result);
launch();
check_eq(mail_count(), 0u);
Expand All @@ -235,7 +299,7 @@ TEST("send delayed request message") {
SECTION("weak receiver reference that expires") {
self->mail(3)
.schedule(sys.clock().now() + 1s)
.request(dummy, weak_ref)
.request(dummy, infinite, weak_ref)
.first.then(on_result);
launch();
check_eq(mail_count(), 0u);
Expand All @@ -247,7 +311,7 @@ TEST("send delayed request message") {
SECTION("weak sender reference that expires") {
self->mail(3)
.schedule(sys.clock().now() + 1s)
.request(dummy, strong_ref, weak_self_ref)
.request(dummy, infinite, strong_ref, weak_self_ref)
.first.then(on_result);
launch();
check_eq(mail_count(), 0u);
Expand All @@ -259,6 +323,42 @@ TEST("send delayed request message") {
}
}

TEST("send delayed request message with no response") {
auto [self, launch] = sys.spawn_inactive<event_based_actor>();
auto self_hdl = actor_cast<actor>(self);
auto result = std::make_shared<error>();
auto dummy = sys.spawn([](event_based_actor* self) -> behavior {
auto res = std::make_shared<response_promise>();
return {
[self, res](int) { *res = self->make_response_promise(); },
};
});
self->mail(3)
.delay(1s)
.request(dummy, 1s)
.first //
.then([this](int) { fail("unexpected response"); },
[result](error& err) { *result = std::move(err); });
launch();
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 2u);
advance_time(1s);
check_eq(mail_count(), 1u);
check_eq(num_timeouts(), 1u);
expect<int>()
.with(3)
.priority(message_priority::normal)
.from(self_hdl)
.to(dummy);
check_eq(mail_count(), 0u);
check_eq(num_timeouts(), 1u);
advance_time(1s);
expect<error>().with(make_error(sec::request_timeout)).to(self_hdl);
check_eq(*result, make_error(sec::request_timeout));
self->mail(exit_msg{nullptr, exit_reason::user_shutdown}).send(dummy);
expect<exit_msg>().to(dummy);
}

TEST("send request message as a typed actor") {
using sender_actor = typed_actor<result<void>(int)>;
auto dummy = sys.spawn([]() -> dummy_behavior {
Expand All @@ -269,7 +369,7 @@ TEST("send request message as a typed actor") {
auto [self, launch] = sys.spawn_inactive<sender_actor::impl>();
auto self_hdl = actor_cast<actor>(self);
auto result = std::make_shared<int>(0);
self->mail(3).request(dummy).then([result](int x) { *result = x; });
self->mail(3).request(dummy, infinite).then([result](int x) { *result = x; });
launch();
expect<int>()
.with(3)
Expand Down Expand Up @@ -297,7 +397,7 @@ TEST("send request message to an invalid receiver") {
auto err = std::make_shared<error>();
auto on_error = [err](error x) { *err = x; };
SECTION("regular message") {
self->mail("hello world").request(actor{}).then(on_result, on_error);
self->mail("hello world").request(actor{}, 1s).then(on_result, on_error);
launch();
expect<error>().with(make_error(sec::invalid_request)).to(self_hdl);
check_eq(*result, 0);
Expand All @@ -306,7 +406,7 @@ TEST("send request message to an invalid receiver") {
SECTION("delayed message") {
self->mail("hello world")
.delay(1s)
.request(actor{})
.request(actor{}, 1s)
.first.then(on_result, on_error);
launch();
check_eq(mail_count(), 1u);
Expand Down
Loading

0 comments on commit d28e576

Please sign in to comment.