diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 2ad6c25e8..0de408ebc 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -145,6 +145,10 @@ template class SpscQueue { public: + SpscQueue(int capacity) : mq(capacity) {}; + + SpscQueue() : mq(DEFAULT_QUEUE_SIZE) {}; + void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { if (!mq.push(value)) { @@ -164,7 +168,6 @@ class SpscQueue writerWaiting.store(false, std::memory_order_relaxed); } - // For this to work well, I'd have to take a lock before notify_one if (readerWaiting.load(std::memory_order_relaxed)) { UniqueLock lock(mx); notEmptyNotifier.notify_one(); @@ -202,15 +205,21 @@ class SpscQueue T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { - while (mq.read_available() <= 0) { + if (mq.read_available() == 0) { UniqueLock lock(mx); - std::cv_status returnVal = notEmptyNotifier.wait_for( - lock, std::chrono::milliseconds(timeoutMs)); + readerWaiting.store(true, std::memory_order_relaxed); - if (returnVal == std::cv_status::timeout) { - throw QueueTimeoutException("Timeout waiting for peek"); + while (mq.read_available() == 0) { + std::cv_status returnVal = notEmptyNotifier.wait_for( + lock, std::chrono::milliseconds(timeoutMs)); + + if (returnVal == std::cv_status::timeout) { + throw QueueTimeoutException("Timeout waiting for dequeue"); + } } + + readerWaiting.store(false, std::memory_order_relaxed); } return &mq.front(); @@ -219,9 +228,7 @@ class SpscQueue long size() { return mq.read_available(); } private: - boost::lockfree::spsc_queue> - mq; + boost::lockfree::spsc_queue mq; std::mutex mx; std::atomic writerWaiting = false; std::condition_variable notEmptyNotifier;