Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Fixed-Capacity Queues #205

Merged
merged 13 commits into from
Dec 31, 2021
Merged
3 changes: 3 additions & 0 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ conan_cmake_configure(
hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab
protobuf/3.17.1@#12f6551f4a57bbd3bf38ff3aad6aaa7e
rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a
readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c
spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9
zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4
GENERATORS
Expand Down Expand Up @@ -89,6 +90,7 @@ find_package(cppzmq REQUIRED)
find_package(fmt REQUIRED)
find_package(hiredis REQUIRED)
find_package(spdlog REQUIRED)
find_package(readerwriterqueue REQUIRED)

# Pistache - Conan version is out of date and doesn't support clang
FetchContent_Declare(pistache_ext
Expand Down Expand Up @@ -142,6 +144,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE
pistache::pistache
protobuf::libprotobuf
RapidJSON::RapidJSON
readerwriterqueue::readerwriterqueue
spdlog::spdlog
Threads::Threads
zstd::libzstd_static
Expand Down
4 changes: 2 additions & 2 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::vector<faabric::MpiHostsToRanksMessage> getMpiHostsToRanksMessages();
std::vector<std::shared_ptr<faabric::MPIMessage>> getMpiMockedMessages(
int sendRank);

typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
typedef faabric::util::FixedCapacityQueue<std::shared_ptr<faabric::MPIMessage>>
InMemoryMpiQueue;

class MpiWorld
Expand Down Expand Up @@ -276,7 +276,7 @@ class MpiWorld
void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<faabric::MPIMessage> m,
void doRecv(std::shared_ptr<faabric::MPIMessage>& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
Expand Down
77 changes: 77 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>

#define DEFAULT_QUEUE_TIMEOUT_MS 5000
#define DEFAULT_QUEUE_SIZE 1024

namespace faabric::util {
class QueueTimeoutException : public faabric::util::FaabricException
Expand Down Expand Up @@ -138,6 +140,81 @@ class Queue
std::mutex mx;
};

// Wrapper around moodycamel's blocking fixed capacity single producer single
// consumer queue
// https://github.com/cameron314/readerwriterqueue
template<typename T>
class FixedCapacityQueue
{
public:
FixedCapacityQueue(int capacity)
: mq(capacity){};

FixedCapacityQueue()
: mq(DEFAULT_QUEUE_SIZE){};

void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

bool success =
mq.wait_enqueue_timed(std::move(value), timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for enqueue");
}
}

void dequeueIfPresent(T* res) { mq.try_dequeue(*res); }

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

T value;
bool success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}

return value;
}

T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
throw std::runtime_error("Peek not implemented");
}

void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
T value;
bool success;
while (size() > 0) {
success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting to drain");
}
}
}

long size() { return mq.size_approx(); }

void reset()
{
moodycamel::BlockingReaderWriterCircularBuffer<T> empty(
mq.max_capacity());
std::swap(mq, empty);
}

private:
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

class TokenPool
{
public:
Expand Down
8 changes: 7 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ void MpiWorld::recv(int sendRank,
doRecv(m, buffer, dataType, count, status, messageType);
}

void MpiWorld::doRecv(std::shared_ptr<faabric::MPIMessage> m,
void MpiWorld::doRecv(std::shared_ptr<faabric::MPIMessage>& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
Expand Down Expand Up @@ -1494,10 +1494,16 @@ void MpiWorld::allToAll(int rank,
}
}

// 30/12/21 - Probe is now broken after the switch to a different type of
// queues for local messaging. New queues don't support (off-the-shelf) the
// ability to return a reference to the first element in the queue. In order
// to re-include support for probe we must fix the peek method in the
// queues.
void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status)
{
const std::shared_ptr<InMemoryMpiQueue>& queue =
getLocalQueue(sendRank, recvRank);
// 30/12/21 - Peek will throw a runtime error
std::shared_ptr<faabric::MPIMessage> m = *(queue->peek());

faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type());
Expand Down
4 changes: 3 additions & 1 deletion tests/test/scheduler/test_mpi_world.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ TEST_CASE_METHOD(MpiTestFixture, "Test recv with partial data", "[mpi]")
REQUIRE(status.bytesSize == actualSize * sizeof(int));
}

TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[mpi]")
// 30/12/21 - MPI_Probe is broken after the switch to single-producer, single-
// consumer fixed capacity queues.
TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[.]")
{
// Send two messages of different sizes
std::vector<int> messageData = { 0, 1, 2, 3, 4, 5, 6 };
Expand Down
144 changes: 138 additions & 6 deletions tests/test/util/test_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
using namespace faabric::util;

typedef faabric::util::Queue<int> IntQueue;
typedef faabric::util::FixedCapacityQueue<int> FixedCapIntQueue;
typedef faabric::util::Queue<std::promise<int32_t>> PromiseQueue;
typedef faabric::util::FixedCapacityQueue<std::promise<int32_t>>
FixedCapPromiseQueue;

namespace tests {
TEST_CASE("Test queue operations", "[util]")
Expand Down Expand Up @@ -52,9 +56,9 @@ TEST_CASE("Test queue operations", "[util]")
REQUIRE_THROWS(q.dequeue(1));
}

TEST_CASE("Test drain queue", "[util]")
TEMPLATE_TEST_CASE("Test drain queue", "[util]", IntQueue, FixedCapIntQueue)
{
IntQueue q;
TestType q;

q.enqueue(1);
q.enqueue(2);
Expand Down Expand Up @@ -105,9 +109,12 @@ TEST_CASE("Test wait for draining queue with elements", "[util]")
REQUIRE(dequeued == expected);
}

TEST_CASE("Test queue on non-copy-constructible object", "[util]")
TEMPLATE_TEST_CASE("Test queue on non-copy-constructible object",
"[util]",
PromiseQueue,
FixedCapPromiseQueue)
{
faabric::util::Queue<std::promise<int32_t>> q;
TestType q;

std::promise<int32_t> a;
std::promise<int32_t> b;
Expand Down Expand Up @@ -135,16 +142,141 @@ TEST_CASE("Test queue on non-copy-constructible object", "[util]")
REQUIRE(fb.get() == 2);
}

TEST_CASE("Test queue timeout must be positive", "[util]")
TEMPLATE_TEST_CASE("Test queue timeout must be positive",
"[util]",
IntQueue,
FixedCapIntQueue)
{
int timeoutValueMs;

SECTION("Zero timeout") { timeoutValueMs = 0; }

SECTION("Negative timeout") { timeoutValueMs = -1; }

faabric::util::Queue<int> q;
TestType q;
q.enqueue(10);
REQUIRE_THROWS(q.dequeue(timeoutValueMs));
}

TEST_CASE("Test fixed capacity queue blocks if queue is full", "[util]")
{
FixedCapIntQueue q(2);

q.enqueue(1);
q.enqueue(2);

// Enqueue with a short timeout so the operation fails quickly
REQUIRE_THROWS_AS(q.enqueue(100), QueueTimeoutException);
}

TEST_CASE("Test fixed capacity queue", "[util]")
{
FixedCapIntQueue q(2);
auto latch = faabric::util::Latch::create(2);

std::thread consumerThread([&latch, &q] {
// Make sure we consume once to make one slot in the queue
latch->wait();
q.dequeue();
});

// Fill the queue
q.enqueue(1);
q.enqueue(2);
// Trigger the consumer thread to consume once
latch->wait();
// Check we can then enqueue a third time
q.enqueue(3);

if (consumerThread.joinable()) {
consumerThread.join();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these tests could still do a better job of exercising the await-to-enqueue logic.

Something like this is fairly certain to exercise it:

    FixedCapIntQueue q(2);
    int nMessages = 100;

    // Fast producer
    bool producerSuccess = false;
    std::thread producerThread([&q, nMessages, &producerSuccess] {
        for(int i = 0; i < nMessages; i++) {
            SLEEP_MS(1);
            q.enqueue(i);
        }
        
        producerSuccess = true;
    };
    
    // Slow consumer
    bool consumerSuccess = false;
    std::thread consumerThread([&latch, nMessages, &consumerSuccess] {
        for(int i = 0; i < nMessages; i++) {
            SLEEP_MS(100);
            int res = q.dequeue();            
            if(res != i) {
                return;
            }
        }
        
        consumerSuccess = true;
    });

    if (producerThread.joinable()) {
        producerThread.join();
    }

    if (consumerThread.joinable()) {
        consumerThread.join();
    }
    
    REQUIRE(producerSuccess);
    REQUIRE(consumerSuccess);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I add this test but halve the sleeping time in the slower thread so the test doesn't take too long to run.

}

TEST_CASE("Stress test fixed capacity queue", "[util]")
{
int numThreadPairs = 10;
int numMessages = 1000;
std::vector<std::thread> producerThreads;
std::vector<std::thread> consumerThreads;
std::vector<std::unique_ptr<FixedCapIntQueue>> queues;
auto startLatch = faabric::util::Latch::create(2 * numThreadPairs + 1);

for (int i = 0; i < numThreadPairs; i++) {
producerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
queues.at(i)->enqueue(i * j);
}
});
consumerThreads.emplace_back([&queues, &startLatch, numMessages, i] {
startLatch->wait();

for (int j = 0; j < numMessages; j++) {
int result = queues.at(i)->dequeue();
assert(result == i * j);
}
});
queues.emplace_back(std::make_unique<FixedCapIntQueue>(10));
}

// Signal threads to start consuming and producing
startLatch->wait();

// Join all threads
for (auto& t : producerThreads) {
if (t.joinable()) {
t.join();
}
}
for (auto& t : consumerThreads) {
if (t.joinable()) {
t.join();
}
}
}

TEST_CASE("Test fixed capacity queue with asymetric consume/produce rates",
"[util]")
{
FixedCapIntQueue q(2);
int nMessages = 100;

// Fast producer
bool producerSuccess = false;
std::thread producerThread([&q, nMessages, &producerSuccess] {
for (int i = 0; i < nMessages; i++) {
SLEEP_MS(1);
q.enqueue(i);
}

producerSuccess = true;
});

// Slow consumer
bool consumerSuccess = false;
std::thread consumerThread([&q, nMessages, &consumerSuccess] {
for (int i = 0; i < nMessages; i++) {
SLEEP_MS(50);
int res = q.dequeue();
if (res != i) {
return;
}
}

consumerSuccess = true;
});

if (producerThread.joinable()) {
producerThread.join();
}

if (consumerThread.joinable()) {
consumerThread.join();
}

REQUIRE(producerSuccess);
REQUIRE(consumerSuccess);
}
}
2 changes: 2 additions & 0 deletions thread-sanitizer-ignorelist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ race:zmq::*
race:faabric::util::SystemConfig::*
# Catch2 allocates in its signal handler, this prevents showing the wrong crash report
signal:*
# TODO: moodycamel's queue version 1.0.6 fixes the warnings we silence here
race:moodycamel::*

# TODO: Remove: There's something weird going on with MPI code I don't understand
race:faabric::scheduler::MpiWorld::*