Skip to content

Commit

Permalink
Introduce Fixed-Capacity Queues (#205)
Browse files Browse the repository at this point in the history
* implement fixed size, single consumer, single producer, queue

* mpi reduce test failing

* fix race condition in spsc queues

* add constructor for queue with capacity

* attempts at fixing race condition in p2p kernel

* use circular buffer lock-full instead of lock-free queue

* add test for enqueuing to a full queue

* add functionality and stress test for fixed capacity queues

* switch underlying queue to moodycamel's, and invalidate probe

* remove queue prof script

* cleanup

* temporarily add moodycamel's queue to tsan ignorelist

* pr comments
  • Loading branch information
csegarragonz committed Dec 31, 2021
1 parent 0e7a32c commit d4599a0
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 10 deletions.
3 changes: 3 additions & 0 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ conan_cmake_configure(
hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab
protobuf/3.17.1@#12f6551f4a57bbd3bf38ff3aad6aaa7e
rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a
readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c
spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9
zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4
GENERATORS
Expand Down Expand Up @@ -89,6 +90,7 @@ find_package(cppzmq REQUIRED)
find_package(fmt REQUIRED)
find_package(hiredis REQUIRED)
find_package(spdlog REQUIRED)
find_package(readerwriterqueue REQUIRED)

# Pistache - Conan version is out of date and doesn't support clang
FetchContent_Declare(pistache_ext
Expand Down Expand Up @@ -142,6 +144,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE
pistache::pistache
protobuf::libprotobuf
RapidJSON::RapidJSON
readerwriterqueue::readerwriterqueue
spdlog::spdlog
Threads::Threads
zstd::libzstd_static
Expand Down
4 changes: 2 additions & 2 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::vector<faabric::MpiHostsToRanksMessage> getMpiHostsToRanksMessages();
std::vector<std::shared_ptr<faabric::MPIMessage>> getMpiMockedMessages(
int sendRank);

typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
typedef faabric::util::FixedCapacityQueue<std::shared_ptr<faabric::MPIMessage>>
InMemoryMpiQueue;

class MpiWorld
Expand Down Expand Up @@ -276,7 +276,7 @@ class MpiWorld
void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<faabric::MPIMessage> m,
void doRecv(std::shared_ptr<faabric::MPIMessage>& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
Expand Down
77 changes: 77 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>

#define DEFAULT_QUEUE_TIMEOUT_MS 5000
#define DEFAULT_QUEUE_SIZE 1024

namespace faabric::util {
class QueueTimeoutException : public faabric::util::FaabricException
Expand Down Expand Up @@ -138,6 +140,81 @@ class Queue
std::mutex mx;
};

// Wrapper around moodycamel's blocking fixed capacity single producer single
// consumer queue
// https://github.com/cameron314/readerwriterqueue
template<typename T>
class FixedCapacityQueue
{
public:
FixedCapacityQueue(int capacity)
: mq(capacity){};

FixedCapacityQueue()
: mq(DEFAULT_QUEUE_SIZE){};

void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

bool success =
mq.wait_enqueue_timed(std::move(value), timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for enqueue");
}
}

void dequeueIfPresent(T* res) { mq.try_dequeue(*res); }

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

T value;
bool success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}

return value;
}

T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
throw std::runtime_error("Peek not implemented");
}

void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
T value;
bool success;
while (size() > 0) {
success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting to drain");
}
}
}

long size() { return mq.size_approx(); }

void reset()
{
moodycamel::BlockingReaderWriterCircularBuffer<T> empty(
mq.max_capacity());
std::swap(mq, empty);
}

private:
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

class TokenPool
{
public:
Expand Down
8 changes: 7 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ void MpiWorld::recv(int sendRank,
doRecv(m, buffer, dataType, count, status, messageType);
}

void MpiWorld::doRecv(std::shared_ptr<faabric::MPIMessage> m,
void MpiWorld::doRecv(std::shared_ptr<faabric::MPIMessage>& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
Expand Down Expand Up @@ -1494,10 +1494,16 @@ void MpiWorld::allToAll(int rank,
}
}

// 30/12/21 - Probe is now broken after the switch to a different type of
// queues for local messaging. New queues don't support (off-the-shelf) the
// ability to return a reference to the first element in the queue. In order
// to re-include support for probe we must fix the peek method in the
// queues.
void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status)
{
const std::shared_ptr<InMemoryMpiQueue>& queue =
getLocalQueue(sendRank, recvRank);
// 30/12/21 - Peek will throw a runtime error
std::shared_ptr<faabric::MPIMessage> m = *(queue->peek());

faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type());
Expand Down
4 changes: 3 additions & 1 deletion tests/test/scheduler/test_mpi_world.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ TEST_CASE_METHOD(MpiTestFixture, "Test recv with partial data", "[mpi]")
REQUIRE(status.bytesSize == actualSize * sizeof(int));
}

TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[mpi]")
// 30/12/21 - MPI_Probe is broken after the switch to single-producer, single-
// consumer fixed capacity queues.
TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[.]")
{
// Send two messages of different sizes
std::vector<int> messageData = { 0, 1, 2, 3, 4, 5, 6 };
Expand Down
144 changes: 138 additions & 6 deletions tests/test/util/test_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
using namespace faabric::util;

typedef faabric::util::Queue<int> IntQueue;
typedef faabric::util::FixedCapacityQueue<int> FixedCapIntQueue;
typedef faabric::util::Queue<std::promise<int32_t>> PromiseQueue;
typedef faabric::util::FixedCapacityQueue<std::promise<int32_t>>
FixedCapPromiseQueue;

namespace tests {
TEST_CASE("Test queue operations", "[util]")
Expand Down Expand Up @@ -52,9 +56,9 @@ TEST_CASE("Test queue operations", "[util]")
REQUIRE_THROWS(q.dequeue(1));
}

TEST_CASE("Test drain queue", "[util]")
TEMPLATE_TEST_CASE("Test drain queue", "[util]", IntQueue, FixedCapIntQueue)
{
IntQueue q;
TestType q;

q.enqueue(1);
q.enqueue(2);
Expand Down Expand Up @@ -105,9 +109,12 @@ TEST_CASE("Test wait for draining queue with elements", "[util]")
REQUIRE(dequeued == expected);
}

TEST_CASE("Test queue on non-copy-constructible object", "[util]")
TEMPLATE_TEST_CASE("Test queue on non-copy-constructible object",
"[util]",
PromiseQueue,
FixedCapPromiseQueue)
{
faabric::util::Queue<std::promise<int32_t>> q;
TestType q;

std::promise<int32_t> a;
std::promise<int32_t> b;
Expand Down Expand Up @@ -135,16 +142,141 @@ TEST_CASE("Test queue on non-copy-constructible object", "[util]")
REQUIRE(fb.get() == 2);
}

TEST_CASE("Test queue timeout must be positive", "[util]")
TEMPLATE_TEST_CASE("Test queue timeout must be positive",
"[util]",
IntQueue,
FixedCapIntQueue)
{
int timeoutValueMs;

SECTION("Zero timeout") { timeoutValueMs = 0; }

SECTION("Negative timeout") { timeoutValueMs = -1; }

faabric::util::Queue<int> q;
TestType q;
q.enqueue(10);
REQUIRE_THROWS(q.dequeue(timeoutValueMs));
}

TEST_CASE("Test fixed capacity queue blocks if queue is full", "[util]")
{
FixedCapIntQueue q(2);

q.enqueue(1);
q.enqueue(2);

// Enqueue with a short timeout so the operation fails quickly
REQUIRE_THROWS_AS(q.enqueue(100), QueueTimeoutException);
}

TEST_CASE("Test fixed capacity queue", "[util]")
{
FixedCapIntQueue q(2);
auto latch = faabric::util::Latch::create(2);

std::thread consumerThread([&latch, &q] {
// Make sure we consume once to make one slot in the queue
latch->wait();
q.dequeue();
});

// Fill the queue
q.enqueue(1);
q.enqueue(2);
// Trigger the consumer thread to consume once
latch->wait();
// Check we can then enqueue a third time
q.enqueue(3);

if (consumerThread.joinable()) {
consumerThread.join();
}
}

TEST_CASE("Stress test fixed capacity queue", "[util]")
{
int numThreadPairs = 10;
int numMessages = 1000;
std::vector<std::thread> producerThreads;
std::vector<std::thread> consumerThreads;
std::vector<std::unique_ptr<FixedCapIntQueue>> queues;
auto startLatch = faabric::util::Latch::create(2 * numThreadPairs + 1);

for (int i = 0; i < numThreadPairs; i++) {
producerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
queues.at(i)->enqueue(i * j);
}
});
consumerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
int result = queues.at(i)->dequeue();
assert(result == i * j);
}
});
queues.emplace_back(std::make_unique<FixedCapIntQueue>(10));
}

// Signal threads to start consuming and producing
startLatch->wait();

// Join all threads
for (auto& t : producerThreads) {
if (t.joinable()) {
t.join();
}
}
for (auto& t : consumerThreads) {
if (t.joinable()) {
t.join();
}
}
}

TEST_CASE("Test fixed capacity queue with asymetric consume/produce rates",
"[util]")
{
FixedCapIntQueue q(2);
int nMessages = 100;

// Fast producer
bool producerSuccess = false;
std::thread producerThread([&q, nMessages, &producerSuccess] {
for (int i = 0; i < nMessages; i++) {
SLEEP_MS(1);
q.enqueue(i);
}

producerSuccess = true;
});

// Slow consumer
bool consumerSuccess = false;
std::thread consumerThread([&q, nMessages, &consumerSuccess] {
for (int i = 0; i < nMessages; i++) {
SLEEP_MS(50);
int res = q.dequeue();
if (res != i) {
return;
}
}

consumerSuccess = true;
});

if (producerThread.joinable()) {
producerThread.join();
}

if (consumerThread.joinable()) {
consumerThread.join();
}

REQUIRE(producerSuccess);
REQUIRE(consumerSuccess);
}
}
2 changes: 2 additions & 0 deletions thread-sanitizer-ignorelist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ race:zmq::*
race:faabric::util::SystemConfig::*
# Catch2 allocates in its signal handler, this prevents showing the wrong crash report
signal:*
# TODO: moodycamel's queue version 1.0.6 fixes the warnings we silence here
race:moodycamel::*

# TODO: Remove: There's something weird going on with MPI code I don't understand
race:faabric::scheduler::MpiWorld::*

0 comments on commit d4599a0

Please sign in to comment.