Skip to content

Commit

Permalink
[tz.core.messaging] message_passer can now target both thread-safe an…
Browse files Browse the repository at this point in the history
…d non-thread-safe receivers, whereas previously it could only target a receiver with thread-safety matching the thread-safety of the passer. in addition, process_messages of the passer is now overriden to prevent unnecessary iteration of locks. now a single lock is used with an invocation to send_messages instead of a lock-per-iteration into send_message to the target receiver
  • Loading branch information
harrand committed Jan 23, 2024
1 parent 03292e2 commit 0344c2d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
22 changes: 15 additions & 7 deletions src/tz/core/messaging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <vector>
#include <mutex>
#include <ranges>
#include <variant>

namespace tz
{
Expand All @@ -23,9 +24,9 @@ namespace tz
struct with_lock
{
protected:
std::mutex mtx = {};
mutable std::mutex mtx = {};
using lock_t = std::unique_lock<std::mutex>;
lock_t lock() {return lock_t{this->mtx};}
lock_t lock() const {return lock_t{this->mtx};}
};
}

Expand All @@ -47,13 +48,13 @@ namespace tz
std::size_t message_count() const;
protected:
// Processes all messages and then clears the list. Thread-safe.
void process_messages();
virtual void process_messages();
/// Invoked when a message is sent. Your override can return false to drop the message. By default, no messages are dropped. Not thread safe.
virtual bool on_send_message(const M& msg) {return true;};
/// Invoked when a message is processed. You must override this. Not thread safe.
/// @note If processing a message causes the receiver to send a new message to itself, this is safe, but will not be processed until the next invocation to `process_messages`.
virtual void process_message(const M& msg) = 0;
private:
protected:
using base_t = std::conditional_t<thread_safe, detail::with_lock, detail::no_lock>;
std::vector<M> messages = {};
};
Expand All @@ -73,16 +74,23 @@ namespace tz
/// Create a passer with no target.
message_passer() = default;
/// Create a passer with an existing target.
message_passer(message_receiver<M, thread_safe>& target);
message_passer(message_receiver<M, false>& target);
/// Create a passer with an existing target.
message_passer(message_receiver<M, true>& target);
/// Redirect all messages to the target.
virtual void process_messages() override final;
/// Redirect messages to the target.
virtual void process_message(const M& msg) override final;

/// Redirect messages to a new target. Not thread-safe.
void set_target(message_receiver<M, thread_safe>& target);
void set_target(message_receiver<M, true>& target);
/// Redirect messages to a new target. Not thread-safe.
void set_target(message_receiver<M, false>& target);
/// Clear the target. Processed messages will no longer be redirected. Not thread-safe.
void clear_target();
private:
message_receiver<M, thread_safe>* target = nullptr;
using base_t = message_receiver<M, thread_safe>::base_t;
std::variant<message_receiver<M, true>*, message_receiver<M, false>*, std::monostate> target = std::monostate{};
};
}

Expand Down
52 changes: 45 additions & 7 deletions src/tz/core/messaging.inl
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,66 @@ namespace tz
}

template<tz::message M, bool thread_safe>
message_passer<M, thread_safe>::message_passer(message_receiver<M, thread_safe>& parent):
parent(&parent)
message_passer<M, thread_safe>::message_passer(message_receiver<M, true>& target):
target(&target)
{}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::process_message(const M& msg)
message_passer<M, thread_safe>::message_passer(message_receiver<M, false>& target):
target(&target)
{}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::process_messages()
{
if(this->target != nullptr)
typename base_t::lock_t lock = base_t::lock();
if(this->messages.empty())
{
this->target->send_message(msg);
return;
}
std::visit([this](auto&& receiver_ptr)
{
if constexpr(!std::is_same_v<std::decay_t<decltype(receiver_ptr)>, std::monostate>)
{
if(receiver_ptr != nullptr)
{
receiver_ptr->send_messages(this->messages);
}
}
}, this->target);
this->messages.clear();
}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::process_message(const M& msg)
{
std::visit([&msg](auto&& receiver_ptr)
{
if constexpr(!std::is_same_v<std::decay_t<decltype(receiver_ptr)>, std::monostate>)
{
if(receiver_ptr != nullptr)
{
receiver_ptr->send_message(msg);
}
}
}, this->target);
}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::set_target(message_receiver<M, true>& target)
{
this->target = &target;
}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::set_target(message_receiver<M, thread_safe>& target)
void message_passer<M, thread_safe>::set_target(message_receiver<M, false>& target)
{
this->target = &target;
}

template<tz::message M, bool thread_safe>
void message_passer<M, thread_safe>::clear_target()
{
this->target = nullptr;
this->target = std::monostate{};
}
}
40 changes: 39 additions & 1 deletion test/core/message_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ struct assignment_msg
};
static_assert(tz::message<assignment_msg>);

class deferred_assignment : public tz::message_receiver<assignment_msg>
class deferred_assignment : public tz::message_receiver<assignment_msg, true>
{
public:
int x = 0;
Expand Down Expand Up @@ -93,8 +93,46 @@ void scenario2()
tz::assert(sys.x == 95);
}

// scenario 3 - thread-safe and non-thread-safe message passers to the same thread-safe message receiver.

template<bool thread_safe>
struct deferred_assignment_passer : public tz::message_passer<assignment_msg, thread_safe>
{
using tz::message_passer<assignment_msg, thread_safe>::message_passer;
void update()
{
tz::message_passer<assignment_msg, thread_safe>::process_messages();
}
};

using deferred_assignment_passer_ts = deferred_assignment_passer<true>;
using deferred_assignment_passer_nts = deferred_assignment_passer<false>;

void scenario3()
{
deferred_assignment sys;

deferred_assignment_passer_ts ts(sys);
ts.set_target(sys);

deferred_assignment_passer_nts nts(sys);
nts.set_target(sys);

tz::assert(sys.x == 0);
ts.send_message({.new_val = 2});
nts.send_message({.new_val = 5});
// both of the following updates can happen concurrently on different threads, and its safe (so long as `nts` isnt also being sent new messages at the same time)
ts.update();
nts.update();
tz::assert(sys.message_count() == 2);
tz::assert(sys.x == 0);
sys.update();
tz::assert(sys.x == 5);
}

int main()
{
scenario1();
scenario2();
scenario3();
}

0 comments on commit 0344c2d

Please sign in to comment.