Permalink
Browse files

Merge pull request #777

Add non-overriding receive timouts to actor clock
  • Loading branch information...
Neverlord committed Oct 16, 2018
2 parents 6289edf + 98ad5a3 commit cc1ac1f4e7dca0b8e103423c5d11b79203de94b8
@@ -56,7 +56,11 @@ class actor_clock {
/// Schedules a `timeout_msg` for `self` at time point `t`, overriding any
/// previous receive timeout.
virtual void set_ordinary_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) = 0;
atom_value type, uint64_t id) = 0;
/// Schedules a `timeout_msg` for `self` at time point `t`.
virtual void set_multi_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) = 0;
/// Schedules a `sec::request_timeout` for `self` at time point `t`.
virtual void set_request_timeout(time_point t, abstract_actor* self,
@@ -42,6 +42,12 @@ class simple_actor_clock : public actor_clock {
uint64_t id;
};
struct multi_timeout {
strong_actor_ptr self;
atom_value type;
uint64_t id;
};
/// Request for a `sec::request_timeout` error.
struct request_timeout {
strong_actor_ptr self;
@@ -61,7 +67,7 @@ class simple_actor_clock : public actor_clock {
message content;
};
using value_type = variant<ordinary_timeout, request_timeout,
using value_type = variant<ordinary_timeout, multi_timeout, request_timeout,
actor_msg, group_msg>;
using map_type = std::multimap<time_point, value_type>;
@@ -73,6 +79,11 @@ class simple_actor_clock : public actor_clock {
bool operator()(const secondary_map::value_type& x) const noexcept;
};
struct multi_predicate {
atom_value type;
bool operator()(const secondary_map::value_type& x) const noexcept;
};
struct request_predicate {
message_id id;
bool operator()(const secondary_map::value_type& x) const noexcept;
@@ -83,6 +94,8 @@ class simple_actor_clock : public actor_clock {
void operator()(ordinary_timeout& x);
void operator()(multi_timeout& x);
void operator()(request_timeout& x);
void operator()(actor_msg& x);
@@ -91,7 +104,10 @@ class simple_actor_clock : public actor_clock {
};
void set_ordinary_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) override;
atom_value type, uint64_t id) override;
void set_multi_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) override;
void set_request_timeout(time_point t, abstract_actor* self,
message_id id) override;
@@ -31,6 +31,12 @@ operator()(const secondary_map::value_type& x) const noexcept {
return ptr != nullptr ? ptr->type == type : false;
}
bool simple_actor_clock::multi_predicate::
operator()(const secondary_map::value_type& x) const noexcept {
auto ptr = get_if<multi_timeout>(&x.second->second);
return ptr != nullptr ? ptr->type == type : false;
}
bool simple_actor_clock::request_predicate::
operator()(const secondary_map::value_type& x) const noexcept {
auto ptr = get_if<request_timeout>(&x.second->second);
@@ -45,6 +51,14 @@ void simple_actor_clock::visitor::operator()(ordinary_timeout& x) {
thisptr->drop_lookup(x.self->get(), pred);
}
void simple_actor_clock::visitor::operator()(multi_timeout& x) {
CAF_ASSERT(x.self != nullptr);
x.self->get()->eq_impl(make_message_id(), x.self, nullptr,
timeout_msg{x.type, x.id});
multi_predicate pred{x.type};
thisptr->drop_lookup(x.self->get(), pred);
}
void simple_actor_clock::visitor::operator()(request_timeout& x) {
CAF_ASSERT(x.self != nullptr);
x.self->get()->eq_impl(x.id, x.self, nullptr, sec::request_timeout);
@@ -62,7 +76,7 @@ void simple_actor_clock::visitor::operator()(group_msg& x) {
}
void simple_actor_clock::set_ordinary_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) {
atom_value type, uint64_t id) {
ordinary_predicate pred{type};
auto i = lookup(self, pred);
auto sptr = actor_cast<strong_actor_ptr>(self);
@@ -76,6 +90,14 @@ void simple_actor_clock::set_ordinary_timeout(time_point t, abstract_actor* self
}
}
void simple_actor_clock::set_multi_timeout(time_point t, abstract_actor* self,
atom_value type, uint64_t id) {
auto sptr = actor_cast<strong_actor_ptr>(self);
multi_timeout tmp{std::move(sptr), type, id};
auto j = schedule_.emplace(t, std::move(tmp));
actor_lookup_.emplace(self, j);
}
void simple_actor_clock::set_request_timeout(time_point t, abstract_actor* self,
message_id id) {
request_predicate pred{id};
@@ -47,6 +47,11 @@ behavior testee(stateful_actor<testee_state, raw_event_based_actor>* self,
t->set_ordinary_timeout(n, self, atom(""), self->state.timeout_id);
},
[=](add_atom) {
auto n = t->now() + seconds(10);
self->state.timeout_id += 1;
t->set_multi_timeout(n, self, atom(""), self->state.timeout_id);
},
[=](put_atom) {
auto n = t->now() + seconds(10);
self->state.timeout_id += 1;
auto mid = make_message_id(self->state.timeout_id).response_id();
@@ -124,12 +129,67 @@ CAF_TEST(override_receive_timeout) {
expect((timeout_msg), from(aut).to(aut).with(tid{43}));
}
CAF_TEST(single_request_timeout) {
// Have AUT call t.set_request_timeout().
CAF_TEST(multi_timeout) {
// Have AUT call t.set_multi_timeout().
self->send(aut, add_atom::value);
expect((add_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 1u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 1u);
// Advance time just a little bit.
t.advance_time(seconds(5));
// Have AUT call t.set_multi_timeout() again.
self->send(aut, add_atom::value);
expect((add_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 2u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 2u);
// Advance time to send timeout message.
t.advance_time(seconds(5));
CAF_CHECK_EQUAL(t.schedule().size(), 1u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 1u);
// Have AUT receive the timeout.
expect((timeout_msg), from(aut).to(aut).with(tid{42}));
// Advance time to send second timeout message.
t.advance_time(seconds(5));
CAF_CHECK_EQUAL(t.schedule().size(), 0u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 0u);
// Have AUT receive the timeout.
expect((timeout_msg), from(aut).to(aut).with(tid{43}));
}
CAF_TEST(mixed_receive_and_multi_timeouts) {
// Have AUT call t.set_receive_timeout().
self->send(aut, add_atom::value);
expect((add_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 1u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 1u);
// Advance time just a little bit.
t.advance_time(seconds(5));
// Have AUT call t.set_multi_timeout() again.
self->send(aut, ok_atom::value);
expect((ok_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 2u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 2u);
// Advance time to send timeout message.
t.advance_time(seconds(5));
CAF_CHECK_EQUAL(t.schedule().size(), 1u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 1u);
// Have AUT receive the timeout.
expect((timeout_msg), from(aut).to(aut).with(tid{42}));
// Advance time to send second timeout message.
t.advance_time(seconds(5));
CAF_CHECK_EQUAL(t.schedule().size(), 0u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 0u);
// Have AUT receive the timeout.
expect((timeout_msg), from(aut).to(aut).with(tid{43}));
}
CAF_TEST(single_request_timeout) {
// Have AUT call t.set_request_timeout().
self->send(aut, put_atom::value);
expect((put_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 1u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 1u);
// Advance time to send timeout message.
t.advance_time(seconds(10));
CAF_CHECK_EQUAL(t.schedule().size(), 0u);
@@ -147,8 +207,8 @@ CAF_TEST(mixed_receive_and_request_timeouts) {
// Cause the request timeout to arrive later.
t.advance_time(seconds(5));
// Have AUT call t.set_request_timeout().
self->send(aut, add_atom::value);
expect((add_atom), from(self).to(aut).with(_));
self->send(aut, put_atom::value);
expect((put_atom), from(self).to(aut).with(_));
CAF_CHECK_EQUAL(t.schedule().size(), 2u);
CAF_CHECK_EQUAL(t.actor_lookup().size(), 2u);
// Advance time to send receive timeout message.

0 comments on commit cc1ac1f

Please sign in to comment.