Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added `timed_sync_send`

this new function allows users to catch timeouts in the
`on_sync_failure` handler instead of using `after(...)` statements;
the `timed_` functions signal an occured timeout by putting a
'TIMEOUT' message into the actor's mailbox
  • Loading branch information...
commit 4e6b57af2003fe6e0126007dfef5b4ef15870917 1 parent 6d12f13
@Neverlord Neverlord authored
View
46 cppa/cppa.hpp
@@ -532,6 +532,52 @@ inline message_future sync_send(const actor_ptr& whom, Args&&... what) {
}
/**
+ * @brief Sends @p what as a synchronous message to @p whom with a timeout.
+ *
+ * The calling actor receives a 'TIMEOUT' message as response after
+ * given timeout exceeded and no response messages was received.
+ * @param whom Receiver of the message.
+ * @param what Message content as tuple.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+template<class Rep, class Period, typename... Args>
+message_future timed_sync_send_tuple(const actor_ptr& whom,
+ const std::chrono::duration<Rep,Period>& rel_time,
+ any_tuple what) {
+ if (whom) return self->send_timed_sync_message(whom.get(),
+ rel_time,
+ std::move(what));
+ else throw std::invalid_argument("whom == nullptr");
+}
+
+/**
+ * @brief Sends <tt>{what...}</tt> as a synchronous message to @p whom
+ * with a timeout.
+ *
+ * The calling actor receives a 'TIMEOUT' message as response after
+ * given timeout exceeded and no response messages was received.
+ * @param whom Receiver of the message.
+ * @param what Message elements.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @pre <tt>sizeof...(Args) > 0</tt>
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+template<class Rep, class Period, typename... Args>
+message_future timed_sync_send(const actor_ptr& whom,
+ const std::chrono::duration<Rep,Period>& rel_time,
+ Args&&... what) {
+ static_assert(sizeof...(Args) > 0, "no message to send");
+ return timed_sync_send_tuple(whom,
+ rel_time,
+ make_any_tuple(std::forward<Args>(what)...));
+}
+
+/**
* @brief Handles a synchronous response message in an event-based way.
* @param handle A future for a synchronous response.
* @throws std::invalid_argument if given behavior does not has a valid
View
2  cppa/detail/receive_policy.hpp
@@ -243,7 +243,7 @@ class receive_policy {
return normal_exit_signal;
}
}
- else if (v0 == atom("TIMEOUT")) {
+ else if (v0 == atom("SYNC_TOUT")) {
CPPA_REQUIRE(!message_id.valid());
return client->waits_for_timeout(v1) ? timeout_message
: expired_timeout_message;
View
6 cppa/local_actor.hpp
@@ -45,6 +45,8 @@
#include "cppa/partial_function.hpp"
#include "cppa/memory_cached_mixin.hpp"
+#include "cppa/util/duration.hpp"
+
#include "cppa/detail/recursive_queue_node.hpp"
namespace cppa {
@@ -377,6 +379,10 @@ class local_actor : public memory_cached_mixin<actor> {
return awaited_response;
}
+ message_id_t send_timed_sync_message(actor* whom,
+ const util::duration& rel_time,
+ any_tuple&& what);
+
// returns 0 if last_dequeued() is an asynchronous or sync request message,
// a response id generated from the request id otherwise
inline message_id_t get_response_id() {
View
4 src/abstract_scheduled_actor.cpp
@@ -37,7 +37,7 @@ void abstract_scheduled_actor::request_timeout(const util::duration& d) {
if (d.is_zero()) {
// immediately enqueue timeout
auto node = super::fetch_node(this,
- make_any_tuple(atom("TIMEOUT"),
+ make_any_tuple(atom("SYNC_TOUT"),
++m_active_timeout_id));
this->m_mailbox.enqueue(node);
}
@@ -45,7 +45,7 @@ void abstract_scheduled_actor::request_timeout(const util::duration& d) {
get_scheduler()->delayed_send(
this, d,
make_any_tuple(
- atom("TIMEOUT"), ++m_active_timeout_id));
+ atom("SYNC_TOUT"), ++m_active_timeout_id));
}
m_has_pending_timeout_request = true;
}
View
10 src/local_actor.cpp
@@ -30,6 +30,7 @@
#include "cppa/cppa.hpp"
#include "cppa/atom.hpp"
+#include "cppa/scheduler.hpp"
#include "cppa/local_actor.hpp"
namespace cppa {
@@ -157,4 +158,13 @@ response_handle local_actor::make_response_handle() {
return std::move(result);
}
+message_id_t local_actor::send_timed_sync_message(actor* whom,
+ const util::duration& rel_time,
+ any_tuple&& what) {
+ auto mid = this->send_sync_message(whom, std::move(what));
+ auto tmp = make_any_tuple(atom("TIMEOUT"));
+ get_scheduler()->delayed_reply(this, rel_time, mid, std::move(tmp));
+ return mid;
+}
+
} // namespace cppa
View
18 unit_testing/test__remote_actor.cpp
@@ -331,11 +331,18 @@ int main(int argc, char** argv) {
cout << "test group communication via network (inverted setup)" << endl;
spawn5_server(remote_client, true);
+ self->on_sync_failure([&] {
+ CPPA_ERROR("unexpected message: "
+ << to_string(self->last_dequeued())
+ << endl);
+ });
+
// test forward_to "over network and back"
cout << "test forwarding over network 'and back'" << endl;
auto ra = spawn<replier>();
- sync_send(remote_client, atom("fwd"), ra, "hello replier!").await(
- on(42) >> [&] {
+ timed_sync_send(remote_client, chrono::seconds(5), atom("fwd"), ra, "hello replier!").await(
+ on_arg_match >> [&](int forty_two) {
+ CPPA_CHECK_EQUAL(42, forty_two);
auto from = self->last_sender();
if (!from) {
CPPA_ERROR("from == nullptr");
@@ -347,12 +354,7 @@ int main(int argc, char** argv) {
}
}
},
- others() >> [&] {
- CPPA_ERROR("unexpected: " << to_string(self->last_dequeued()));
- },
- after(chrono::seconds(5)) >> [&] {
- CPPA_ERROR("fowarding failed; no message received within 5s");
- }
+ others() >> [] { self->handle_sync_failure(); }
);
cout << "wait for a last goodbye" << endl;
View
17 unit_testing/test__sync_send.cpp
@@ -1,3 +1,5 @@
+#define CPPA_VERBOSE_CHECK
+
#include "test.hpp"
#include "cppa/cppa.hpp"
@@ -129,6 +131,21 @@ int main() {
send(spawn_monitor<A>(self), atom("go"), spawn<D>(spawn<C>()));
await_success_message();
await_all_others_done();
+ cout << __LINE__ << endl;
+ //sync_send(self, atom("NoWay")).await(
+ timed_sync_send(self, std::chrono::milliseconds(50), atom("NoWay")).await(
+ on(atom("TIMEOUT")) >> [&] {
+ cout << __LINE__ << endl;
+ // must timeout
+ cout << "received timeout (ok)" << endl;
+ },
+ others() >> [&] {
+ cout << __LINE__ << endl;
+ CPPA_ERROR("unexpected message: "
+ << to_string(self->last_dequeued()));
+ }
+ );
+ cout << __LINE__ << endl;
shutdown();
return CPPA_TEST_RESULT;
}
Please sign in to comment.
Something went wrong with that request. Please try again.