diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake index dca6b0459..2c675f414 100644 --- a/cmake/ExternalProjects.cmake +++ b/cmake/ExternalProjects.cmake @@ -38,6 +38,7 @@ conan_cmake_configure( hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab protobuf/3.17.1@#12f6551f4a57bbd3bf38ff3aad6aaa7e rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a + readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9 zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4 GENERATORS @@ -89,6 +90,7 @@ find_package(cppzmq REQUIRED) find_package(fmt REQUIRED) find_package(hiredis REQUIRED) find_package(spdlog REQUIRED) +find_package(readerwriterqueue REQUIRED) # Pistache - Conan version is out of date and doesn't support clang FetchContent_Declare(pistache_ext @@ -142,6 +144,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE pistache::pistache protobuf::libprotobuf RapidJSON::RapidJSON + readerwriterqueue::readerwriterqueue spdlog::spdlog Threads::Threads zstd::libzstd_static diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 1646f1495..5307b480d 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -7,6 +7,7 @@ #include #include #include +#include #define DEFAULT_QUEUE_TIMEOUT_MS 5000 #define DEFAULT_QUEUE_SIZE 1024 @@ -140,7 +141,8 @@ class Queue std::mutex mx; }; -// Fixed size queue using a circular buffer as underlying container +// Wrapper around moodycamel's blocking fixed capacity single producer single +// consumer queue template class FixedCapacityQueue { @@ -153,138 +155,64 @@ class FixedCapacityQueue void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { - UniqueLock lock(mx); - if (timeoutMs <= 0) { SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); throw std::runtime_error("Invalid queue timeout"); } - // If the queue is full, wait until elements are consumed - while (mq.size() == mq.capacity()) { - std::cv_status returnVal = notFullNotifier.wait_for( - lock, std::chrono::milliseconds(timeoutMs)); - - // Work out if this has returned due to timeout expiring - if (returnVal == std::cv_status::timeout) { - throw QueueTimeoutException( - "Timeout waiting for queue to empty"); - } + bool success = + mq.wait_enqueue_timed(std::move(value), timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting for enqueue"); } - - mq.push_back(std::move(value)); - notEmptyNotifier.notify_one(); } - void dequeueIfPresent(T* res) - { - UniqueLock lock(mx); - - if (!mq.empty()) { - T value = std::move(mq.front()); - mq.pop_front(); - notFullNotifier.notify_one(); - - *res = value; - } - } + void dequeueIfPresent(T* res) { mq.try_dequeue(*res); } T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { - UniqueLock lock(mx); - if (timeoutMs <= 0) { SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); throw std::runtime_error("Invalid queue timeout"); } - while (mq.empty()) { - std::cv_status returnVal = notEmptyNotifier.wait_for( - lock, std::chrono::milliseconds(timeoutMs)); - - // Work out if this has returned due to timeout expiring - if (returnVal == std::cv_status::timeout) { - throw QueueTimeoutException("Timeout waiting for dequeue"); - } + T value; + bool success = mq.wait_dequeue_timed(value, timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting for dequeue"); } - T value = std::move(mq.front()); - mq.pop_front(); - notFullNotifier.notify_one(); - return value; } T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { - UniqueLock lock(mx); - - if (timeoutMs <= 0) { - SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); - throw std::runtime_error("Invalid queue timeout"); - } - - while (mq.empty()) { - std::cv_status returnVal = notEmptyNotifier.wait_for( - lock, std::chrono::milliseconds(timeoutMs)); - - // Work out if this has returned due to timeout expiring - if (returnVal == std::cv_status::timeout) { - throw QueueTimeoutException("Timeout waiting for dequeue"); - } - } - - return &mq.front(); + throw std::runtime_error("Peek not implemented"); } - void waitToDrain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { - UniqueLock lock(mx); - - if (timeoutMs <= 0) { - SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs); - throw std::runtime_error("Invalid queue timeout"); - } - - while (!mq.empty()) { - std::cv_status returnVal = notFullNotifier.wait_for( - lock, std::chrono::milliseconds(timeoutMs)); - - // Work out if this has returned due to timeout expiring - if (returnVal == std::cv_status::timeout) { - throw QueueTimeoutException("Timeout waiting for empty"); + T value; + bool success; + while (size() > 0) { + success = mq.wait_dequeue_timed(value, timeoutMs * 1000); + if (!success) { + throw QueueTimeoutException("Timeout waiting to drain"); } } } - void drain() - { - UniqueLock lock(mx); - - while (!mq.empty()) { - mq.pop_front(); - } - } - - long size() - { - UniqueLock lock(mx); - return mq.size(); - } + long size() { return mq.size_approx(); } void reset() { - UniqueLock lock(mx); - - boost::circular_buffer empty(mq.capacity()); + moodycamel::BlockingReaderWriterCircularBuffer empty( + mq.max_capacity()); std::swap(mq, empty); } private: - boost::circular_buffer mq; - std::condition_variable notFullNotifier; - std::condition_variable notEmptyNotifier; - std::mutex mx; + moodycamel::BlockingReaderWriterCircularBuffer mq; }; class TokenPool diff --git a/include/faabric/util/timing.h b/include/faabric/util/timing.h index 34751ec54..ba97e5b31 100644 --- a/include/faabric/util/timing.h +++ b/include/faabric/util/timing.h @@ -4,16 +4,18 @@ #include #ifdef TRACE_ALL -#define PROF_BEGIN faabric::util::startGlobalTimer(); #define PROF_START(name) \ const faabric::util::TimePoint name = faabric::util::startTimer(); #define PROF_END(name) faabric::util::logEndTimer(#name, name); -#define PROF_SUMMARY faabric::util::printTimerTotals(); +#define PROF_SUMMARY_START faabric::util::startGlobalTimer(); +#define PROF_SUMMARY_END faabric::util::printTimerTotals(); +#define PROF_CLEAR faabric::util::clearTimerTotals(); #else -#define PROF_BEGIN #define PROF_START(name) #define PROF_END(name) -#define PROF_SUMMARY +#define PROF_SUMMARY_START +#define PROF_SUMMARY_END +#define PROF_CLEAR #endif namespace faabric::util { @@ -32,6 +34,8 @@ void startGlobalTimer(); void printTimerTotals(); +void clearTimerTotals(); + uint64_t timespecToNanos(struct timespec* nativeTimespec); void nanosToTimespec(uint64_t nanos, struct timespec* nativeTimespec); diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index e66152460..469e307ac 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -108,5 +108,7 @@ void FaabricMain::shutdown() sch.shutdown(); SPDLOG_INFO("Faabric pool successfully shut down"); + + PROF_SUMMARY_END } } diff --git a/src/runner/queue_prof.cpp b/src/runner/queue_prof.cpp new file mode 100644 index 000000000..52308d651 --- /dev/null +++ b/src/runner/queue_prof.cpp @@ -0,0 +1,134 @@ +#include +#include +#include +#include +#include +#include +#include + +using namespace faabric::util; + +static thread_local std::shared_ptr>> + queue; + +static float doRun(int nProducers, int nConsumers, size_t msgSize, bool single) +{ + int nMessages = 2000000; + int perProducer = nMessages / nProducers; + int perConsumer = nMessages / nConsumers; + + SPDLOG_TRACE( + "{} ({}) {} ({})", nProducers, perProducer, nConsumers, perConsumer); + + std::vector>>> + queues; + + if (single) { + queues.emplace_back( + std::make_shared>>()); + } else { + for (int i = 0; i < nProducers; i++) { + queues.emplace_back( + std::make_shared>>()); + } + } + + std::vector producers; + std::vector consumers; + + TimePoint tp = startTimer(); + for (int p = 0; p < nProducers; p++) { + producers.emplace_back([single, p, &queues, msgSize, perProducer] { + if (single) { + queue = queues[0]; + } else { + queue = queues[p]; + } + + std::vector data(msgSize, 5); + auto message = std::make_shared(); + message->set_buffer(data.data(), data.size()); + for (int i = 0; i < perProducer; i++) { + PROF_START(Enqueue) + queue->enqueue(message); + PROF_END(Enqueue) + } + }); + } + + for (int c = 0; c < nConsumers; c++) { + consumers.emplace_back([single, c, &queues, perConsumer] { + if (single) { + queue = queues[0]; + } else { + queue = queues[c]; + } + + for (int i = 0; i < perConsumer; i++) { + PROF_START(Dequeue) + queue->dequeue(); + PROF_END(Dequeue) + } + }); + } + + for (auto& p : producers) { + if (p.joinable()) { + p.join(); + } + } + + for (auto& c : consumers) { + if (c.joinable()) { + c.join(); + } + } + + long totalTime = getTimeDiffMicros(tp); + return ((float)totalTime) / nMessages; +} + +int main() +{ + std::vector producers = { 1, 10, 100 }; + std::vector consumers = { 1, 10, 100 }; + + SPDLOG_INFO("Starting queue profiler\n"); + + std::cout.flush(); + + size_t messageSize = 1024L * 1024L; + + for (auto p : producers) { + for (auto c : consumers) { + printf("------------- SHARED QUEUE --------------\n"); + printf("P\t C\t R\t Per msg\n"); + + PROF_SUMMARY_START + + float ratio = float(p) / c; + float runTime = doRun(p, c, messageSize, true); + printf("%i\t %i \t %.2f \t %.5fus \n", p, c, ratio, runTime); + std::cout.flush(); + + PROF_SUMMARY_END + PROF_CLEAR + } + } + + for (auto p : producers) { + printf("------------- SPSC QUEUE --------------\n"); + printf("P\t C\t R\t Per msg\n"); + + PROF_SUMMARY_START + + float runTime = doRun(p, p, messageSize, true); + printf("%i\t %i \t %.5fus \n", p, p, runTime); + std::cout.flush(); + + PROF_SUMMARY_END + PROF_CLEAR + } + + return 0; +} diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index afdcb4e42..cdf930c01 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -1494,10 +1494,16 @@ void MpiWorld::allToAll(int rank, } } +// 30/12/21 - Probe is now broken after the switch to a different type of +// queues for local messaging. New queues don't support (off the shelve) the +// ability to return a reference to the first element in the queue. In order +// to re-include support for probe we must fix the peek method in the +// queues. void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status) { const std::shared_ptr& queue = getLocalQueue(sendRank, recvRank); + // 30/12/21 - Peek will throw a runtime error std::shared_ptr m = *(queue->peek()); faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type()); diff --git a/src/util/timing.cpp b/src/util/timing.cpp index 85ec89b7e..4449391aa 100644 --- a/src/util/timing.cpp +++ b/src/util/timing.cpp @@ -78,6 +78,12 @@ void printTimerTotals() printf("Total running time: %.2fs\n\n", totalSeconds); } +void clearTimerTotals() +{ + timerTotals.clear(); + timerCounts.clear(); +} + uint64_t timespecToNanos(struct timespec* nativeTimespec) { uint64_t nanos = nativeTimespec->tv_sec * 1000000000; diff --git a/tests/test/scheduler/test_mpi_world.cpp b/tests/test/scheduler/test_mpi_world.cpp index 5669b37ce..73e795aa9 100644 --- a/tests/test/scheduler/test_mpi_world.cpp +++ b/tests/test/scheduler/test_mpi_world.cpp @@ -429,7 +429,9 @@ TEST_CASE_METHOD(MpiTestFixture, "Test recv with partial data", "[mpi]") REQUIRE(status.bytesSize == actualSize * sizeof(int)); } -TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[mpi]") +// 30/12/21 - MPI_Probe is broken after the switch to single-producer, single- +// consumer fixed capacity queues. +TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[.]") { // Send two messages of different sizes std::vector messageData = { 0, 1, 2, 3, 4, 5, 6 }; diff --git a/tests/test/util/test_queue.cpp b/tests/test/util/test_queue.cpp index b9f2ec70d..e0de5ae89 100644 --- a/tests/test/util/test_queue.cpp +++ b/tests/test/util/test_queue.cpp @@ -19,12 +19,9 @@ typedef faabric::util::FixedCapacityQueue> FixedCapPromiseQueue; namespace tests { -TEMPLATE_TEST_CASE("Test queue operations", - "[util]", - IntQueue, - FixedCapIntQueue) +TEST_CASE("Test queue operations", "[util]") { - TestType q; + IntQueue q; // Check deqeue if present does nothing if nothing in queue int dummy = -999; @@ -74,22 +71,16 @@ TEMPLATE_TEST_CASE("Test drain queue", "[util]", IntQueue, FixedCapIntQueue) REQUIRE(q.size() == 0); } -TEMPLATE_TEST_CASE("Test wait for draining empty queue", - "[util]", - IntQueue, - FixedCapIntQueue) +TEST_CASE("Test wait for draining empty queue", "[util]") { // Just need to check this doesn't fail - TestType q; + IntQueue q; q.waitToDrain(100); } -TEMPLATE_TEST_CASE("Test wait for draining queue with elements", - "[util]", - IntQueue, - FixedCapIntQueue) +TEST_CASE("Test wait for draining queue with elements", "[util]") { - TestType q; + IntQueue q; int nElems = 5; std::vector dequeued; std::vector expected; @@ -151,7 +142,10 @@ TEMPLATE_TEST_CASE("Test queue on non-copy-constructible object", REQUIRE(fb.get() == 2); } -TEST_CASE("Test queue timeout must be positive", "[util]") +TEMPLATE_TEST_CASE("Test queue timeout must be positive", + "[util]", + IntQueue, + FixedCapIntQueue) { int timeoutValueMs; @@ -159,7 +153,7 @@ TEST_CASE("Test queue timeout must be positive", "[util]") SECTION("Negative timeout") { timeoutValueMs = -1; } - faabric::util::Queue q; + TestType q; q.enqueue(10); REQUIRE_THROWS(q.dequeue(timeoutValueMs)); }