From a39b4fa195f5daf81f4049273566aeb85203b1c4 Mon Sep 17 00:00:00 2001 From: Ritesh Raj Singh Date: Fri, 3 Apr 2026 18:08:23 +0530 Subject: [PATCH] simplified implementation --- include/lockfree_spsc_bounded/defs.hpp | 4 +- include/lockfree_spsc_bounded/impl.hpp | 239 +++++++++---------------- 2 files changed, 82 insertions(+), 161 deletions(-) diff --git a/include/lockfree_spsc_bounded/defs.hpp b/include/lockfree_spsc_bounded/defs.hpp index e99be59..e864a4a 100644 --- a/include/lockfree_spsc_bounded/defs.hpp +++ b/include/lockfree_spsc_bounded/defs.hpp @@ -71,11 +71,11 @@ template class lockfree_spsc_bounded { // 4. bool try_pop(value ref) : Try to pop and return false if failed bool bool try_pop(T &); // 5. empty(void) : Checks if the queue is empty and return bool - bool empty(); + bool empty() const; // 6. bool peek(value ref) : Peek the top of the queue. bool peek(T &); template bool emplace_back(Args &&...args); - size_t size(); + size_t size() const; // Will work only in SPSC/MPSC why ?? [Reason this] // 8. Add emplace_back using perfect forwarding and variadic templates (you // can use this in push then) diff --git a/include/lockfree_spsc_bounded/impl.hpp b/include/lockfree_spsc_bounded/impl.hpp index ebc32a7..4ee2c92 100644 --- a/include/lockfree_spsc_bounded/impl.hpp +++ b/include/lockfree_spsc_bounded/impl.hpp @@ -4,189 +4,110 @@ #include "defs.hpp" #include -namespace tsfqueue::impl { -template -void lockfree_spsc_bounded::wait_and_push(T value) { - size_t next_tail = tail_cache + 1; - if (next_tail == capacity) { - next_tail = 0; - } - - static thread_local int spin_threshold = 100; - const int min_spin = 10, max_spin = 1000; - int spin = 0; - bool spun_success = false; - bool done = false; - while (true) { - if (next_tail == head_cache) { - done = true; - head_cache = head.load(std::memory_order_acquire); - if (next_tail == head_cache) { - if (spin < spin_threshold) { - // Busy-wait - } else if (spin < spin_threshold + 100) { - std::this_thread::yield(); - } else { - head.wait(head_cache, std::memory_order_acquire); - } - spin++; - continue; - } +namespace tsfqueue::impl +{ + template + void lockfree_spsc_bounded::wait_and_push(T value) + { + size_t next_tail = (tail_cache + 1) % capacity; + + while (next_tail == head_cache) + { + head_cache = head.load(std::memory_order_acquire); // busy wait } - // Refresh head_cache before was_empty calculation to ensure correctness - if (!done) { - head_cache = head.load(std::memory_order_acquire); - } - spun_success = (spin < spin_threshold); - bool was_empty = (head_cache == tail_cache); arr[tail_cache] = std::move(value); tail_cache = next_tail; tail.store(tail_cache, std::memory_order_release); - - if (was_empty) { - tail.notify_one(); - } - break; } - // Adapt spin threshold for next time - int delta = std::max(1, spin / 10); - if (spun_success) { - spin_threshold = std::min(spin_threshold + delta, max_spin); - } else { - spin_threshold = std::max(spin_threshold - delta, min_spin); + template + bool lockfree_spsc_bounded::try_push(T value) + { + return emplace_back(std::move(value)); } -} -template -bool lockfree_spsc_bounded::try_push(T value) { - return emplace_back(std::move(value)); -} - -template -bool lockfree_spsc_bounded::try_pop(T &value) { - if (tail_cache == head_cache) { - tail_cache = tail.load(std::memory_order_acquire); // refresh cache - if (tail_cache == head_cache) // empty + template + bool lockfree_spsc_bounded::try_pop(T &value) + { + if (tail_cache == head_cache) { - return false; - } - } - bool was_full = (tail_cache + 1) % capacity == head_cache; - value = arr[head_cache]; - head_cache = (head_cache + 1) % capacity; - head.store(head_cache, std::memory_order_release); - if (was_full) { - head.notify_one(); - } - return true; -} - -template -void lockfree_spsc_bounded::wait_and_pop(T &value) { - static thread_local int spin_threshold = 100; - const int min_spin = 10, max_spin = 1000; - int spin = 0; - bool spun_success = false; - bool done = false; - while (true) { - if (head_cache == tail_cache) { - done = true; tail_cache = tail.load(std::memory_order_acquire); - if (head_cache == tail_cache) { - if (spin < spin_threshold) { - // Busy-wait - } else if (spin < spin_threshold + 100) { - std::this_thread::yield(); - } else { - tail.wait(tail_cache, std::memory_order_acquire); - } - ++spin; - continue; + if (tail_cache == head_cache) + { + return false; } } - // Refresh tail_cache before was_full calculation to ensure correctness - if (!done) { - tail_cache = tail.load(std::memory_order_acquire); - } - spun_success = (spin < spin_threshold); - size_t next_tail = tail_cache + 1; - if (next_tail == capacity) { - next_tail = 0; - } - bool was_full = (next_tail == head_cache); - value = arr[head_cache]; - head_cache = head_cache + 1; - if (head_cache == capacity) { - head_cache = 0; - } + value = std::move(arr[head_cache]); + head_cache = (head_cache + 1) % capacity; head.store(head_cache, std::memory_order_release); - if (was_full) { - head.notify_one(); - } - break; + return true; } - // Adapt spin threshold for next time - int delta = std::max(1, spin / 10); - if (spun_success) { - spin_threshold = std::min(spin_threshold + delta, max_spin); - } else { - spin_threshold = std::max(spin_threshold - delta, min_spin); - } -} + template + void lockfree_spsc_bounded::wait_and_pop(T &value) + { + while (head_cache == tail_cache) + { + tail_cache = tail.load(std::memory_order_acquire); // busy wait + } -template -bool lockfree_spsc_bounded::peek(T &value) { - if (head_cache == tail_cache) { - tail_cache = tail.load(std::memory_order_acquire); - if (tail_cache == head_cache) // empty - return false; + value = std::move(arr[head_cache]); + head_cache = (head_cache + 1) % capacity; + head.store(head_cache, std::memory_order_release); } - value = arr[head_cache]; - return true; -} -template -bool lockfree_spsc_bounded::empty() { - return head.load(std::memory_order_acquire) == - tail.load(std::memory_order_acquire); -} + template + bool lockfree_spsc_bounded::peek(T &value) + { + if (head_cache == tail_cache) + { + tail_cache = tail.load(std::memory_order_acquire); + if(head_cache == tail_cache) + { + return false; + } + } + value = arr[head_cache]; + return true; + } -template -template -bool lockfree_spsc_bounded::emplace_back(Args &&...args) { - if ((tail_cache + 1) % capacity == head_cache) { - head_cache = head.load(std::memory_order_acquire); // refresh cache - if ((tail_cache + 1) % capacity == head_cache) // full + template + template + bool lockfree_spsc_bounded::emplace_back(Args &&...args) + { + if ((tail_cache + 1) % capacity == head_cache) { - return false; + head_cache = head.load(std::memory_order_acquire); + if ((tail_cache + 1) % capacity == head_cache) + { + return false; + } } + + arr[tail_cache] = T(std::forward(args)...); + tail_cache = (tail_cache + 1) % capacity; + tail.store(tail_cache, std::memory_order_release); + return true; } - bool was_empty = (head_cache == tail_cache); - arr[tail_cache] = T(std::forward(args)...); - tail_cache = (tail_cache + 1) % capacity; - tail.store(tail_cache, std::memory_order_release); - if (was_empty) { - tail.notify_one(); + + template + bool lockfree_spsc_bounded::empty() const + { + return head.load(std::memory_order_relaxed) == + tail.load(std::memory_order_relaxed); + // since queue is very frequently modified + } + + template + size_t lockfree_spsc_bounded::size() const + { + return (tail.load(std::memory_order_relaxed) - + head.load(std::memory_order_relaxed) + + capacity) % capacity; + // again, since size is very frequently changing. } - return true; -} - -template -size_t lockfree_spsc_bounded::size() { - size_t t = tail.load(std::memory_order_acquire); - size_t h = head.load(std::memory_order_acquire); - return (t - h + capacity) % capacity; -} } // namespace tsfqueue::impl -#endif - -// 1. Add static asserts -// 2. Add emplace_back using perfect forwarding and variadic templates (you -// can use this in push then) -// 3. Add size() function -// 4. Any more suggestions ?? \ No newline at end of file +#endif \ No newline at end of file