diff --git a/tests/test/util/test_queue.cpp b/tests/test/util/test_queue.cpp index 4682a5f70..b9f2ec70d 100644 --- a/tests/test/util/test_queue.cpp +++ b/tests/test/util/test_queue.cpp @@ -172,6 +172,74 @@ TEST_CASE("Test fixed capacity queue blocks if queue is full", "[util]") q.enqueue(2); // Enqueue with a short timeout so the operation fails quickly - REQUIRE_THROWS(q.enqueue(100)); + REQUIRE_THROWS_AS(q.enqueue(100), QueueTimeoutException); +} + +TEST_CASE("Test fixed capacity queue", "[util]") +{ + FixedCapIntQueue q(2); + auto latch = faabric::util::Latch::create(2); + + std::thread consumerThread([&latch, &q] { + // Make sure we consume once to make one slot in the queue + latch->wait(); + q.dequeue(); + }); + + // Fill the queue + q.enqueue(1); + q.enqueue(2); + // Trigger the consumer thread to consume once + latch->wait(); + // Check we can then enqueue a third time + q.enqueue(3); + + if (consumerThread.joinable()) { + consumerThread.join(); + } +} + +TEST_CASE("Stress test fixed capacity queue", "[util]") +{ + int numThreadPairs = 10; + int numMessages = 1000; + std::vector producerThreads; + std::vector consumerThreads; + std::vector> queues; + auto startLatch = faabric::util::Latch::create(2 * numThreadPairs + 1); + + for (int i = 0; i < numThreadPairs; i++) { + producerThreads.emplace_back([&queues, &startLatch, numMessages, i] { + startLatch->wait(); + + for (int j = 0; j < numMessages; j++) { + queues.at(i)->enqueue(i * j); + } + }); + consumerThreads.emplace_back([&queues, &startLatch, numMessages, i] { + startLatch->wait(); + + for (int j = 0; j < numMessages; j++) { + int result = queues.at(i)->dequeue(); + assert(result == i * j); + } + }); + queues.emplace_back(std::make_unique(10)); + } + + // Signal threads to start consuming and producing + startLatch->wait(); + + // Join all threads + for (auto& t : producerThreads) { + if (t.joinable()) { + t.join(); + } + } + for (auto& t : consumerThreads) { + if (t.joinable()) { + t.join(); + } + } } }