Skip to content

Commit

Permalink
add constructor for queue with capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Dec 24, 2021
1 parent e65e970 commit ce2346a
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ template<typename T>
class SpscQueue
{
public:
SpscQueue(int capacity) : mq(capacity) {};

SpscQueue() : mq(DEFAULT_QUEUE_SIZE) {};

void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
if (!mq.push(value)) {
Expand All @@ -164,7 +168,6 @@ class SpscQueue
writerWaiting.store(false, std::memory_order_relaxed);
}

// For this to work well, I'd have to take a lock before notify_one
if (readerWaiting.load(std::memory_order_relaxed)) {
UniqueLock lock(mx);
notEmptyNotifier.notify_one();
Expand Down Expand Up @@ -202,15 +205,21 @@ class SpscQueue

T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
while (mq.read_available() <= 0) {
if (mq.read_available() == 0) {
UniqueLock lock(mx);

std::cv_status returnVal = notEmptyNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));
readerWaiting.store(true, std::memory_order_relaxed);

if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for peek");
while (mq.read_available() == 0) {
std::cv_status returnVal = notEmptyNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
}

readerWaiting.store(false, std::memory_order_relaxed);
}

return &mq.front();
Expand All @@ -219,9 +228,7 @@ class SpscQueue
long size() { return mq.read_available(); }

private:
boost::lockfree::spsc_queue<T,
boost::lockfree::capacity<DEFAULT_QUEUE_SIZE>>
mq;
boost::lockfree::spsc_queue<T> mq;
std::mutex mx;
std::atomic<bool> writerWaiting = false;
std::condition_variable notEmptyNotifier;
Expand Down

0 comments on commit ce2346a

Please sign in to comment.