Browse files

priority-aware actors

this patch adds a new spawn option for priority-aware actors
and refactors libcppa's message passing; this version has a known
issue with network distributed group messaging
  • Loading branch information...
1 parent 1b1d0d3 commit 2c3cd2034e31cbffb8a35974a522957ac66313e8 @Neverlord Neverlord committed Apr 29, 2013
Showing with 1,349 additions and 985 deletions.
  1. +1 −1 CMakeLists.txt
  2. +1 −0 cppa.files
  3. +3 −8 cppa/actor.hpp
  4. +9 −0 cppa/channel.hpp
  5. +3 −3 cppa/config.hpp
  6. +4 −0 cppa/context_switching_actor.hpp
  7. +29 −261 cppa/cppa.hpp
  8. +4 −1 cppa/detail/behavior_impl.hpp
  9. +2 −2 cppa/detail/decorated_tuple.hpp
  10. +1 −1 cppa/detail/event_based_actor_factory.hpp
  11. +9 −8 cppa/detail/receive_policy.hpp
  12. +1 −1 cppa/detail/sync_request_bouncer.hpp
  13. +1 −1 cppa/detail/tdata.hpp
  14. +6 −6 cppa/extend.hpp
  15. +9 −1 cppa/intrusive_ptr.hpp
  16. +20 −39 cppa/local_actor.hpp
  17. +185 −92 cppa/logging.hpp
  18. +1 −4 cppa/mailbox_based.hpp
  19. +17 −0 cppa/memory_cached.hpp
  20. +1 −1 cppa/message_future.hpp
  21. +60 −17 cppa/message_header.hpp
  22. +5 −5 cppa/network/middleman_event_handler_base.hpp
  23. +6 −6 cppa/opencl/actor_facade.hpp
  24. +53 −0 cppa/prioritizing.hpp
  25. +12 −7 cppa/process_information.hpp
  26. +3 −2 cppa/sb_actor.hpp
  27. +0 −3 cppa/scheduled_actor.hpp
  28. +4 −2 cppa/scheduler.hpp
  29. +285 −0 cppa/send.hpp
  30. +21 −6 cppa/spawn_options.hpp
  31. +1 −2 cppa/stacked.hpp
  32. +3 −15 cppa/thread_mapped_actor.hpp
  33. +52 −28 cppa/threaded.hpp
  34. +7 −0 cppa/to_string.hpp
  35. +20 −16 src/actor.cpp
  36. +14 −7 src/actor_registry.cpp
  37. +3 −2 src/binary_deserializer.cpp
  38. +6 −0 src/channel.cpp
  39. +5 −3 src/context_switching_actor.cpp
  40. +13 −22 src/default_actor_addressing.cpp
  41. +10 −12 src/default_actor_proxy.cpp
  42. +25 −23 src/default_peer.cpp
  43. +4 −4 src/default_protocol.cpp
  44. +8 −8 src/event_based_actor.cpp
  45. +44 −17 src/group_manager.cpp
  46. +6 −4 src/local_actor.cpp
  47. +16 −6 src/logging.cpp
  48. +17 −9 src/message_header.cpp
  49. +15 −15 src/middleman.cpp
  50. +14 −14 src/opencl/command_dispatcher.cpp
  51. +4 −4 src/opencl/program.cpp
  52. +9 −0 src/process_information.cpp
  53. +1 −1 src/scheduled_actor.cpp
  54. +3 −3 src/scheduler.cpp
  55. +1 −1 src/self.cpp
  56. +3 −31 src/thread_mapped_actor.cpp
  57. +20 −11 src/thread_pool_scheduler.cpp
  58. +45 −90 src/uniform_type_info.cpp
  59. +7 −8 unit_testing/ping_pong.cpp
  60. +8 −7 unit_testing/test.hpp
  61. +8 −14 unit_testing/test_match.cpp
  62. +1 −1 unit_testing/test_opencl.cpp
  63. +114 −89 unit_testing/test_remote_actor.cpp
  64. +7 −7 unit_testing/test_serialization.cpp
  65. +63 −12 unit_testing/test_spawn.cpp
  66. +14 −19 unit_testing/test_sync_send.cpp
  67. +2 −2 unit_testing/test_tuple.cpp
