Skip to content

Commit

Permalink
add functionality and stress test for fixed capacity queues
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Dec 30, 2021
1 parent c1db191 commit 002a71e
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion tests/test/util/test_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,74 @@ TEST_CASE("Test fixed capacity queue blocks if queue is full", "[util]")
q.enqueue(2);

// Enqueue with a short timeout so the operation fails quickly
REQUIRE_THROWS(q.enqueue(100));
REQUIRE_THROWS_AS(q.enqueue(100), QueueTimeoutException);
}

TEST_CASE("Test fixed capacity queue", "[util]")
{
FixedCapIntQueue q(2);
auto latch = faabric::util::Latch::create(2);

std::thread consumerThread([&latch, &q] {
// Make sure we consume once to make one slot in the queue
latch->wait();
q.dequeue();
});

// Fill the queue
q.enqueue(1);
q.enqueue(2);
// Trigger the consumer thread to consume once
latch->wait();
// Check we can then enqueue a third time
q.enqueue(3);

if (consumerThread.joinable()) {
consumerThread.join();
}
}

TEST_CASE("Stress test fixed capacity queue", "[util]")
{
int numThreadPairs = 10;
int numMessages = 1000;
std::vector<std::thread> producerThreads;
std::vector<std::thread> consumerThreads;
std::vector<std::unique_ptr<FixedCapIntQueue>> queues;
auto startLatch = faabric::util::Latch::create(2 * numThreadPairs + 1);

for (int i = 0; i < numThreadPairs; i++) {
producerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
queues.at(i)->enqueue(i * j);
}
});
consumerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
int result = queues.at(i)->dequeue();
assert(result == i * j);
}
});
queues.emplace_back(std::make_unique<FixedCapIntQueue>(10));
}

// Signal threads to start consuming and producing
startLatch->wait();

// Join all threads
for (auto& t : producerThreads) {
if (t.joinable()) {
t.join();
}
}
for (auto& t : consumerThreads) {
if (t.joinable()) {
t.join();
}
}
}
}

0 comments on commit 002a71e

Please sign in to comment.