Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/lockfree_spsc_bounded/defs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ template <typename T, size_t Capacity> class lockfree_spsc_bounded {
// 4. bool try_pop(value ref) : Try to pop and return false if failed bool
bool try_pop(T &);
// 5. empty(void) : Checks if the queue is empty and return bool
bool empty();
bool empty() const;
// 6. bool peek(value ref) : Peek the top of the queue.
bool peek(T &);
template <typename... Args> bool emplace_back(Args &&...args);
size_t size();
size_t size() const;
// Will work only in SPSC/MPSC why ?? [Reason this]
// 8. Add emplace_back using perfect forwarding and variadic templates (you
// can use this in push then)
Expand Down
239 changes: 80 additions & 159 deletions include/lockfree_spsc_bounded/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,189 +4,110 @@
#include "defs.hpp"
#include <utility>

namespace tsfqueue::impl {
template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_push(T value) {
size_t next_tail = tail_cache + 1;
if (next_tail == capacity) {
next_tail = 0;
}

static thread_local int spin_threshold = 100;
const int min_spin = 10, max_spin = 1000;
int spin = 0;
bool spun_success = false;
bool done = false;
while (true) {
if (next_tail == head_cache) {
done = true;
head_cache = head.load(std::memory_order_acquire);
if (next_tail == head_cache) {
if (spin < spin_threshold) {
// Busy-wait
} else if (spin < spin_threshold + 100) {
std::this_thread::yield();
} else {
head.wait(head_cache, std::memory_order_acquire);
}
spin++;
continue;
}
namespace tsfqueue::impl
{
template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_push(T value)
{
size_t next_tail = (tail_cache + 1) % capacity;

while (next_tail == head_cache)
{
head_cache = head.load(std::memory_order_acquire); // busy wait
}

// Refresh head_cache before was_empty calculation to ensure correctness
if (!done) {
head_cache = head.load(std::memory_order_acquire);
}
spun_success = (spin < spin_threshold);
bool was_empty = (head_cache == tail_cache);
arr[tail_cache] = std::move(value);
tail_cache = next_tail;
tail.store(tail_cache, std::memory_order_release);

if (was_empty) {
tail.notify_one();
}
break;
}

// Adapt spin threshold for next time
int delta = std::max(1, spin / 10);
if (spun_success) {
spin_threshold = std::min(spin_threshold + delta, max_spin);
} else {
spin_threshold = std::max(spin_threshold - delta, min_spin);
template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::try_push(T value)
{
return emplace_back(std::move(value));
}
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::try_push(T value) {
return emplace_back(std::move(value));
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::try_pop(T &value) {
if (tail_cache == head_cache) {
tail_cache = tail.load(std::memory_order_acquire); // refresh cache
if (tail_cache == head_cache) // empty
template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::try_pop(T &value)
{
if (tail_cache == head_cache)
{
return false;
}
}
bool was_full = (tail_cache + 1) % capacity == head_cache;
value = arr[head_cache];
head_cache = (head_cache + 1) % capacity;
head.store(head_cache, std::memory_order_release);
if (was_full) {
head.notify_one();
}
return true;
}

template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_pop(T &value) {
static thread_local int spin_threshold = 100;
const int min_spin = 10, max_spin = 1000;
int spin = 0;
bool spun_success = false;
bool done = false;
while (true) {
if (head_cache == tail_cache) {
done = true;
tail_cache = tail.load(std::memory_order_acquire);
if (head_cache == tail_cache) {
if (spin < spin_threshold) {
// Busy-wait
} else if (spin < spin_threshold + 100) {
std::this_thread::yield();
} else {
tail.wait(tail_cache, std::memory_order_acquire);
}
++spin;
continue;
if (tail_cache == head_cache)
{
return false;
}
}

// Refresh tail_cache before was_full calculation to ensure correctness
if (!done) {
tail_cache = tail.load(std::memory_order_acquire);
}
spun_success = (spin < spin_threshold);
size_t next_tail = tail_cache + 1;
if (next_tail == capacity) {
next_tail = 0;
}
bool was_full = (next_tail == head_cache);
value = arr[head_cache];
head_cache = head_cache + 1;
if (head_cache == capacity) {
head_cache = 0;
}
value = std::move(arr[head_cache]);
head_cache = (head_cache + 1) % capacity;
head.store(head_cache, std::memory_order_release);
if (was_full) {
head.notify_one();
}
break;
return true;
}

// Adapt spin threshold for next time
int delta = std::max(1, spin / 10);
if (spun_success) {
spin_threshold = std::min(spin_threshold + delta, max_spin);
} else {
spin_threshold = std::max(spin_threshold - delta, min_spin);
}
}
template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_pop(T &value)
{
while (head_cache == tail_cache)
{
tail_cache = tail.load(std::memory_order_acquire); // busy wait
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::peek(T &value) {
if (head_cache == tail_cache) {
tail_cache = tail.load(std::memory_order_acquire);
if (tail_cache == head_cache) // empty
return false;
value = std::move(arr[head_cache]);
head_cache = (head_cache + 1) % capacity;
head.store(head_cache, std::memory_order_release);
}
value = arr[head_cache];
return true;
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::empty() {
return head.load(std::memory_order_acquire) ==
tail.load(std::memory_order_acquire);
}
template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::peek(T &value)
{
if (head_cache == tail_cache)
{
tail_cache = tail.load(std::memory_order_acquire);
if(head_cache == tail_cache)
{
return false;
}
}
value = arr[head_cache];
return true;
}

template <typename T, size_t Capacity>
template <typename... Args>
bool lockfree_spsc_bounded<T, Capacity>::emplace_back(Args &&...args) {
if ((tail_cache + 1) % capacity == head_cache) {
head_cache = head.load(std::memory_order_acquire); // refresh cache
if ((tail_cache + 1) % capacity == head_cache) // full
template <typename T, size_t Capacity>
template <typename... Args>
bool lockfree_spsc_bounded<T, Capacity>::emplace_back(Args &&...args)
{
if ((tail_cache + 1) % capacity == head_cache)
{
return false;
head_cache = head.load(std::memory_order_acquire);
if ((tail_cache + 1) % capacity == head_cache)
{
return false;
}
}

arr[tail_cache] = T(std::forward<Args>(args)...);
tail_cache = (tail_cache + 1) % capacity;
tail.store(tail_cache, std::memory_order_release);
return true;
}
bool was_empty = (head_cache == tail_cache);
arr[tail_cache] = T(std::forward<Args>(args)...);
tail_cache = (tail_cache + 1) % capacity;
tail.store(tail_cache, std::memory_order_release);
if (was_empty) {
tail.notify_one();

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::empty() const
{
return head.load(std::memory_order_relaxed) ==
tail.load(std::memory_order_relaxed);
// since queue is very frequently modified
}

template <typename T, size_t Capacity>
size_t lockfree_spsc_bounded<T, Capacity>::size() const
{
return (tail.load(std::memory_order_relaxed) -
head.load(std::memory_order_relaxed) +
capacity) % capacity;
// again, since size is very frequently changing.
}
return true;
}

template <typename T, size_t Capacity>
size_t lockfree_spsc_bounded<T, Capacity>::size() {
size_t t = tail.load(std::memory_order_acquire);
size_t h = head.load(std::memory_order_acquire);
return (t - h + capacity) % capacity;
}
} // namespace tsfqueue::impl

#endif

// 1. Add static asserts
// 2. Add emplace_back using perfect forwarding and variadic templates (you
// can use this in push then)
// 3. Add size() function
// 4. Any more suggestions ??
#endif
Loading