View
2 CMakeLists.txt
@@ -72,7 +72,7 @@ endif ()
# set build type (evaluate ENABLE_DEBUG flag)
if (ENABLE_DEBUG)
set(CMAKE_BUILD_TYPE Debug)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCPPA_DEBUG")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCPPA_ENABLE_DEBUG")
endif (ENABLE_DEBUG)
if (CPPA_LOG_LEVEL)
View
1 cppa.files
@@ -307,3 +307,4 @@ unit_testing/test_tuple.cpp
unit_testing/test_uniform_type.cpp
unit_testing/test_yield_interface.cpp
cppa/prioritizing.hpp
+cppa/send.hpp
View
11 cppa/actor.hpp
@@ -74,12 +74,7 @@ class actor : public channel {
public:
- /**
- * @brief Enqueues @p msg to the actor's mailbox and returns true if
- * this actor is an scheduled actor that successfully changed
- * its state to @p pending in response to the enqueue operation.
- */
- virtual bool chained_enqueue(const message_header& hdr, any_tuple msg);
+ ~actor();
/**
* @brief Attaches @p ptr to this actor.
@@ -192,14 +187,14 @@ class actor : public channel {
*/
inline bool exited() const;
- private:
-
// cannot be changed after construction
const actor_id m_id;
// you're either a proxy or you're not
const bool m_is_proxy;
+ private:
+
// initially exit_reason::not_exited
std::atomic<std::uint32_t> m_exit_reason;
View
9 cppa/channel.hpp
@@ -64,6 +64,15 @@ class channel : public ref_counted {
*/
virtual void enqueue(const message_header& hdr, any_tuple msg) = 0;
+
+ /**
+ * @brief Enqueues @p msg to the list of received messages and returns
+ * true if this is an scheduled actor that successfully changed
+ * its state to @p pending in response to the enqueue operation.
+ */
+ virtual bool chained_enqueue(const message_header& hdr, any_tuple msg);
+
+
protected:
virtual ~channel();
View
6 cppa/config.hpp
@@ -55,7 +55,7 @@
#include <cstdio>
#include <cstdlib>
-#ifdef CPPA_DEBUG
+#ifdef CPPA_ENABLE_DEBUG
#include <execinfo.h>
#define CPPA_REQUIRE__(stmt, file, line) \
@@ -70,9 +70,9 @@
if ((stmt) == false) { \
CPPA_REQUIRE__(#stmt, __FILE__, __LINE__); \
}((void) 0)
-#else // CPPA_DEBUG
+#else // CPPA_ENABLE_DEBUG
#define CPPA_REQUIRE(unused) ((void) 0)
-#endif // CPPA_DEBUG
+#endif // CPPA_ENABLE_DEBUG
#define CPPA_CRITICAL__(error, file, line) { \
printf("%s:%u: critical error: '%s'\n", file, line, error); \
View
4 cppa/context_switching_actor.hpp
@@ -77,6 +77,10 @@ class context_switching_actor : public extend<scheduled_actor,context_switching_
mailbox_element* await_message(const timeout_type& abs_time);
+ inline mailbox_element* try_pop() {
+ return m_mailbox.try_pop();
+ }
+
private:
// required by util::fiber
View
290 cppa/cppa.hpp
@@ -39,6 +39,7 @@
#include "cppa/on.hpp"
#include "cppa/atom.hpp"
+#include "cppa/send.hpp"
#include "cppa/self.hpp"
#include "cppa/actor.hpp"
#include "cppa/match.hpp"
@@ -57,6 +58,7 @@
#include "cppa/singletons.hpp"
#include "cppa/exit_reason.hpp"
#include "cppa/local_actor.hpp"
+#include "cppa/prioritizing.hpp"
#include "cppa/spawn_options.hpp"
#include "cppa/message_future.hpp"
#include "cppa/response_handle.hpp"
@@ -424,78 +426,12 @@
namespace cppa {
-namespace detail {
-
-template<typename T>
-inline void send_impl(T* ptr, any_tuple&& arg) {
- if (ptr) self->send_message(ptr, std::move(arg));
-}
-
-template<typename T, typename... Ts>
-inline void send_tpl_impl(T* ptr, Ts&&... args) {
- static_assert(sizeof...(Ts) > 0, "no message to send");
- if (ptr) self->send_message(ptr, make_any_tuple(std::forward<Ts>(args)...));
-}
-
-} // namespace detail
-
/**
* @ingroup MessageHandling
* @{
*/
/**
- * @brief Sends @p what as a message to @p whom.
- * @param whom Receiver of the message.
- * @param what Message content as tuple.
- */
-template<class C, typename... Ts>
-inline typename enable_if_channel<C>::type
-send_tuple(const intrusive_ptr<C>& whom, any_tuple what) {
- detail::send_impl(whom.get(), std::move(what));
-}
-
-/**
- * @brief Sends <tt>{what...}</tt> as a message to @p whom.
- * @param whom Receiver of the message.
- * @param what Message elements.
- * @pre <tt>sizeof...(Ts) > 0</tt>
- */
-template<class C, typename... Ts>
-inline typename enable_if_channel<C>::type
-send(const intrusive_ptr<C>& whom, Ts&&... args) {
- detail::send_tpl_impl(whom.get(), std::forward<Ts>(args)...);
-}
-
-/**
- * @brief Sends @p what as a message to @p whom, but sets
- * the sender information to @p from.
- * @param from Sender as seen by @p whom.
- * @param whom Receiver of the message.
- * @param what Message elements.
- * @pre <tt>sizeof...(Ts) > 0</tt>
- */
-template<class C, typename... Ts>
-inline typename enable_if_channel<C>::type
-send_tuple_as(const actor_ptr& from, const intrusive_ptr<C>& whom, any_tuple what) {
- if (whom) whom->enqueue(from.get(), std::move(what));
-}
-
-/**
- * @brief Sends <tt>{what...}</tt> as a message to @p whom, but sets
- * the sender information to @p from.
- * @param from Sender as seen by @p whom.
- * @param whom Receiver of the message.
- * @param what Message elements.
- * @pre <tt>sizeof...(Ts) > 0</tt>
- */
-template<class C, typename... Ts>
-inline typename enable_if_channel<C>::type
-send_as(const actor_ptr& from, const intrusive_ptr<C>& whom, Ts&&... what) {
- send_tuple_as(from, whom, make_any_tuple(std::forward<Ts>(what)...));
-}
-
-/**
* @brief Sends a message to @p whom.
*
* <b>Usage example:</b>
@@ -513,191 +449,16 @@ operator<<(const intrusive_ptr<C>& whom, any_tuple what) {
return whom;
}
-/**
- * @brief Sends @p what as a synchronous message to @p whom.
- * @param whom Receiver of the message.
- * @param what Message content as tuple.
- * @returns A handle identifying a future to the response of @p whom.
- * @warning The returned handle is actor specific and the response to the sent
- * message cannot be received by another actor.
- * @throws std::invalid_argument if <tt>whom == nullptr</tt>
- */
-inline message_future sync_send_tuple(const actor_ptr& whom, any_tuple what) {
- if (whom) return self->send_sync_message(whom.get(), std::move(what));
- else throw std::invalid_argument("whom == nullptr");
-}
-
-/**
- * @brief Sends <tt>{what...}</tt> as a synchronous message to @p whom.
- * @param whom Receiver of the message.
- * @param what Message elements.
- * @returns A handle identifying a future to the response of @p whom.
- * @warning The returned handle is actor specific and the response to the sent
- * message cannot be received by another actor.
- * @pre <tt>sizeof...(Ts) > 0</tt>
- * @throws std::invalid_argument if <tt>whom == nullptr</tt>
- */
-template<typename... Ts>
-inline message_future sync_send(const actor_ptr& whom, Ts&&... what) {
- static_assert(sizeof...(Ts) > 0, "no message to send");
- return sync_send_tuple(whom, make_any_tuple(std::forward<Ts>(what)...));
-}
-
-/**
- * @brief Sends @p what as a synchronous message to @p whom with a timeout.
- *
- * The calling actor receives a 'TIMEOUT' message as response after
- * given timeout exceeded and no response messages was received.
- * @param whom Receiver of the message.
- * @param what Message content as tuple.
- * @returns A handle identifying a future to the response of @p whom.
- * @warning The returned handle is actor specific and the response to the sent
- * message cannot be received by another actor.
- * @throws std::invalid_argument if <tt>whom == nullptr</tt>
- */
-template<class Rep, class Period, typename... Ts>
-message_future timed_sync_send_tuple(const actor_ptr& whom,
- const std::chrono::duration<Rep,Period>& rel_time,
- any_tuple what) {
- if (whom) return self->send_timed_sync_message(whom.get(),
- rel_time,
- std::move(what));
- else throw std::invalid_argument("whom == nullptr");
-}
-
-/**
- * @brief Sends <tt>{what...}</tt> as a synchronous message to @p whom
- * with a timeout.
- *
- * The calling actor receives a 'TIMEOUT' message as response after
- * given timeout exceeded and no response messages was received.
- * @param whom Receiver of the message.
- * @param what Message elements.
- * @returns A handle identifying a future to the response of @p whom.
- * @warning The returned handle is actor specific and the response to the sent
- * message cannot be received by another actor.
- * @pre <tt>sizeof...(Ts) > 0</tt>
- * @throws std::invalid_argument if <tt>whom == nullptr</tt>
- */
-template<class Rep, class Period, typename... Ts>
-message_future timed_sync_send(const actor_ptr& whom,
- const std::chrono::duration<Rep,Period>& rel_time,
- Ts&&... what) {
- static_assert(sizeof...(Ts) > 0, "no message to send");
- return timed_sync_send_tuple(whom,
- rel_time,
- make_any_tuple(std::forward<Ts>(what)...));
-}
-
-/**
- * @brief Sends a message to the sender of the last received message.
- * @param what Message content as a tuple.
- */
-inline void reply_tuple(any_tuple what) {
- self->reply_message(std::move(what));
-}
-
-/**
- * @brief Sends a message to the sender of the last received message.
- * @param what Message elements.
- */
-template<typename... Ts>
-inline void reply(Ts&&... what) {
- self->reply_message(make_any_tuple(std::forward<Ts>(what)...));
-}
-
-/**
- * @brief Sends a message as reply to @p handle.
- */
-template<typename... Ts>
-inline void reply_to(const response_handle& handle, Ts&&... what) {
- if (handle.valid()) {
- handle.apply(make_any_tuple(std::forward<Ts>(what)...));
- }
-}
-
-/**
- * @brief Replies with @p what to @p handle.
- * @param handle Identifies a previously received request.
- * @param what Response message.
- */
-inline void reply_tuple_to(const response_handle& handle, any_tuple what) {
- handle.apply(std::move(what));
-}
-
-/**
- * @brief Forwards the last received message to @p whom.
- */
-inline void forward_to(const actor_ptr& whom) {
- self->forward_message(whom);
-}
-
-/**
- * @brief Sends a message to @p whom that is delayed by @p rel_time.
- * @param whom Receiver of the message.
- * @param rtime Relative time duration to delay the message in
- * microseconds, milliseconds, seconds or minutes.
- * @param what Message content as a tuple.
- */
-template<class Rep, class Period, typename... Ts>
-inline void delayed_send_tuple(const channel_ptr& whom,
- const std::chrono::duration<Rep,Period>& rtime,
- any_tuple what) {
- if (whom) get_scheduler()->delayed_send(whom, rtime, what);
-}
-
-/**
- * @brief Sends a message to @p whom that is delayed by @p rel_time.
- * @param whom Receiver of the message.
- * @param rtime Relative time duration to delay the message in
- * microseconds, milliseconds, seconds or minutes.
- * @param what Message elements.
- */
-template<class Rep, class Period, typename... Ts>
-inline void delayed_send(const channel_ptr& whom,
- const std::chrono::duration<Rep,Period>& rtime,
- Ts&&... what) {
- static_assert(sizeof...(Ts) > 0, "no message to send");
- if (whom) {
- delayed_send_tuple(whom,
- rtime,
- make_any_tuple(std::forward<Ts>(what)...));
- }
-}
-
-/**
- * @brief Sends a reply message that is delayed by @p rel_time.
- * @param rtime Relative time duration to delay the message in
- * microseconds, milliseconds, seconds or minutes.
- * @param what Message content as a tuple.
- * @see delayed_send()
- */
-template<class Rep, class Period, typename... Ts>
-inline void delayed_reply_tuple(const std::chrono::duration<Rep, Period>& rtime,
- any_tuple what) {
- get_scheduler()->delayed_reply(self->last_sender(),
- rtime,
- self->get_response_id(),
- std::move(what));
-}
-
-/**
- * @brief Sends a reply message that is delayed by @p rel_time.
- * @param rtime Relative time duration to delay the message in
- * microseconds, milliseconds, seconds or minutes.
- * @param what Message elements.
- * @see delayed_send()
- */
-template<class Rep, class Period, typename... Ts>
-inline void delayed_reply(const std::chrono::duration<Rep, Period>& rtime,
- Ts&&... what) {
- delayed_reply_tuple(rtime, make_any_tuple(std::forward<Ts>(what)...));
+inline const self_type& operator<<(const self_type& s, any_tuple what) {
+ send_tuple(s, std::move(what));
+ return s;
}
/**
* @}
*/
+/*
// matches "send(this, ...)" and "send(self, ...)"
inline void send_tuple(channel* whom, any_tuple what) {
detail::send_impl(whom, std::move(what));
@@ -706,10 +467,7 @@ template<typename... Ts>
inline void send(channel* whom, Ts&&... args) {
detail::send_tpl_impl(whom, std::forward<Ts>(args)...);
}
-inline const self_type& operator<<(const self_type& s, any_tuple what) {
- detail::send_impl(static_cast<channel*>(s.get()), std::move(what));
- return s;
-}
+*/
inline actor_ptr eval_sopts(spawn_options opts, actor_ptr ptr) {
if (has_monitor_flag(opts)) self->monitor(ptr);
if (has_link_flag(opts)) self->link_to(ptr);
@@ -747,13 +505,17 @@ template<class Impl, spawn_options Options = no_spawn_options, typename... Ts>
actor_ptr spawn(Ts&&... args) {
static_assert(std::is_base_of<event_based_actor,Impl>::value,
"Impl is not a derived type of event_based_actor");
- scheduled_actor* rawptr;
- if (has_detach_flag(Options)) {
- typedef typename extend<Impl>::template with<threaded> derived;
- rawptr = detail::memory::create<derived>(std::forward<Ts>(args)...);
+ scheduled_actor_ptr ptr;
+ if (has_priority_aware_flag(Options)) {
+ using derived = typename extend<Impl>::template with<threaded,prioritizing>;
+ ptr = make_counted<derived>(std::forward<Ts>(args)...);
}
- else rawptr = detail::memory::create<Impl>(std::forward<Ts>(args)...);
- return eval_sopts(Options, get_scheduler()->exec(Options, rawptr));
+ else if (has_detach_flag(Options)) {
+ using derived = typename extend<Impl>::template with<threaded>;
+ ptr = make_counted<derived>(std::forward<Ts>(args)...);
+ }
+ else ptr = make_counted<Impl>(std::forward<Ts>(args)...);
+ return eval_sopts(Options, get_scheduler()->exec(Options, std::move(ptr)));
}
/**
@@ -865,25 +627,31 @@ void shutdown(); // note: implemented in singleton_manager.cpp
* <tt>send(whom, atom("EXIT"), reason)</tt>.
* @pre <tt>reason != exit_reason::normal</tt>
*/
-inline void send_exit(const actor_ptr& whom, std::uint32_t reason) {
+inline void send_exit(actor_ptr whom, std::uint32_t reason) {
CPPA_REQUIRE(reason != exit_reason::normal);
- send(whom, atom("EXIT"), reason);
+ send(std::move(whom), atom("EXIT"), reason);
}
/**
* @brief Sets the actor's behavior and discards the previous behavior
* unless {@link keep_behavior} is given as first argument.
*/
-template<typename... Ts>
-inline void become(Ts&&... args) {
- self->become(std::forward<Ts>(args)...);
+template<typename T, typename... Ts>
+inline void become(T arg, Ts&&... args) {
+ self->do_become(match_expr_convert(arg, std::forward<Ts>(args)...), true);
+ //become(std::forward<Ts>(args)...);
+}
+
+template<bool Discard, typename... Ts>
+inline void become(behavior_policy<Discard>, Ts&&... args) {
+ self->do_become(match_expr_convert(std::forward<Ts>(args)...), Discard);
}
/**
* @brief Returns to a previous behavior if available.
*/
inline void unbecome() {
- self->unbecome();
+ self->do_unbecome();
}
struct actor_ostream {
View
5 cppa/detail/behavior_impl.hpp
@@ -151,13 +151,16 @@ default_behavior_impl<MatchExpr, F>* new_default_behavior_impl(const MatchExpr&
template<typename F>
class continuation_decorator : public behavior_impl {
+ typedef behavior_impl super;
+
public:
typedef typename behavior_impl::pointer pointer;
template<typename Fun>
continuation_decorator(Fun&& fun, pointer decorated)
- : m_fun(std::forward<Fun>(fun)), m_decorated(std::move(decorated)) {
+ : super(decorated->timeout()), m_fun(std::forward<Fun>(fun))
+ , m_decorated(std::move(decorated)) {
CPPA_REQUIRE(m_decorated != nullptr);
}
View
4 cppa/detail/decorated_tuple.hpp
@@ -107,7 +107,7 @@ class decorated_tuple : public abstract_tuple {
decorated_tuple(cow_pointer_type d, const vector_type& v)
: super(false)
, m_decorated(std::move(d)), m_mapping(v) {
-# ifdef CPPA_DEBUG
+# ifdef CPPA_ENABLE_DEBUG
const cow_pointer_type& ptr = m_decorated; // prevent detaching
# endif
CPPA_REQUIRE(ptr->size() >= sizeof...(Ts));
@@ -117,7 +117,7 @@ class decorated_tuple : public abstract_tuple {
decorated_tuple(cow_pointer_type d, size_t offset)
: super(false), m_decorated(std::move(d)) {
-# ifdef CPPA_DEBUG
+# ifdef CPPA_ENABLE_DEBUG
const cow_pointer_type& ptr = m_decorated; // prevent detaching
# endif
CPPA_REQUIRE((ptr->size() - offset) >= sizeof...(Ts));
View
2 cppa/detail/event_based_actor_factory.hpp
@@ -104,7 +104,7 @@ class event_based_actor_factory {
template<typename... Ts>
actor_ptr spawn(Ts&&... args) {
- auto ptr = memory::create<impl>(m_init, m_on_exit, std::forward<Ts>(args)...);
+ auto ptr = make_counted<impl>(m_init, m_on_exit, std::forward<Ts>(args)...);
return get_scheduler()->exec(no_spawn_options, ptr);
}
View
17 cppa/detail/receive_policy.hpp
@@ -153,7 +153,7 @@ class receive_policy {
else if (!invoke_from_cache(client, bhvr)) {
if (bhvr.timeout().is_zero()) {
pointer e = nullptr;
- while ((e = client->m_mailbox.try_pop()) != nullptr) {
+ while ((e = client->try_pop()) != nullptr) {
CPPA_REQUIRE(e->marked == false);
if (invoke(client, e, bhvr)) {
return; // done
@@ -356,15 +356,15 @@ class receive_policy {
CPPA_CRITICAL("illegal filter result");
}
case normal_exit_signal: {
- CPPA_LOG_DEBUG("dropped message: normal exit signal");
+ CPPA_LOGMF(CPPA_DEBUG, client, "dropped normal exit signal");
return hm_drop_msg;
}
case expired_sync_response: {
- CPPA_LOG_DEBUG("dropped message: expired sync response");
+ CPPA_LOGMF(CPPA_DEBUG, client, "dropped expired sync response");
return hm_drop_msg;
}
case expired_timeout_message: {
- CPPA_LOG_DEBUG("dropped message: expired timeout message");
+ CPPA_LOGMF(CPPA_DEBUG, client, "dropped expired timeout message");
return hm_drop_msg;
}
case non_normal_exit_signal: {
@@ -388,7 +388,7 @@ class receive_policy {
if (awaited_response.valid() && node->mid == awaited_response) {
auto previous_node = hm_begin(client, node, policy);
if (!fun(node->msg) && handle_sync_failure_on_mismatch) {
- CPPA_LOG_WARNING("sync failure occured");
+ CPPA_LOGMF(CPPA_WARNING, client, "sync failure occured");
client->handle_sync_failure();
}
client->mark_arrived(awaited_response);
@@ -407,9 +407,10 @@ class receive_policy {
auto id = node->mid;
auto sender = node->sender;
if (id.valid() && !id.is_answered() && sender) {
- CPPA_LOG_WARNING("actor did not reply to a "
- "synchronous request message");
- sender->enqueue({client, id.response_id()},
+ CPPA_LOGMF(CPPA_WARNING, client,
+ "actor did not reply to a "
+ "synchronous request message");
+ sender->enqueue({client, sender, id.response_id()},
make_any_tuple(atom("VOID")));
}
hm_cleanup(client, previous_node, policy);
View
2 cppa/detail/sync_request_bouncer.hpp
@@ -49,7 +49,7 @@ struct sync_request_bouncer {
CPPA_REQUIRE(rsn != exit_reason::not_exited);
if (mid.is_request() && sender != nullptr) {
actor_ptr nobody;
- sender->enqueue({nobody, mid.response_id()},
+ sender->enqueue({nobody, sender, mid.response_id()},
make_any_tuple(atom("EXITED"), rsn));
}
}
View
2 cppa/detail/tdata.hpp
@@ -329,7 +329,7 @@ struct tdata<Head, Tail...> : tdata<Tail...> {
}
inline void* mutable_at(size_t p) {
-# ifdef CPPA_DEBUG
+# ifdef CPPA_ENABLE_DEBUG
if (p == 0) {
if (std::is_same<decltype(ptr_to(head)), const void*>::value) {
throw std::logic_error{"mutable_at with const head"};
View
12 cppa/extend.hpp
@@ -38,14 +38,14 @@ namespace cppa {
namespace detail {
-template<class B, class D, CPPA_MIXIN... Ms>
+template<class D, class B, CPPA_MIXIN... Ms>
struct extend_helper;
-template<class B, class D>
-struct extend_helper<B,D> { typedef D type; };
+template<class D, class B>
+struct extend_helper<D,B> { typedef B type; };
-template<class B, class D, CPPA_MIXIN M, CPPA_MIXIN... Ms>
-struct extend_helper<B,D,M,Ms...> : extend_helper<B,M<D,B>,Ms...> { };
+template<class D, class B, CPPA_MIXIN M, CPPA_MIXIN... Ms>
+struct extend_helper<D,B,M,Ms...> : extend_helper<D,M<B,D>,Ms...> { };
} // namespace detail
@@ -57,7 +57,7 @@ struct extend_helper<B,D,M,Ms...> : extend_helper<B,M<D,B>,Ms...> { };
* Mixins in libcppa always have two template parameters: base type and
* derived type. This allows mixins to make use of the curiously recurring
* template pattern (CRTP). However, if none of the used mixins use CRTP,
- * the second template argument can ignored (it is then set to Base).
+ * the second template argument can be ignored (it is then set to Base).
*/
template<class Base, class Derived = Base>
struct extend {
View
10 cppa/intrusive_ptr.hpp
@@ -36,6 +36,7 @@
#include <stdexcept>
#include <type_traits>
+#include "cppa/memory_cached.hpp"
#include "cppa/util/comparable.hpp"
namespace cppa {
@@ -208,7 +209,14 @@ inline bool operator!=(const intrusive_ptr<X>& lhs, const intrusive_ptr<Y>& rhs)
* of {@link ref_counted} and wraps it in an {@link intrusive_ptr}.
*/
template<typename T, typename... Ts>
-intrusive_ptr<T> make_counted(Ts&&... args) {
+typename std::enable_if<is_memory_cached<T>::value,intrusive_ptr<T>>::type
+make_counted(Ts&&... args) {
+ return {detail::memory::create<T>(std::forward<Ts>(args)...)};
+}
+
+template<typename T, typename... Ts>
+typename std::enable_if<not is_memory_cached<T>::value,intrusive_ptr<T>>::type
+make_counted(Ts&&... args) {
return {new T(std::forward<Ts>(args)...)};
}
View
59 cppa/local_actor.hpp
@@ -46,6 +46,7 @@
#include "cppa/message_header.hpp"
#include "cppa/mailbox_element.hpp"
#include "cppa/response_handle.hpp"
+#include "cppa/message_priority.hpp"
#include "cppa/partial_function.hpp"
#include "cppa/util/duration.hpp"
@@ -55,6 +56,7 @@
namespace cppa {
// forward declarations
+class self_type;
class scheduler;
class message_future;
class local_scheduler;
@@ -88,6 +90,8 @@ constexpr keep_behavior_t keep_behavior = keep_behavior_t{};
*/
class local_actor : public extend<actor>::with<memory_cached> {
+ friend class self_type;
+
typedef combined_type super;
public:
@@ -239,6 +243,12 @@ class local_actor : public extend<actor>::with<memory_cached> {
/** @cond PRIVATE */
+ inline message_id new_request_id() {
+ auto result = ++m_last_request_id;
+ m_pending_responses.push_back(result.response_id());
+ return result;
+ }
+
inline void handle_sync_timeout() {
if (m_sync_timeout_handler) m_sync_timeout_handler();
else quit(exit_reason::unhandled_sync_timeout);
@@ -257,28 +267,24 @@ class local_actor : public extend<actor>::with<memory_cached> {
inline void dequeue_response(behavior&&, message_id);
+/*
template<bool Discard, typename... Ts>
void become(behavior_policy<Discard>, Ts&&... args);
template<typename T, typename... Ts>
void become(T arg, Ts&&... args);
+*/
- inline void unbecome();
+ inline void do_unbecome();
local_actor(bool is_scheduled = false);
virtual bool initialized() const = 0;
inline bool chaining_enabled();
- inline void send_message(channel* whom, any_tuple&& what);
-
- inline void send_message(actor* whom, any_tuple&& what);
-
- inline message_id send_sync_message(const actor_ptr& whom,
- any_tuple&& what);
-
- message_id send_timed_sync_message(const actor_ptr& whom,
+ message_id send_timed_sync_message(message_priority mp,
+ const actor_ptr& whom,
const util::duration& rel_time,
any_tuple&& what);
@@ -302,12 +308,12 @@ class local_actor : public extend<actor>::with<memory_cached> {
inline detail::behavior_stack& bhvr_stack();
- protected:
-
virtual void do_become(behavior&& bhvr, bool discard_old) = 0;
inline void do_become(const behavior& bhvr, bool discard_old);
+ protected:
+
inline void remove_handler(message_id id);
void cleanup(std::uint32_t reason);
@@ -397,6 +403,7 @@ inline actor_ptr& local_actor::last_sender() {
return m_current_node->sender;
}
+/*
template<bool Discard, typename... Ts>
inline void local_actor::become(behavior_policy<Discard>, Ts&&... args) {
do_become(match_expr_convert(std::forward<Ts>(args)...), Discard);
@@ -406,42 +413,16 @@ template<typename T, typename... Ts>
inline void local_actor::become(T arg, Ts&&... args) {
do_become(match_expr_convert(arg, std::forward<Ts>(args)...), true);
}
+*/
-inline void local_actor::unbecome() {
+inline void local_actor::do_unbecome() {
m_bhvr_stack.pop_async_back();
}
inline bool local_actor::chaining_enabled() {
return m_chaining && !m_chained_actor;
}
-inline void local_actor::send_message(channel* whom, any_tuple&& what) {
- whom->enqueue(this, std::move(what));
-}
-
-inline void local_actor::send_message(actor* whom, any_tuple&& what) {
- if (chaining_enabled()) {
- if (whom->chained_enqueue(this, std::move(what))) {
- m_chained_actor.reset(whom);
- }
- }
- else whom->enqueue(this, std::move(what));
-}
-
-inline message_id local_actor::send_sync_message(const actor_ptr& whom, any_tuple&& what) {
- auto id = ++m_last_request_id;
- CPPA_REQUIRE(id.is_request());
- if (chaining_enabled()) {
- if (whom->chained_enqueue({this, id}, std::move(what))) {
- chained_actor(whom);
- }
- }
- else whom->enqueue({this, id}, std::move(what));
- auto awaited_response = id.response_id();
- m_pending_responses.push_back(awaited_response);
- return awaited_response;
-}
-
inline message_id local_actor::get_response_id() {
auto id = m_current_node->mid;
return (id.is_request()) ? id.response_id() : message_id();
View
277 cppa/logging.hpp
@@ -35,7 +35,10 @@
#include <iostream>
#include <execinfo.h>
+#include "cppa/self.hpp"
+#include "cppa/actor.hpp"
#include "cppa/singletons.hpp"
+#include "cppa/local_actor.hpp"
#include "cppa/detail/demangle.hpp"
/*
@@ -66,6 +69,7 @@ class logging {
const char* function_name,
const char* file_name,
int line_num,
+ const actor_ptr& from,
const std::string& msg ) = 0;
class trace_helper {
@@ -76,6 +80,7 @@ class logging {
const char* fun_name,
const char* file_name,
int line_num,
+ actor_ptr aptr,
const std::string& msg);
~trace_helper();
@@ -85,7 +90,8 @@ class logging {
std::string m_class;
const char* m_fun_name;
const char* m_file_name;
- int m_line_num;
+ int m_line_num;
+ actor_ptr m_self;
};
@@ -103,134 +109,221 @@ class logging {
};
+inline actor_ptr fwd_aptr(const self_type& s) {
+ return s.unchecked();
+}
+
+inline actor_ptr fwd_aptr(actor* ptr) {
+ return ptr;
+}
+
+inline actor_ptr fwd_aptr(const actor_ptr& ptr) {
+ return ptr;
+}
+
} // namespace cppa
#define CPPA_VOID_STMT static_cast<void>(0)
-#define CPPA_LIF(stmt, logstmt) if (stmt) { logstmt ; } CPPA_VOID_STMT
+#define CPPA_CAT(a,b) a ## b
+
+#define CPPA_ERROR 0
+#define CPPA_WARNING 1
+#define CPPA_INFO 2
+#define CPPA_DEBUG 3
+#define CPPA_TRACE 4
+
+#define CPPA_LVL_NAME0() "ERROR"
+#define CPPA_LVL_NAME1() "WARN "
+#define CPPA_LVL_NAME2() "INFO "
+#define CPPA_LVL_NAME3() "DEBUG"
+#define CPPA_LVL_NAME4() "TRACE"
#ifndef CPPA_LOG_LEVEL
-# define CPPA_LOG(classname, funname, level, message) { \
- std::cerr << level << " [" << classname << "::" << funname << "]: " \
- << message << "\nStack trace:\n"; \
+# define CPPA_LOG_IMPL(lvlname, classname, funname, unused, message) {\
+ std::cerr << "[" << lvlname << "] classname << "::" << funname << ": " \
+ << message << "\nStack trace:\n"; \
void *array[10]; \
size_t size = backtrace(array, 10); \
backtrace_symbols_fd(array, size, 2); \
} CPPA_VOID_STMT
+# define CPPA_LOG_LEVEL 1
#else
-# define CPPA_LOG(classname, funname, level, message) { \
- std::ostringstream scoped_oss; scoped_oss << message; \
- ::cppa::get_logger()->log( \
- level, classname, funname, __FILE__, __LINE__, scoped_oss.str()); \
- } CPPA_VOID_STMT
+# define CPPA_LOG_IMPL(lvlname, classname, funname, aptr, message) \
+ ::cppa::get_logger()->log(lvlname, classname, funname, __FILE__, \
+ __LINE__, ::cppa::fwd_aptr(aptr), \
+ (::std::ostringstream{} << message).str())
#endif
#define CPPA_CLASS_NAME ::cppa::detail::demangle(typeid(*this)).c_str()
-// errors and warnings are enabled by default
+#define CPPA_PRINT0(lvlname, classname, funname, actorptr, msg) \
+ CPPA_LOG_IMPL(lvlname, classname, funname, actorptr, msg)
-/**
- * @brief Logs a custom error message @p msg with class name @p cname
- * and function name @p fname.
- */
-#define CPPA_LOGC_ERROR(cname, fname, msg) CPPA_LOG(cname, fname, "ERROR", msg)
-#define CPPA_LOGC_WARNING(cname, fname, msg) CPPA_LOG(cname, fname, "WARN ", msg)
-#define CPPA_LOGC_INFO(cname, fname, msg) CPPA_VOID_STMT
-#define CPPA_LOGC_DEBUG(cname, fname, msg) CPPA_VOID_STMT
-#define CPPA_LOGC_TRACE(cname, fname, msg) CPPA_VOID_STMT
-
-// enable info messages
-#if CPPA_LOG_LEVEL > 1
-# undef CPPA_LOGC_INFO
-# define CPPA_LOGC_INFO(cname, fname, msg) CPPA_LOG(cname, fname, "INFO ", msg)
-# define CPPA_LOG_INFO_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOG_INFO(msg))
-# define CPPA_LOGF_INFO_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOGF_INFO(msg))
-#else
-# define CPPA_LOG_INFO_IF(unused1,unused2) CPPA_VOID_STMT
-# define CPPA_LOGF_INFO_IF(unused1,unused2) CPPA_VOID_STMT
+#define CPPA_PRINT_IF0(stmt, lvlname, classname, funname, actorptr, msg) \
+ if (stmt) { CPPA_LOG_IMPL(lvlname, classname, funname, actorptr, msg); } \
+ CPPA_VOID_STMT
+
+#define CPPA_PRINT1(lvlname, classname, funname, actorptr, msg) \
+ CPPA_PRINT0(lvlname, classname, funname, actorptr, msg)
+
+#define CPPA_PRINT_IF1(stmt, lvlname, classname, funname, actorptr, msg) \
+ CPPA_PRINT_IF0(stmt, lvlname, classname, funname, actorptr, msg)
+
+#if CPPA_LOG_LEVEL < 4
+# define CPPA_PRINT4(arg0, arg1, arg2, arg3, arg4)
+# else
+# define CPPA_PRINT4(lvlname, classname, funname, actorptr, msg) \
+ ::cppa::logging::trace_helper cppa_trace_helper_ { \
+ classname, funname, __FILE__, __LINE__, \
+ ::cppa::fwd_aptr(actorptr), \
+ (::std::ostringstream{} << msg).str() \
+ }
#endif
-// enable debug messages
-#if CPPA_LOG_LEVEL > 2
-# undef CPPA_LOGC_DEBUG
-# define CPPA_LOGC_DEBUG(cname, fname, msg) CPPA_LOG(cname, fname, "DEBUG", msg)
-# define CPPA_LOG_DEBUG_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOG_DEBUG(msg))
-# define CPPA_LOGF_DEBUG_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOGF_DEBUG(msg))
-#else
-# define CPPA_LOG_DEBUG_IF(unused1,unused2) CPPA_VOID_STMT
-# define CPPA_LOGF_DEBUG_IF(unused1,unused2) CPPA_VOID_STMT
+#if CPPA_LOG_LEVEL < 3
+# define CPPA_PRINT3(arg0, arg1, arg2, arg3, arg4)
+# define CPPA_PRINT_IF3(arg0, arg1, arg2, arg3, arg4, arg5)
+# else
+# define CPPA_PRINT3(lvlname, classname, funname, actorptr, msg) \
+ CPPA_PRINT0(lvlname, classname, funname, actorptr, msg)
+# define CPPA_PRINT_IF3(stmt, lvlname, classname, funname, actorptr, msg)\
+ CPPA_PRINT_IF0(stmt, lvlname, classname, funname, actorptr, msg)
#endif
-// enable trace messages
-#if CPPA_LOG_LEVEL > 3
-# undef CPPA_LOGC_TRACE
-# define CPPA_CONCAT_I(lhs,rhs) lhs ## rhs
-# define CPPA_CONCAT(lhs,rhs) CPPA_CONCAT_I(lhs,rhs)
-# define CPPA_CONCATL(lhs) CPPA_CONCAT(lhs, __LINE__)
-# define CPPA_LOGC_TRACE(cname, fname, msg) \
- ::std::ostringstream CPPA_CONCATL(cppa_trace_helper_) ; \
- CPPA_CONCATL(cppa_trace_helper_) << msg ; \
- ::cppa::logging::trace_helper CPPA_CONCATL(cppa_fun_trace_helper_) { \
- cname, fname , __FILE__ , __LINE__ , \
- CPPA_CONCATL(cppa_trace_helper_) .str() }
+#if CPPA_LOG_LEVEL < 2
+# define CPPA_PRINT2(arg0, arg1, arg2, arg3, arg4)
+# define CPPA_PRINT_IF2(arg0, arg1, arg2, arg3, arg4, arg5)
+# else
+# define CPPA_PRINT2(lvlname, classname, funname, actorptr, msg) \
+ CPPA_PRINT0(lvlname, classname, funname, actorptr, msg)
+# define CPPA_PRINT_IF2(stmt, lvlname, classname, funname, actorptr, msg)\
+ CPPA_PRINT_IF0(stmt, lvlname, classname, funname, actorptr, msg)
#endif
+#define CPPA_EVAL(what) what
+
/**
- * @brief Logs @p msg with custom member function @p fname.
- */
-#define CPPA_LOGS_ERROR(fname, msg) CPPA_LOGC_ERROR(CPPA_CLASS_NAME, fname, msg)
+ * @def CPPA_LOGC
+ * @brief Logs a message with custom class and function names.
+ **/
+#define CPPA_LOGC(level, classname, funname, actorptr, msg) \
+ CPPA_CAT(CPPA_PRINT, level)(CPPA_CAT(CPPA_LVL_NAME, level)(), classname, \
+ funname, actorptr, msg)
/**
- * @brief Logs @p msg with custom class name @p cname.
- */
-#define CPPA_LOGM_ERROR(cname, msg) CPPA_LOGC_ERROR(cname, __FUNCTION__, msg)
+ * @def CPPA_LOGF
+ * @brief Logs a message inside a free function.
+ **/
+#define CPPA_LOGF(level, actorptr, msg) \
+ CPPA_LOGC(level, "NONE", __func__, actorptr, msg)
/**
- * @brief Logs @p msg in a free function if @p stmt evaluates to @p true.
- */
-#define CPPA_LOGF_ERROR_IF(stmt, msg) CPPA_LIF((stmt), CPPA_LOGF_ERROR(msg))
+ * @def CPPA_LOGMF
+ * @brief Logs a message inside a member function.
+ **/
+#define CPPA_LOGMF(level, actorptr, msg) \
+ CPPA_LOGC(level, CPPA_CLASS_NAME, __func__, actorptr, msg)
/**
- * @brief Logs @p msg in a free function.
- */
-#define CPPA_LOGF_ERROR(msg) CPPA_LOGM_ERROR("NONE", msg)
+ * @def CPPA_LOGC
+ * @brief Logs a message with custom class and function names.
+ **/
+#define CPPA_LOGC_IF(stmt, level, classname, funname, actorptr, msg) \
+ CPPA_CAT(CPPA_PRINT_IF, level)(stmt, CPPA_CAT(CPPA_LVL_NAME, level)(), \
+ classname, funname, actorptr, msg)
/**
- * @brief Logs @p msg in a member function if @p stmt evaluates to @p true.
- */
-#define CPPA_LOG_ERROR_IF(stmt, msg) CPPA_LIF((stmt), CPPA_LOG_ERROR(msg))
+ * @def CPPA_LOGF
+ * @brief Logs a message inside a free function.
+ **/
+#define CPPA_LOGF_IF(stmt, level, actorptr, msg) \
+ CPPA_LOGC_IF(stmt, level, "NONE", __func__, actorptr, msg)
/**
- * @brief Logs @p msg in a member function.
- */
-#define CPPA_LOG_ERROR(msg) CPPA_LOGM_ERROR(CPPA_CLASS_NAME, msg)
-
-// convenience macros for warnings
-#define CPPA_LOG_WARNING(msg) CPPA_LOGM_WARNING(CPPA_CLASS_NAME, msg)
-#define CPPA_LOGF_WARNING(msg) CPPA_LOGM_WARNING("NONE", msg)
-#define CPPA_LOGM_WARNING(cname, msg) CPPA_LOGC_WARNING(cname, __FUNCTION__, msg)
-#define CPPA_LOG_WARNING_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOG_WARNING(msg))
-#define CPPA_LOGF_WARNING_IF(stmt,msg) CPPA_LIF((stmt), CPPA_LOGF_WARNING(msg))
-
-// convenience macros for info messages
-#define CPPA_LOG_INFO(msg) CPPA_LOGM_INFO(CPPA_CLASS_NAME, msg)
-#define CPPA_LOGF_INFO(msg) CPPA_LOGM_INFO("NONE", msg)
-#define CPPA_LOGM_INFO(cname, msg) CPPA_LOGC_INFO(cname, __FUNCTION__, msg)
-
-// convenience macros for debug messages
-#define CPPA_LOG_DEBUG(msg) CPPA_LOGM_DEBUG(CPPA_CLASS_NAME, msg)
-#define CPPA_LOGF_DEBUG(msg) CPPA_LOGM_DEBUG("NONE", msg)
-#define CPPA_LOGM_DEBUG(cname, msg) CPPA_LOGC_DEBUG(cname, __FUNCTION__, msg)
-
-// convenience macros for trace messages
-#define CPPA_LOGS_TRACE(fname, msg) CPPA_LOGC_TRACE(CPPA_CLASS_NAME, fname, msg)
-#define CPPA_LOGM_TRACE(cname, msg) CPPA_LOGC_TRACE(cname, __FUNCTION__, msg)
-#define CPPA_LOGF_TRACE(msg) CPPA_LOGM_TRACE("NONE", msg)
-#define CPPA_LOG_TRACE(msg) CPPA_LOGM_TRACE(CPPA_CLASS_NAME, msg)
+ * @def CPPA_LOGMF
+ * @brief Logs a message inside a member function.
+ **/
+#define CPPA_LOGMF_IF(stmt, level, actorptr, msg) \
+ CPPA_LOGC_IF(stmt, level, CPPA_CLASS_NAME, __func__, actorptr, msg)
// convenience macros to safe some typing when printing arguments
#define CPPA_ARG(arg) #arg << " = " << arg
#define CPPA_TARG(arg, trans) #arg << " = " << trans ( arg )
#define CPPA_MARG(arg, memfun) #arg << " = " << arg . memfun ()
+#define CPPA_TTARG(arg) #arg << " = " << to_string ( arg )
+
+
+/******************************************************************************
+ * backward compatibility for version <= 0.6 *
+ ******************************************************************************/
+
+#define CPPA_LOG_ERROR(msg) CPPA_LOGMF(CPPA_ERROR, ::cppa::self, msg)
+#define CPPA_LOG_WARNING(msg) CPPA_LOGMF(CPPA_WARNING, ::cppa::self, msg)
+#define CPPA_LOG_DEBUG(msg) CPPA_LOGMF(CPPA_DEBUG, ::cppa::self, msg)
+#define CPPA_LOG_INFO(msg) CPPA_LOGMF(CPPA_INFO, ::cppa::self, msg)
+#define CPPA_LOG_TRACE(msg) CPPA_LOGMF(CPPA_TRACE, ::cppa::self, msg)
+
+#define CPPA_LOG_ERROR_IF(stmt, msg) CPPA_LOGMF_IF(stmt, CPPA_ERROR, ::cppa::self, msg)
+#define CPPA_LOG_WARNING_IF(stmt, msg) CPPA_LOGMF_IF(stmt, CPPA_WARNING, ::cppa::self, msg)
+#define CPPA_LOG_DEBUG_IF(stmt, msg) CPPA_LOGMF_IF(stmt, CPPA_DEBUG, ::cppa::self, msg)
+#define CPPA_LOG_INFO_IF(stmt, msg) CPPA_LOGMF_IF(stmt, CPPA_INFO, ::cppa::self, msg)
+#define CPPA_LOG_TRACE_IF(stmt, msg) CPPA_LOGMF_IF(stmt, CPPA_TRACE, ::cppa::self, msg)
+
+#define CPPA_LOGC_ERROR(cname, fname, msg) \
+ CPPA_LOGC(CPPA_ERROR, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_WARNING(cname, fname, msg) \
+ CPPA_LOGC(CPPA_WARNING, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_DEBUG(cname, fname, msg) \
+ CPPA_LOGC(CPPA_DEBUG, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_INFO(cname, fname, msg) \
+ CPPA_LOGC(CPPA_INFO, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_TRACE(cname, fname, msg) \
+ CPPA_LOGC(CPPA_TRACE, cname, fname, ::cppa::self, msg)
+
+#define CPPA_LOGC_ERROR_IF(stmt, cname, fname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_ERROR, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_WARNING_IF(stmt, cname, fname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_WARNING, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_DEBUG_IF(stmt, cname, fname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_DEBUG, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_INFO_IF(stmt, cname, fname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_INFO, cname, fname, ::cppa::self, msg)
+#define CPPA_LOGC_TRACE_IF(stmt, cname, fname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_TRACE, cname, fname, ::cppa::self, msg)
+
+#define CPPA_LOGF_ERROR(msg) CPPA_LOGF(CPPA_ERROR, ::cppa::self, msg)
+#define CPPA_LOGF_WARNING(msg) CPPA_LOGF(CPPA_WARNING, ::cppa::self, msg)
+#define CPPA_LOGF_DEBUG(msg) CPPA_LOGF(CPPA_DEBUG, ::cppa::self, msg)
+#define CPPA_LOGF_INFO(msg) CPPA_LOGF(CPPA_INFO, ::cppa::self, msg)
+#define CPPA_LOGF_TRACE(msg) CPPA_LOGF(CPPA_TRACE, ::cppa::self, msg)
+
+#define CPPA_LOGF_ERROR_IF(stmt, msg) CPPA_LOGF_IF(stmt, CPPA_ERROR, ::cppa::self, msg)
+#define CPPA_LOGF_WARNING_IF(stmt, msg) CPPA_LOGF_IF(stmt, CPPA_WARNING, ::cppa::self, msg)
+#define CPPA_LOGF_DEBUG_IF(stmt, msg) CPPA_LOGF_IF(stmt, CPPA_DEBUG, ::cppa::self, msg)
+#define CPPA_LOGF_INFO_IF(stmt, msg) CPPA_LOGF_IF(stmt, CPPA_INFO, ::cppa::self, msg)
+#define CPPA_LOGF_TRACE_IF(stmt, msg) CPPA_LOGF_IF(stmt, CPPA_TRACE, ::cppa::self, msg)
+
+#define CPPA_LOGM_ERROR(cname, msg) \
+ CPPA_LOGC(CPPA_ERROR, cname, __func__, ::cppa::self, msg)
+#define CPPA_LOGM_WARNING(cname, msg) \
+ CPPA_LOGC(CPPA_WARNING, cname, ::cppa::self, msg)
+#define CPPA_LOGM_DEBUG(cname, msg) \
+ CPPA_LOGC(CPPA_DEBUG, cname, __func__, ::cppa::self, msg)
+#define CPPA_LOGM_INFO(cname, msg) \
+ CPPA_LOGC(CPPA_INFO, cname, ::cppa::self, msg)
+#define CPPA_LOGM_TRACE(cname, msg) \
+ CPPA_LOGC(CPPA_TRACE, cname, __func__, ::cppa::self, msg)
+
+#define CPPA_LOGM_ERROR_IF(stmt, cname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_ERROR, cname, __func__, ::cppa::self, msg)
+#define CPPA_LOGM_WARNING_IF(stmt, cname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_WARNING, cname, ::cppa::self, msg)
+#define CPPA_LOGM_DEBUG_IF(stmt, cname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_DEBUG, cname, __func__, ::cppa::self, msg)
+#define CPPA_LOGM_INFO_IF(stmt, cname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_INFO, cname, ::cppa::self, msg)
+#define CPPA_LOGM_TRACE_IF(stmt, cname, msg) \
+ CPPA_LOGC_IF(stmt, CPPA_TRACE, cname, __func__, ::cppa::self, msg)
#endif // CPPA_LOGGING_HPP
View
5 cppa/mailbox_based.hpp
@@ -39,9 +39,6 @@
namespace cppa {
-template<typename T>
-struct has_blocking_receive;
-
template<class Base, class Subtype>
class mailbox_based : public Base {
@@ -65,7 +62,7 @@ class mailbox_based : public Base {
template<typename... Ts>
mailbox_based(Ts&&... args) : Base(std::forward<Ts>(args)...) { }
- void cleanup(std::uint32_t reason) {
+ virtual void cleanup(std::uint32_t reason) override {
detail::sync_request_bouncer f{reason};
m_mailbox.close(f);
Base::cleanup(reason);
View
17 cppa/memory_cached.hpp
@@ -31,6 +31,9 @@
#ifndef CPPA_MEMORY_CACHED_HPP
#define CPPA_MEMORY_CACHED_HPP
+#include <utility>
+#include <type_traits>
+
#include "cppa/detail/memory.hpp"
namespace cppa {
@@ -47,6 +50,10 @@ class memory_cached : public Base {
template<typename>
friend class detail::basic_memory_cache;
+ public:
+
+ static constexpr bool is_memory_cached_type = true;
+
protected:
typedef memory_cached combined_type;
@@ -74,6 +81,16 @@ class memory_cached : public Base {
};
+template<typename T>
+struct is_memory_cached {
+ template<class U, bool = U::is_memory_cached_type>
+ static std::true_type check(int);
+ template<class>
+ static std::false_type check(...);
+public:
+ static constexpr bool value = decltype(check<T>(0))::value;
+};
+
} // namespace cppa
#endif // CPPA_MEMORY_CACHED_HPP
View
2 cppa/message_future.hpp
@@ -68,7 +68,7 @@ class message_future {
behavior cpy = ref;
ref = cpy.add_continuation(std::move(fun));
}
- else CPPA_LOG_WARNING(".continue_with: failed to add continuation");
+ else CPPA_LOG_ERROR(".continue_with: failed to add continuation");
}
private:
View
77 cppa/message_header.hpp
@@ -31,6 +31,7 @@
#ifndef CPPA_MESSAGE_HEADER_HPP
#define CPPA_MESSAGE_HEADER_HPP
+#include "cppa/self.hpp"
#include "cppa/actor.hpp"
#include "cppa/message_id.hpp"
#include "cppa/message_priority.hpp"
@@ -47,28 +48,70 @@ class message_header {
public:
- actor_ptr sender;
- actor_ptr receiver;
+ actor_ptr sender;
+ channel_ptr receiver;
message_id id;
message_priority priority;
+ /**
+ * @brief An invalid message header without receiver or sender;
+ **/
message_header() = default;
- message_header(actor* sender);
-
- message_header(const self_type& sender);
-
- message_header(const actor_ptr& sender,
- message_priority priority = message_priority::normal);
-
- message_header(const actor_ptr &sender,
- message_id id,
- message_priority priority = message_priority::normal);
-
- message_header(const actor_ptr& sender,
- const actor_ptr& receiver,
- message_id id = message_id::invalid,
- message_priority priority = message_priority::normal);
+ /**
+ * @brief Creates a message header with <tt>receiver = dest</tt>
+ * and <tt>sender = self</tt>.
+ **/
+ template<typename T>
+ message_header(intrusive_ptr<T> dest)
+ : sender(self), receiver(dest), priority(message_priority::normal) {
+ static_assert(std::is_convertible<T*,channel*>::value,
+ "illegal receiver");
+ }
+
+ template<typename T>
+ message_header(T* dest)
+ : sender(self), receiver(dest), priority(message_priority::normal) {
+ static_assert(std::is_convertible<T*,channel*>::value,
+ "illegal receiver");
+ }
+
+ message_header(const std::nullptr_t&);
+
+ /**
+ * @brief Creates a message header with <tt>receiver = self</tt>
+ * and <tt>sender = self</tt>.
+ **/
+ message_header(const self_type&);
+
+ /**
+ * @brief Creates a message header with <tt>receiver = dest</tt>
+ * and <tt>sender = self</tt>.
+ */
+ message_header(channel_ptr dest,
+ message_id mid,
+ message_priority prio = message_priority::normal);
+
+ /**
+ * @brief Creates a message header with <tt>receiver = dest</tt> and
+ * <tt>sender = self</tt>.
+ */
+ message_header(channel_ptr dest, message_priority prio);
+
+ /**
+ * @brief Creates a message header with <tt>receiver = dest</tt> and
+ * <tt>sender = source</tt>.
+ */
+ message_header(actor_ptr source,
+ channel_ptr dest,
+ message_id mid = message_id::invalid,
+ message_priority prio = message_priority::normal);
+
+ /**
+ * @brief Creates a message header with <tt>receiver = dest</tt> and
+ * <tt>sender = self</tt>.
+ */
+ message_header(actor_ptr source, channel_ptr dest, message_priority prio);
void deliver(any_tuple msg) const;
View
10 cppa/network/middleman_event_handler_base.hpp
@@ -107,7 +107,7 @@ class middleman_event_handler_base {
auto wptr = ptr->as_io();
if (wptr) fd = wptr->write_handle();
else {
- CPPA_LOG_ERROR("ptr->downcast() returned nullptr");
+ CPPA_LOGMF(CPPA_ERROR, self, "ptr->downcast() returned nullptr");
return;
}
break;
@@ -118,21 +118,21 @@ class middleman_event_handler_base {
if (wptr) {
auto wrfd = wptr->write_handle();
if (fd != wrfd) {
- CPPA_LOG_DEBUG("read_handle != write_handle, split "
+ CPPA_LOGMF(CPPA_DEBUG, self, "read_handle != write_handle, split "
"into two function calls");
// split into two function calls
e = event::read;
alteration(ptr, event::write, etype);
}
}
else {
- CPPA_LOG_ERROR("ptr->downcast() returned nullptr");
+ CPPA_LOGMF(CPPA_ERROR, self, "ptr->downcast() returned nullptr");
return;
}
break;
}
default:
- CPPA_LOG_ERROR("invalid bitmask");
+ CPPA_LOGMF(CPPA_ERROR, self, "invalid bitmask");
return;
}
m_alterations.emplace_back(fd_meta_info(fd, ptr, e), etype);
@@ -163,7 +163,7 @@ class middleman_event_handler_base {
if (iter != last) old = iter->mask;
auto mask = next_bitmask(old, elem.mask, elem_pair.second);
auto ptr = elem.ptr.get();
- CPPA_LOG_DEBUG("new bitmask for "
+ CPPA_LOGMF(CPPA_DEBUG, self, "new bitmask for "
<< elem.ptr.get() << ": " << eb2str(mask));
if (iter == last || iter->fd != elem.fd) {
CPPA_LOG_INFO_IF(mask == event::none,
View
12 cppa/opencl/actor_facade.hpp
@@ -82,15 +82,15 @@ class actor_facade<Ret(Args...)> : public actor {
result_mapping map_result) {
if (global_dims.empty()) {
auto str = "OpenCL kernel needs at least 1 global dimension.";
- CPPA_LOGM_ERROR(detail::demangle(typeid(actor_facade)), str);
+ CPPA_LOGM_ERROR(detail::demangle(typeid(actor_facade)).c_str(), str);
throw std::runtime_error(str);
}
auto check_vec = [&](const dim_vec& vec, const char* name) {
if (!vec.empty() && vec.size() != global_dims.size()) {
std::ostringstream oss;
oss << name << " vector is not empty, but "
<< "its size differs from global dimensions vector's size";
- CPPA_LOGM_ERROR(detail::demangle<actor_facade>(), oss.str());
+ CPPA_LOGM_ERROR(detail::demangle<actor_facade>().c_str(), oss.str());
throw std::runtime_error(oss.str());
}
};
@@ -104,7 +104,7 @@ class actor_facade<Ret(Args...)> : public actor {
if (err != CL_SUCCESS) {
std::ostringstream oss;
oss << "clCreateKernel: " << get_opencl_error(err);
- CPPA_LOGM_ERROR(detail::demangle<actor_facade>(), oss.str());
+ CPPA_LOGM_ERROR(detail::demangle<actor_facade>().c_str(), oss.str());
throw std::runtime_error(oss.str());
}
return new actor_facade<Ret (Args...)>{dispatcher,
@@ -169,7 +169,7 @@ class actor_facade<Ret(Args...)> : public actor {
m_local_dimensions,
m_map_result));
}
- else { CPPA_LOG_ERROR("actor_facade::enqueue() tuple_cast failed."); }
+ else { CPPA_LOGMF(CPPA_ERROR, this, "actor_facade::enqueue() tuple_cast failed."); }
}
typedef std::vector<mem_ptr> args_vec;
@@ -217,7 +217,7 @@ class actor_facade<Ret(Args...)> : public actor {
arg0.data(),
&err);
if (err != CL_SUCCESS) {
- CPPA_LOG_ERROR("clCreateBuffer: " << get_opencl_error(err));
+ CPPA_LOGMF(CPPA_ERROR, this, "clCreateBuffer: " << get_opencl_error(err));
}
else {
mem_ptr tmp;
@@ -241,7 +241,7 @@ class actor_facade<Ret(Args...)> : public actor {
nullptr,
&err);
if (err != CL_SUCCESS) {
- CPPA_LOG_ERROR("clCreateBuffer: " << get_opencl_error(err));
+ CPPA_LOGMF(CPPA_ERROR, this, "clCreateBuffer: " << get_opencl_error(err));
}
else {
mem_ptr tmp;
View
53 cppa/prioritizing.hpp
@@ -31,4 +31,57 @@
#ifndef PRIORITIZING_HPP
#define PRIORITIZING_HPP
+#include <iostream>
+
+#include "cppa/mailbox_element.hpp"
+#include "cppa/message_priority.hpp"
+#include "cppa/detail/sync_request_bouncer.hpp"
+
+namespace cppa {
+
+template<class Base, class Subtype>
+class prioritizing : public Base {
+
+ public:
+
+ virtual mailbox_element* try_pop() override {
+ auto result = m_high_priority_mailbox.try_pop();
+ return (result) ? result : this->m_mailbox.try_pop();
+ }
+
+ template<typename... Ts>
+ prioritizing(Ts&&... args) : Base(std::forward<Ts>(args)...) { }
+
+ protected:
+
+ typedef prioritizing combined_type;
+
+ virtual void cleanup(std::uint32_t reason) override {
+ detail::sync_request_bouncer f{reason};
+ m_high_priority_mailbox.close(f);
+ Base::cleanup(reason);
+ }
+
+ virtual bool mailbox_empty() override {
+ return m_high_priority_mailbox.empty()
+ && this->m_mailbox.empty();
+ }
+
+ virtual void enqueue(const message_header& hdr, any_tuple msg) override {
+ typename Base::mailbox_type* mbox = nullptr;
+ if (hdr.priority == message_priority::high) {
+ mbox = &m_high_priority_mailbox;
+ }
+ else {
+ mbox = &this->m_mailbox;
+ }
+ this->enqueue_impl(*mbox, hdr, std::move(msg));
+ }
+
+ typename Base::mailbox_type m_high_priority_mailbox;
+
+};
+
+} // namespace cppa
+
#endif // PRIORITIZING_HPP
View
19 cppa/process_information.hpp
@@ -121,11 +121,23 @@ inline bool equal(const process_information::node_id_type& node_id,
}
/**
+ * @brief A smart pointer type that manages instances of
+ * {@link process_information}.
+ * @relates process_information
+ */
+typedef intrusive_ptr<process_information> process_information_ptr;
+
+/**
* @relates process_information
*/
std::string to_string(const process_information& what);
/**
+ * @relates process_information
+ */
+std::string to_string(const process_information_ptr& what);
+
+/**
* @brief Converts a {@link process_information::node_id_type node_id}
* to a hexadecimal string.
* @param node_id A unique node identifier.
@@ -134,13 +146,6 @@ std::string to_string(const process_information& what);
*/
std::string to_string(const process_information::node_id_type& node_id);
-/**
- * @brief A smart pointer type that manages instances of
- * {@link process_information}.
- * @relates process_information
- */
-typedef intrusive_ptr<process_information> process_information_ptr;
-
} // namespace cppa
#endif // CPPA_PROCESS_INFORMATION_HPP
View
5 cppa/sb_actor.hpp
@@ -34,6 +34,7 @@
#include <utility>
#include <type_traits>
+#include "cppa/util/dptr.hpp"
#include "cppa/event_based_actor.hpp"
namespace cppa {
@@ -60,8 +61,8 @@ class sb_actor : public Base {
* @brief Overrides {@link event_based_actor::init()} and sets
* the initial actor behavior to <tt>Derived::init_state</tt>.
*/
- virtual void init() {
- this->become(static_cast<Derived*>(this)->init_state);
+ virtual void init() override {
+ become(util::dptr<Derived>(this)->init_state);
}
protected:
View
3 cppa/scheduled_actor.hpp
@@ -64,9 +64,6 @@ enum scheduled_actor_type {
class scheduled_actor;
-template<>
-struct has_blocking_receive<scheduled_actor> : std::true_type { };
-
/**
* @brief A base class for cooperatively scheduled actors.
* @extends local_actor
View
6 cppa/scheduler.hpp
@@ -138,7 +138,8 @@ class scheduler {
util::duration{rel_time},
to,
std::move(data));
- delayed_send_helper()->enqueue(self, std::move(tup));
+ auto dsh = delayed_send_helper();
+ dsh->enqueue({self, dsh}, std::move(tup));
}
template<typename Duration, typename... Data>
@@ -153,7 +154,8 @@ class scheduler {
to,
id,
std::move(data));
- delayed_send_helper()->enqueue(self, std::move(tup));
+ auto dsh = delayed_send_helper();
+ dsh->enqueue({self, dsh}, std::move(tup));
}
else {
this->delayed_send(to, rel_time, std::move(data));
View
285 cppa/send.hpp
@@ -0,0 +1,285 @@
+/******************************************************************************\
+ * ___ __ *
+ * /\_ \ __/\ \ *
+ * \//\ \ /\_\ \ \____ ___ _____ _____ __ *
+ * \ \ \ \/\ \ \ '__`\ /'___\/\ '__`\/\ '__`\ /'__`\ *
+ * \_\ \_\ \ \ \ \L\ \/\ \__/\ \ \L\ \ \ \L\ \/\ \L\.\_ *
+ * /\____\\ \_\ \_,__/\ \____\\ \ ,__/\ \ ,__/\ \__/.\_\ *
+ * \/____/ \/_/\/___/ \/____/ \ \ \/ \ \ \/ \/__/\/_/ *
+ * \ \_\ \ \_\ *
+ * \/_/ \/_/ *
+ * *
+ * Copyright (C) 2011-2013 *
+ * Dominik Charousset <dominik.charousset@haw-hamburg.de> *
+ * *
+ * This file is part of libcppa. *
+ * libcppa is free software: you can redistribute it and/or modify it under *
+ * the terms of the GNU Lesser General Public License as published by the *
+ * Free Software Foundation; either version 2.1 of the License, *
+ * or (at your option) any later version. *
+ * *
+ * libcppa is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
+ * See the GNU Lesser General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU Lesser General Public License *
+ * along with libcppa. If not, see <http://www.gnu.org/licenses/>. *
+\******************************************************************************/
+
+
+#ifndef CPPA_SEND_HPP
+#define CPPA_SEND_HPP
+
+#include "cppa/self.hpp"
+#include "cppa/actor.hpp"
+#include "cppa/any_tuple.hpp"
+#include "cppa/scheduler.hpp"
+#include "cppa/local_actor.hpp"
+#include "cppa/message_header.hpp"
+#include "cppa/message_future.hpp"
+
+namespace cppa {
+
+/**
+ * @ingroup MessageHandling
+ * @{
+ */
+
+/**
+ * @brief Sends @p what to the receiver specified in @p hdr.
+ */
+inline void send_tuple(const message_header& hdr, any_tuple what) {
+ if (hdr.receiver == nullptr) return;
+ local_actor* sptr = self.unchecked();
+ if (sptr && sptr->chaining_enabled()) {
+ if (hdr.receiver->chained_enqueue(hdr, std::move(what))) {
+ // only actors implement chained_enqueue to return true
+ sptr->chained_actor(static_cast<actor*>(hdr.receiver.get()));
+ }
+ }
+ else hdr.receiver->enqueue(hdr, std::move(what));
+}
+
+/**
+ * @brief Sends <tt>{what...}</tt> to the receiver specified in @p hdr.
+ * @pre <tt>sizeof...(Ts) > 0</tt>
+ */
+template<typename... Ts>
+inline void send(const message_header& hdr, Ts&&... what) {
+ static_assert(sizeof...(Ts) > 0, "no message to send");
+ send_tuple(hdr, make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @brief Sends @p what to @p whom, but sets the sender information to @p from.
+ */
+inline void send_tuple_as(actor_ptr from, channel_ptr whom, any_tuple what) {
+ send_tuple({std::move(from), std::move(whom)}, std::move(what));
+}
+
+/**
+ * @brief Sends <tt>{what...}</tt> as a message to @p whom, but sets
+ * the sender information to @p from.
+ * @param from Sender as seen by @p whom.
+ * @param whom Receiver of the message.
+ * @param what Message elements.
+ * @pre <tt>sizeof...(Ts) > 0</tt>
+ */
+template<typename... Ts>
+inline void send_as(actor_ptr from, channel_ptr whom, Ts&&... what) {
+ send_tuple({std::move(from), std::move(whom)},
+ make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @brief Sends @p what as a synchronous message to @p whom.
+ * @param whom Receiver of the message.
+ * @param what Message content as tuple.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+inline message_future sync_send_tuple(actor_ptr whom, any_tuple what) {
+ if (!whom) throw std::invalid_argument("whom == nullptr");
+ auto req = self->new_request_id();
+ send_tuple({std::move(whom), req}, std::move(what));
+ return req.response_id();
+}
+
+/**
+ * @brief Sends <tt>{what...}</tt> as a synchronous message to @p whom.
+ * @param whom Receiver of the message.
+ * @param what Message elements.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @pre <tt>sizeof...(Ts) > 0</tt>
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+template<typename... Ts>
+inline message_future sync_send(actor_ptr whom, Ts&&... what) {
+ static_assert(sizeof...(Ts) > 0, "no message to send");
+ return sync_send_tuple(std::move(whom),
+ make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @brief Sends @p what as a synchronous message to @p whom with a timeout.
+ *
+ * The calling actor receives a 'TIMEOUT' message as response after
+ * given timeout exceeded and no response messages was received.
+ * @param whom Receiver of the message.
+ * @param what Message content as tuple.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+template<class Rep, class Period, typename... Ts>
+message_future timed_sync_send_tuple(actor_ptr whom,
+ const std::chrono::duration<Rep,Period>& rel_time,
+ any_tuple what) {
+ auto mf = sync_send_tuple(std::move(whom), std::move(what));
+ auto tmp = make_any_tuple(atom("TIMEOUT"));
+ get_scheduler()->delayed_reply(self, rel_time, mf.id(), std::move(tmp));
+ return mf;
+}
+
+/**
+ * @brief Sends <tt>{what...}</tt> as a synchronous message to @p whom
+ * with a timeout.
+ *
+ * The calling actor receives a 'TIMEOUT' message as response after
+ * given timeout exceeded and no response messages was received.
+ * @param whom Receiver of the message.
+ * @param what Message elements.
+ * @returns A handle identifying a future to the response of @p whom.
+ * @warning The returned handle is actor specific and the response to the sent
+ * message cannot be received by another actor.
+ * @pre <tt>sizeof...(Ts) > 0</tt>
+ * @throws std::invalid_argument if <tt>whom == nullptr</tt>
+ */
+template<class Rep, class Period, typename... Ts>
+message_future timed_sync_send(actor_ptr whom,
+ const std::chrono::duration<Rep,Period>& rel_time,
+ Ts&&... what) {
+ static_assert(sizeof...(Ts) > 0, "no message to send");
+ return timed_sync_send_tuple(std::move(whom),
+ rel_time,
+ make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @brief Sends a message to the sender of the last received message.
+ * @param what Message content as a tuple.
+ */
+inline void reply_tuple(any_tuple what) {
+ self->reply_message(std::move(what));
+}
+
+/**
+ * @brief Sends a message to the sender of the last received message.
+ * @param what Message elements.
+ */
+template<typename... Ts>
+inline void reply(Ts&&... what) {
+ self->reply_message(make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @brief Sends a message as reply to @p handle.
+ */
+template<typename... Ts>
+inline void reply_to(const response_handle& handle, Ts&&... what) {
+ if (handle.valid()) {
+ handle.apply(make_any_tuple(std::forward<Ts>(what)...));
+ }
+}
+
+/**
+ * @brief Replies with @p what to @p handle.
+ * @param handle Identifies a previously received request.
+ * @param what Response message.
+ */
+inline void reply_tuple_to(const response_handle& handle, any_tuple what) {
+ handle.apply(std::move(what));
+}
+
+/**
+ * @brief Forwards the last received message to @p whom.
+ */
+inline void forward_to(const actor_ptr& whom) {
+ self->forward_message(whom);
+}
+
+/**
+ * @brief Sends a message to @p whom that is delayed by @p rel_time.
+ * @param whom Receiver of the message.
+ * @param rtime Relative time duration to delay the message in
+ * microseconds, milliseconds, seconds or minutes.
+ * @param what Message content as a tuple.
+ */
+template<class Rep, class Period, typename... Ts>
+inline void delayed_send_tuple(const channel_ptr& whom,
+ const std::chrono::duration<Rep,Period>& rtime,
+ any_tuple what) {
+ if (whom) get_scheduler()->delayed_send(whom, rtime, what);
+}
+
+/**
+ * @brief Sends a message to @p whom that is delayed by @p rel_time.
+ * @param whom Receiver of the message.
+ * @param rtime Relative time duration to delay the message in
+ * microseconds, milliseconds, seconds or minutes.
+ * @param what Message elements.
+ */
+template<class Rep, class Period, typename... Ts>
+inline void delayed_send(const channel_ptr& whom,
+ const std::chrono::duration<Rep,Period>& rtime,
+ Ts&&... what) {
+ static_assert(sizeof...(Ts) > 0, "no message to send");
+ if (whom) {
+ delayed_send_tuple(whom,
+ rtime,
+ make_any_tuple(std::forward<Ts>(what)...));
+ }
+}
+
+/**
+ * @brief Sends a reply message that is delayed by @p rel_time.
+ * @param rtime Relative time duration to delay the message in
+ * microseconds, milliseconds, seconds or minutes.
+ * @param what Message content as a tuple.
+ * @see delayed_send()
+ */
+template<class Rep, class Period, typename... Ts>
+inline void delayed_reply_tuple(const std::chrono::duration<Rep, Period>& rtime,
+ any_tuple what) {
+ get_scheduler()->delayed_reply(self->last_sender(),
+ rtime,
+ self->get_response_id(),
+ std::move(what));
+}
+
+/**
+ * @brief Sends a reply message that is delayed by @p rel_time.
+ * @param rtime Relative time duration to delay the message in
+ * microseconds, milliseconds, seconds or minutes.
+ * @param what Message elements.
+ * @see delayed_send()
+ */
+template<class Rep, class Period, typename... Ts>
+inline void delayed_reply(const std::chrono::duration<Rep, Period>& rtime,
+ Ts&&... what) {
+ delayed_reply_tuple(rtime, make_any_tuple(std::forward<Ts>(what)...));
+}
+
+/**
+ * @}
+ */
+
+} // namespace cppa
+
+#endif // CPPA_SEND_HPP
View
27 cppa/spawn_options.hpp
@@ -45,12 +45,13 @@ namespace cppa {
class spawn_options { };
#else
enum class spawn_options : int {
- no_flags = 0x00,
- link_flag = 0x01,
- monitor_flag = 0x02,
- detach_flag = 0x04,
- hide_flag = 0x08,
- blocking_api_flag = 0x10
+ no_flags = 0x00,
+ link_flag = 0x01,
+ monitor_flag = 0x02,
+ detach_flag = 0x04,
+ hide_flag = 0x08,
+ blocking_api_flag = 0x10,
+ priority_aware_flag = 0x24 // priority-aware actors are also detached
};
#endif
@@ -93,6 +94,12 @@ constexpr spawn_options hidden = spawn_options::hide_flag;
*/
constexpr spawn_options blocking_api = spawn_options::blocking_api_flag;
+/**
+ * @brief Causes the new actor to evaluate message priorities.
+ * @note This implicitly causes the actor to run in its own thread.
+ */
+constexpr spawn_options priority_aware = spawn_options::priority_aware_flag;
+
#ifndef CPPA_DOCUMENTATION
} // namespace <anonymous>
#endif
@@ -123,6 +130,14 @@ constexpr bool has_detach_flag(spawn_options opts) {
}
/**
+ * @brief Checks wheter the {@link priority_aware} flag is set in @p opts.
+ * @relates spawn_options
+ */
+constexpr bool has_priority_aware_flag(spawn_options opts) {
+ return has_spawn_option(opts, priority_aware);
+}
+
+/**
* @brief Checks wheter the {@link hidden} flag is set in @p opts.
* @relates spawn_options
*/
View
3 cppa/stacked.hpp
@@ -94,8 +94,7 @@ class stacked : public Base {
protected:
template<typename... Ts>
- stacked(std::function<void()> fun, Ts&&... args)
- : Base(std::forward<Ts>(args)...), m_behavior(std::move(fun)) { }
+ stacked(Ts&&... args) : Base(std::forward<Ts>(args)...) { }
virtual void do_become(behavior&& bhvr, bool discard_old) override {
become_impl(std::move(bhvr), discard_old, message_id());
View
18 cppa/thread_mapped_actor.hpp
@@ -58,21 +58,13 @@ namespace cppa {
class self_type;
class scheduler_helper;
-class thread_mapped_actor;
-
-template<>
-struct has_blocking_receive<thread_mapped_actor> : std::true_type { };
/**
* @brief An actor using the blocking API running in its own thread.
* @extends local_actor
*/
-class thread_mapped_actor : public extend<local_actor,thread_mapped_actor>::with<mailbox_based,stacked,threaded> {
-
- friend class self_type; // needs access to cleanup()
- friend class scheduler_helper; // needs access to mailbox
- friend class detail::receive_policy; // needs access to await_message(), etc.
- friend class detail::behavior_stack; // needs same access as receive_policy
+class thread_mapped_actor : public extend<local_actor,thread_mapped_actor>::
+ with<mailbox_based,stacked,threaded> {
typedef combined_type super;
@@ -84,11 +76,7 @@ class thread_mapped_actor : public extend<local_actor,thread_mapped_actor>::with
inline void initialized(bool value) { m_initialized = value; }
- bool initialized() const;
-
- protected:
-
- void cleanup(std::uint32_t reason);
+ virtual bool initialized() const override;
private:
View
80 cppa/threaded.hpp
@@ -56,8 +56,10 @@ class threaded : public Base {
public:
+ typedef std::chrono::high_resolution_clock::time_point timeout_type;
+
template<typename... Ts>
- threaded(Ts&&... args) : Base(std::forward<Ts>(args)...) { }
+ threaded(Ts&&... args) : Base(std::forward<Ts>(args)...), m_initialized(false) { }
inline void reset_timeout() { }