diff --git a/src/tz/core/messaging.hpp b/src/tz/core/messaging.hpp index 8a174bfb3e..4fc5c73563 100644 --- a/src/tz/core/messaging.hpp +++ b/src/tz/core/messaging.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace tz { @@ -23,9 +24,9 @@ namespace tz struct with_lock { protected: - std::mutex mtx = {}; + mutable std::mutex mtx = {}; using lock_t = std::unique_lock; - lock_t lock() {return lock_t{this->mtx};} + lock_t lock() const {return lock_t{this->mtx};} }; } @@ -47,13 +48,13 @@ namespace tz std::size_t message_count() const; protected: // Processes all messages and then clears the list. Thread-safe. - void process_messages(); + virtual void process_messages(); /// Invoked when a message is sent. Your override can return false to drop the message. By default, no messages are dropped. Not thread safe. virtual bool on_send_message(const M& msg) {return true;}; /// Invoked when a message is processed. You must override this. Not thread safe. /// @note If processing a message causes the receiver to send a new message to itself, this is safe, but will not be processed until the next invocation to `process_messages`. virtual void process_message(const M& msg) = 0; - private: + protected: using base_t = std::conditional_t; std::vector messages = {}; }; @@ -73,16 +74,23 @@ namespace tz /// Create a passer with no target. message_passer() = default; /// Create a passer with an existing target. - message_passer(message_receiver& target); + message_passer(message_receiver& target); + /// Create a passer with an existing target. + message_passer(message_receiver& target); + /// Redirect all messages to the target. + virtual void process_messages() override final; /// Redirect messages to the target. virtual void process_message(const M& msg) override final; /// Redirect messages to a new target. Not thread-safe. - void set_target(message_receiver& target); + void set_target(message_receiver& target); + /// Redirect messages to a new target. Not thread-safe. + void set_target(message_receiver& target); /// Clear the target. Processed messages will no longer be redirected. Not thread-safe. void clear_target(); private: - message_receiver* target = nullptr; + using base_t = message_receiver::base_t; + std::variant*, message_receiver*, std::monostate> target = std::monostate{}; }; } diff --git a/src/tz/core/messaging.inl b/src/tz/core/messaging.inl index 38cd982053..0fc3c842ef 100644 --- a/src/tz/core/messaging.inl +++ b/src/tz/core/messaging.inl @@ -60,21 +60,59 @@ namespace tz } template - message_passer::message_passer(message_receiver& parent): - parent(&parent) + message_passer::message_passer(message_receiver& target): + target(&target) {} template - void message_passer::process_message(const M& msg) + message_passer::message_passer(message_receiver& target): + target(&target) + {} + + template + void message_passer::process_messages() { - if(this->target != nullptr) + typename base_t::lock_t lock = base_t::lock(); + if(this->messages.empty()) { - this->target->send_message(msg); + return; } + std::visit([this](auto&& receiver_ptr) + { + if constexpr(!std::is_same_v, std::monostate>) + { + if(receiver_ptr != nullptr) + { + receiver_ptr->send_messages(this->messages); + } + } + }, this->target); + this->messages.clear(); + } + + template + void message_passer::process_message(const M& msg) + { + std::visit([&msg](auto&& receiver_ptr) + { + if constexpr(!std::is_same_v, std::monostate>) + { + if(receiver_ptr != nullptr) + { + receiver_ptr->send_message(msg); + } + } + }, this->target); + } + + template + void message_passer::set_target(message_receiver& target) + { + this->target = ⌖ } template - void message_passer::set_target(message_receiver& target) + void message_passer::set_target(message_receiver& target) { this->target = ⌖ } @@ -82,6 +120,6 @@ namespace tz template void message_passer::clear_target() { - this->target = nullptr; + this->target = std::monostate{}; } } diff --git a/test/core/message_test.cpp b/test/core/message_test.cpp index 872da174a9..681bd299fa 100644 --- a/test/core/message_test.cpp +++ b/test/core/message_test.cpp @@ -8,7 +8,7 @@ struct assignment_msg }; static_assert(tz::message); -class deferred_assignment : public tz::message_receiver +class deferred_assignment : public tz::message_receiver { public: int x = 0; @@ -93,8 +93,46 @@ void scenario2() tz::assert(sys.x == 95); } +// scenario 3 - thread-safe and non-thread-safe message passers to the same thread-safe message receiver. + +template +struct deferred_assignment_passer : public tz::message_passer +{ + using tz::message_passer::message_passer; + void update() + { + tz::message_passer::process_messages(); + } +}; + +using deferred_assignment_passer_ts = deferred_assignment_passer; +using deferred_assignment_passer_nts = deferred_assignment_passer; + +void scenario3() +{ + deferred_assignment sys; + + deferred_assignment_passer_ts ts(sys); + ts.set_target(sys); + + deferred_assignment_passer_nts nts(sys); + nts.set_target(sys); + + tz::assert(sys.x == 0); + ts.send_message({.new_val = 2}); + nts.send_message({.new_val = 5}); + // both of the following updates can happen concurrently on different threads, and its safe (so long as `nts` isnt also being sent new messages at the same time) + ts.update(); + nts.update(); + tz::assert(sys.message_count() == 2); + tz::assert(sys.x == 0); + sys.update(); + tz::assert(sys.x == 5); +} + int main() { scenario1(); scenario2(); + scenario3(); }