From 5b7503851436e904bd7ffe557ac0a9df105cdd40 Mon Sep 17 00:00:00 2001 From: Alexis Brodeur Date: Tue, 30 Sep 2025 22:01:45 -0400 Subject: [PATCH 1/4] Add condition variable wrapper In line with `fine::Mutex` and `fine::SharedMutex`, this commit adds a wrapper, `fine::ConditionVariable` for `ErlNifCond` with an API similar if not identifical to `std::condition_variable`. --- README.md | 39 ++++++++++++-- c_include/fine/sync.hpp | 106 ++++++++++++++++++++++++++++++++++++++ test/c_src/finest.cpp | 46 +++++++++++++++++ test/lib/finest/nif.ex | 2 + test/test/finest_test.exs | 6 +++ 5 files changed, 195 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 55dcc85..d67fc9d 100644 --- a/README.md +++ b/README.md @@ -524,10 +524,14 @@ can be called from multiple Erlang processes simultaneously, leading to race conditions. While C++ provides synchronization mechanisms, these are unknown to Erlang and cannot take advantage of tools like *lock checker* or *lcnt*. -Fine provides analogues to `std::mutex` and `std::shared_mutex`, respectively -called `fine::Mutex` and `fine::SharedMutex`. Those are compatible with the -standard mutex wrappers, such as `std::unique_lock` and `std::shared_lock`. -For example: +Fine provides analogues to `std::mutex`, `std::shared_mutex`, and +`std::condition_variable`, respectively called `fine::Mutex`, +`fine::SharedMutex`, and `fine::ConditionVariable`. All of their implementations +are provided in the `` header. + + +`fine::Mutex` and `fine::SharedMutex` are compatible with `std::unique_lock` +and `std::shared_lock`. For example: ```c++ #include @@ -571,6 +575,33 @@ const char* my_object__name(struct my_object*); fine::SharedMutex my_object_rwlock("my_lib", "my_object", my_object__name(my_object)); ``` + +Fine also provides `fine::ConditionVariable` with an API similar to +`std::condition_variable`: + +```c++ +bool notified = false; +fine::Mutex mutex; +fine::ConditionVariable cond; + +std::thread t1([&]() { + auto lock = std::unique_lock{mutex}; + cond.wait(lock, []() { return notified; }); +}); + +std::thread t2([&]() { + { + auto lock = std::unique_lock{mutex}; + notified = true; + } + + cond.notify_all(); +}); + +t2.join(); +t1.join(); +``` + ## Allocators For compatibility with the STL, fine supports stateless allocators when diff --git a/c_include/fine/sync.hpp b/c_include/fine/sync.hpp index cd110fa..fd20bad 100644 --- a/c_include/fine/sync.hpp +++ b/c_include/fine/sync.hpp @@ -194,6 +194,112 @@ class SharedMutex final { }; std::unique_ptr m_handle; }; + +// Condition variable. Used when threads must wait for a specific +// condition to appear before continuing execution. Condition +// variables must be used with associated mutexes. +class ConditionVariable final { +public: + // Creates a condition variable. + ConditionVariable() : m_handle{enif_cond_create(nullptr)} { + if (!m_handle) { + throw std::runtime_error("failed to create cond"); + } + } + + // Creates a ConditionVariable from an ErlNifCond handle. + explicit ConditionVariable(ErlNifCond *handle) : m_handle{handle} {} + + // Creates a condition variable. + // + // `name` is a string identifying the created condition variable. It is used + // to identify the condition variable in planned future debug functionality. + explicit ConditionVariable(const char *name) + : m_handle{enif_cond_create(const_cast(name))} { + if (!m_handle) { + throw std::runtime_error("failed to create cond"); + } + } + + // Creates a condition variable. + // + // `name` is a string identifying the created condition variable. It is used + // to identify the condition variable in planned future debug functionality. + explicit ConditionVariable(const std::string &name) + : m_handle{enif_cond_create(const_cast(name.c_str()))} { + if (!m_handle) { + throw std::runtime_error("failed to create cond"); + } + } + + // Converts this ConditionVariable to a ErlNifConditionVariable handle. + // + // Ownership still belongs to this instance. + operator ErlNifCond *() const & noexcept { return m_handle.get(); } + + // Releases ownership of the ErlNifCond handle to the caller. + // + // This operation is only possible by: + // ``` + // static_cast(std::move(rwlock)) + // ``` + explicit operator ErlNifCond *() && noexcept { return m_handle.release(); } + + // Broadcasts on this condition variable. That is, if other threads are + // waiting on the condition variable being broadcast on, all of them are + // woken. + // + // This function is thread-safe. + void notify_all() noexcept { enif_cond_broadcast(m_handle.get()); } + + // Signals on a condition variable. That is, if other threads are waiting on + // the condition variable being signaled, one of them is woken. + // + // This function is thread-safe. + void notify_one() noexcept { enif_cond_signal(m_handle.get()); } + + // Waits on a condition variable. The calling thread is blocked until another + // thread wakes it by signaling or broadcasting on the condition variable. + // Before the calling thread is blocked, it unlocks the mutex passed as + // argument. When the calling thread is woken, it locks the same mutex before + // returning. That is, the mutex currently must be locked by the calling + // thread when calling this function. + // + // `wait` can return even if no one has signaled or broadcast on the condition + // variable. Code calling `wait` is always to be prepared for `wait` returning + // even if the condition that the thread was waiting for has not occurred. + // That is, when returning from `wait`, always check if the condition has + // occurred, and if not call `wait` again. + // + // This function is thread-safe. + [[deprecated( + "usage of `void fine::ConditionVariable::wait(std::unique_lock " + "&, Predicate)` is preferred")]] + void wait(std::unique_lock &lock) noexcept { + enif_cond_wait(m_handle.get(), *lock.mutex()); + } + + // Waits on a condition variable. The calling thread is blocked until another + // thread wakes it by signaling or broadcasting on the condition variable. + // Before the calling thread is blocked, it unlocks the mutex passed as + // argument. When the calling thread is woken, it locks the same mutex before + // returning. That is, the mutex currently must be locked by the calling + // thread when calling this function. + // + // This function is thread-safe. + template + void wait(std::unique_lock &lock, Predicate pred) { + while (!pred()) { + enif_cond_wait(m_handle.get(), *lock.mutex()); + } + } + +private: + struct Deleter { + void operator()(ErlNifCond *handle) noexcept { enif_cond_destroy(handle); } + }; + std::unique_ptr m_handle; +}; } // namespace fine #endif diff --git a/test/c_src/finest.cpp b/test/c_src/finest.cpp index c04ef4f..6604d06 100644 --- a/test/c_src/finest.cpp +++ b/test/c_src/finest.cpp @@ -422,6 +422,52 @@ std::nullopt_t shared_mutex_shared_lock_test(ErlNifEnv *) { } FINE_NIF(shared_mutex_shared_lock_test, 0); +std::nullopt_t condition_variable_test(ErlNifEnv *) { + bool notified = false; + fine::Mutex mutex("finest", "condition_variable_test_mutex"); + fine::ConditionVariable cond("condition_variable_test"); + + std::thread thread1([&] { + { + auto lock = std::unique_lock{mutex}; + cond.wait(lock, [&]() -> bool { return notified; }); + } + + cond.notify_all(); + }); + + std::thread thread2([&] { + { + auto lock = std::unique_lock{mutex}; + cond.wait(lock, [&]() -> bool { return notified; }); + } + + cond.notify_all(); + }); + + std::thread thread3([&] { + { + auto lock = std::unique_lock{mutex}; + cond.wait(lock, [&]() { return notified; }); + } + + cond.notify_all(); + }); + + std::thread thread4([&] { + notified = true; + cond.notify_one(); + }); + + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + + return std::nullopt; +} +FINE_NIF(condition_variable_test, 0); + bool compare_eq(ErlNifEnv *, fine::Term lhs, fine::Term rhs) noexcept { return lhs == rhs; } diff --git a/test/lib/finest/nif.ex b/test/lib/finest/nif.ex index 6f3426a..2374c9c 100644 --- a/test/lib/finest/nif.ex +++ b/test/lib/finest/nif.ex @@ -70,6 +70,8 @@ defmodule Finest.NIF do def shared_mutex_unique_lock_test(), do: err!() def shared_mutex_shared_lock_test(), do: err!() + def condition_variable_test(), do: err!() + def compare_eq(_lhs, _rhs), do: err!() def compare_ne(_lhs, _rhs), do: err!() def compare_lt(_lhs, _rhs), do: err!() diff --git a/test/test/finest_test.exs b/test/test/finest_test.exs index 6570c78..9df2f59 100644 --- a/test/test/finest_test.exs +++ b/test/test/finest_test.exs @@ -451,6 +451,12 @@ defmodule FinestTest do end end + describe "condition_variable" do + test "condition_variable" do + NIF.condition_variable_test() + end + end + describe "comparison" do test "equal" do refute NIF.compare_eq(64, 42) From 0351c8f26cb7bb9d8b4faad932c0d9faf1249bbe Mon Sep 17 00:00:00 2001 From: Alexis Brodeur Date: Mon, 6 Oct 2025 21:45:19 -0400 Subject: [PATCH 2/4] Remove deprecation While the function shouldn't be used, deprecation is not an apt choice to indicate that it should not be used. The deprecation notice is replaced with a prominent warning in the function's documentation. --- c_include/fine/sync.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/c_include/fine/sync.hpp b/c_include/fine/sync.hpp index fd20bad..49c24f1 100644 --- a/c_include/fine/sync.hpp +++ b/c_include/fine/sync.hpp @@ -258,6 +258,9 @@ class ConditionVariable final { // This function is thread-safe. void notify_one() noexcept { enif_cond_signal(m_handle.get()); } + // Prefer the use of `wait(std::unique_lock&, Predicate)` over this + // function. + // // Waits on a condition variable. The calling thread is blocked until another // thread wakes it by signaling or broadcasting on the condition variable. // Before the calling thread is blocked, it unlocks the mutex passed as @@ -272,9 +275,6 @@ class ConditionVariable final { // occurred, and if not call `wait` again. // // This function is thread-safe. - [[deprecated( - "usage of `void fine::ConditionVariable::wait(std::unique_lock " - "&, Predicate)` is preferred")]] void wait(std::unique_lock &lock) noexcept { enif_cond_wait(m_handle.get(), *lock.mutex()); } From 124ea088148df5b226521e17ac6d5c99114f2448 Mon Sep 17 00:00:00 2001 From: Alexis Brodeur Date: Mon, 6 Oct 2025 22:42:21 -0400 Subject: [PATCH 3/4] Rework condition_variable test --- test/c_src/finest.cpp | 74 +++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/test/c_src/finest.cpp b/test/c_src/finest.cpp index 6604d06..5abe119 100644 --- a/test/c_src/finest.cpp +++ b/test/c_src/finest.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -422,47 +423,60 @@ std::nullopt_t shared_mutex_shared_lock_test(ErlNifEnv *) { } FINE_NIF(shared_mutex_shared_lock_test, 0); -std::nullopt_t condition_variable_test(ErlNifEnv *) { - bool notified = false; - fine::Mutex mutex("finest", "condition_variable_test_mutex"); - fine::ConditionVariable cond("condition_variable_test"); +class ResetEvent { +public: +public: + explicit ResetEvent(const bool signaled = false) noexcept + : m_signaled{signaled} {} - std::thread thread1([&] { + void set() { { - auto lock = std::unique_lock{mutex}; - cond.wait(lock, [&]() -> bool { return notified; }); + auto lock = std::unique_lock{m_mutex}; + m_signaled = true; } - cond.notify_all(); - }); + m_cond.notify_one(); + } - std::thread thread2([&] { - { - auto lock = std::unique_lock{mutex}; - cond.wait(lock, [&]() -> bool { return notified; }); - } + void reset() { + auto lock = std::unique_lock{m_mutex}; + m_signaled = false; + } - cond.notify_all(); - }); + void wait() { + auto lock = std::unique_lock{m_mutex}; + m_cond.wait(lock, [&] { return m_signaled; }); + m_signaled = false; + } - std::thread thread3([&] { - { - auto lock = std::unique_lock{mutex}; - cond.wait(lock, [&]() { return notified; }); - } +private: + bool m_signaled; + fine::Mutex m_mutex; + fine::ConditionVariable m_cond; +}; - cond.notify_all(); - }); +std::nullopt_t condition_variable_test(ErlNifEnv *) { + ResetEvent event{true}; + event.reset(); - std::thread thread4([&] { - notified = true; - cond.notify_one(); + std::thread wait_thread_1([&] { + event.wait(); + event.set(); + }); + std::thread wait_thread_2([&] { + event.wait(); + event.set(); }); + std::thread wait_thread_3([&] { + event.wait(); + event.set(); + }); + + event.set(); - thread1.join(); - thread2.join(); - thread3.join(); - thread4.join(); + wait_thread_1.join(); + wait_thread_2.join(); + wait_thread_3.join(); return std::nullopt; } From 82af5e526847c69d360509b6b994a0dafdca3740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Tue, 7 Oct 2025 12:14:42 +0200 Subject: [PATCH 4/4] Update test/c_src/finest.cpp --- test/c_src/finest.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/test/c_src/finest.cpp b/test/c_src/finest.cpp index 5abe119..187491a 100644 --- a/test/c_src/finest.cpp +++ b/test/c_src/finest.cpp @@ -424,7 +424,6 @@ std::nullopt_t shared_mutex_shared_lock_test(ErlNifEnv *) { FINE_NIF(shared_mutex_shared_lock_test, 0); class ResetEvent { -public: public: explicit ResetEvent(const bool signaled = false) noexcept : m_signaled{signaled} {}