Skip to content

Commit

Permalink
switch underlying queue to moodycamel's, and invalidate probe
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Dec 30, 2021
1 parent 002a71e commit 86b629b
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 118 deletions.
3 changes: 3 additions & 0 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ conan_cmake_configure(
hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab
protobuf/3.17.1@#12f6551f4a57bbd3bf38ff3aad6aaa7e
rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a
readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c
spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9
zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4
GENERATORS
Expand Down Expand Up @@ -89,6 +90,7 @@ find_package(cppzmq REQUIRED)
find_package(fmt REQUIRED)
find_package(hiredis REQUIRED)
find_package(spdlog REQUIRED)
find_package(readerwriterqueue REQUIRED)

# Pistache - Conan version is out of date and doesn't support clang
FetchContent_Declare(pistache_ext
Expand Down Expand Up @@ -142,6 +144,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE
pistache::pistache
protobuf::libprotobuf
RapidJSON::RapidJSON
readerwriterqueue::readerwriterqueue
spdlog::spdlog
Threads::Threads
zstd::libzstd_static
Expand Down
120 changes: 24 additions & 96 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/circular_buffer.hpp>
#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>

#define DEFAULT_QUEUE_TIMEOUT_MS 5000
#define DEFAULT_QUEUE_SIZE 1024
Expand Down Expand Up @@ -140,7 +141,8 @@ class Queue
std::mutex mx;
};

// Fixed size queue using a circular buffer as underlying container
// Wrapper around moodycamel's blocking fixed capacity single producer single
// consumer queue
template<typename T>
class FixedCapacityQueue
{
Expand All @@ -153,138 +155,64 @@ class FixedCapacityQueue

void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
UniqueLock lock(mx);

if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

// If the queue is full, wait until elements are consumed
while (mq.size() == mq.capacity()) {
std::cv_status returnVal = notFullNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException(
"Timeout waiting for queue to empty");
}
bool success =
mq.wait_enqueue_timed(std::move(value), timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for enqueue");
}

mq.push_back(std::move(value));
notEmptyNotifier.notify_one();
}

void dequeueIfPresent(T* res)
{
UniqueLock lock(mx);

if (!mq.empty()) {
T value = std::move(mq.front());
mq.pop_front();
notFullNotifier.notify_one();

*res = value;
}
}
void dequeueIfPresent(T* res) { mq.try_dequeue(*res); }

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
UniqueLock lock(mx);

if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

while (mq.empty()) {
std::cv_status returnVal = notEmptyNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
T value;
bool success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}

T value = std::move(mq.front());
mq.pop_front();
notFullNotifier.notify_one();

return value;
}

T* peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
UniqueLock lock(mx);

if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

while (mq.empty()) {
std::cv_status returnVal = notEmptyNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
}

return &mq.front();
throw std::runtime_error("Peek not implemented");
}

void waitToDrain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
UniqueLock lock(mx);

if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

while (!mq.empty()) {
std::cv_status returnVal = notFullNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for empty");
T value;
bool success;
while (size() > 0) {
success = mq.wait_dequeue_timed(value, timeoutMs * 1000);
if (!success) {
throw QueueTimeoutException("Timeout waiting to drain");
}
}
}

void drain()
{
UniqueLock lock(mx);

while (!mq.empty()) {
mq.pop_front();
}
}

long size()
{
UniqueLock lock(mx);
return mq.size();
}
long size() { return mq.size_approx(); }

void reset()
{
UniqueLock lock(mx);

boost::circular_buffer<T> empty(mq.capacity());
moodycamel::BlockingReaderWriterCircularBuffer<T> empty(
mq.max_capacity());
std::swap(mq, empty);
}

private:
boost::circular_buffer<T> mq;
std::condition_variable notFullNotifier;
std::condition_variable notEmptyNotifier;
std::mutex mx;
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

class TokenPool
Expand Down
12 changes: 8 additions & 4 deletions include/faabric/util/timing.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
#include <string>

#ifdef TRACE_ALL
#define PROF_BEGIN faabric::util::startGlobalTimer();
#define PROF_START(name) \
const faabric::util::TimePoint name = faabric::util::startTimer();
#define PROF_END(name) faabric::util::logEndTimer(#name, name);
#define PROF_SUMMARY faabric::util::printTimerTotals();
#define PROF_SUMMARY_START faabric::util::startGlobalTimer();
#define PROF_SUMMARY_END faabric::util::printTimerTotals();
#define PROF_CLEAR faabric::util::clearTimerTotals();
#else
#define PROF_BEGIN
#define PROF_START(name)
#define PROF_END(name)
#define PROF_SUMMARY
#define PROF_SUMMARY_START
#define PROF_SUMMARY_END
#define PROF_CLEAR
#endif

namespace faabric::util {
Expand All @@ -32,6 +34,8 @@ void startGlobalTimer();

void printTimerTotals();

void clearTimerTotals();

uint64_t timespecToNanos(struct timespec* nativeTimespec);

void nanosToTimespec(uint64_t nanos, struct timespec* nativeTimespec);
Expand Down
2 changes: 2 additions & 0 deletions src/runner/FaabricMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,7 @@ void FaabricMain::shutdown()
sch.shutdown();

SPDLOG_INFO("Faabric pool successfully shut down");

PROF_SUMMARY_END
}
}
134 changes: 134 additions & 0 deletions src/runner/queue_prof.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/util/config.h>
#include <faabric/util/crash.h>
#include <faabric/util/environment.h>
#include <faabric/util/logging.h>
#include <faabric/util/queue.h>
#include <faabric/util/timing.h>

using namespace faabric::util;

static thread_local std::shared_ptr<Queue<std::shared_ptr<faabric::MPIMessage>>>
queue;

// Run one benchmark pass: nProducers threads enqueue and nConsumers threads
// dequeue a total of ~2M messages of msgSize bytes each.
//
// If `single` is true all threads share one queue; otherwise one queue is
// created per producer/consumer pair (which requires equal counts).
//
// Returns the average wall-clock time per message in microseconds.
static float doRun(int nProducers, int nConsumers, size_t msgSize, bool single)
{
    int nMessages = 2000000;
    int perProducer = nMessages / nProducers;
    int perConsumer = nMessages / nConsumers;

    SPDLOG_TRACE(
      "{} ({}) {} ({})", nProducers, perProducer, nConsumers, perConsumer);

    std::vector<std::shared_ptr<Queue<std::shared_ptr<faabric::MPIMessage>>>>
      queues;

    if (single) {
        queues.emplace_back(
          std::make_shared<Queue<std::shared_ptr<faabric::MPIMessage>>>());
    } else {
        // One queue per producer/consumer pair. The consumer loop below
        // indexes queues[c] for c < nConsumers, so unequal counts would
        // read past the end of the vector - fail fast instead.
        if (nProducers != nConsumers) {
            throw std::runtime_error(
              "Per-pair queues require nProducers == nConsumers");
        }

        for (int i = 0; i < nProducers; i++) {
            queues.emplace_back(
              std::make_shared<Queue<std::shared_ptr<faabric::MPIMessage>>>());
        }
    }

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    TimePoint tp = startTimer();
    for (int p = 0; p < nProducers; p++) {
        producers.emplace_back([single, p, &queues, msgSize, perProducer] {
            if (single) {
                queue = queues[0];
            } else {
                queue = queues[p];
            }

            // The same message object is enqueued repeatedly (shared_ptr
            // copy) so the payload is only allocated once per producer
            std::vector<uint8_t> data(msgSize, 5);
            auto message = std::make_shared<faabric::MPIMessage>();
            message->set_buffer(data.data(), data.size());
            for (int i = 0; i < perProducer; i++) {
                PROF_START(Enqueue)
                queue->enqueue(message);
                PROF_END(Enqueue)
            }
        });
    }

    for (int c = 0; c < nConsumers; c++) {
        consumers.emplace_back([single, c, &queues, perConsumer] {
            if (single) {
                queue = queues[0];
            } else {
                queue = queues[c];
            }

            for (int i = 0; i < perConsumer; i++) {
                PROF_START(Dequeue)
                queue->dequeue();
                PROF_END(Dequeue)
            }
        });
    }

    for (auto& p : producers) {
        if (p.joinable()) {
            p.join();
        }
    }

    for (auto& c : consumers) {
        if (c.joinable()) {
            c.join();
        }
    }

    long totalTime = getTimeDiffMicros(tp);
    return ((float)totalTime) / nMessages;
}

int main()
{
    std::vector<int> producers = { 1, 10, 100 };
    std::vector<int> consumers = { 1, 10, 100 };

    SPDLOG_INFO("Starting queue profiler\n");

    std::cout.flush();

    // 1 MiB payload per message
    size_t messageSize = 1024L * 1024L;

    // First benchmark: all producers and consumers share one queue
    for (auto p : producers) {
        for (auto c : consumers) {
            printf("------------- SHARED QUEUE --------------\n");
            printf("P\t C\t R\t Per msg\n");

            PROF_SUMMARY_START

            float ratio = float(p) / c;
            float runTime = doRun(p, c, messageSize, true);
            printf("%i\t %i \t %.2f \t %.5fus \n", p, c, ratio, runTime);
            std::cout.flush();

            PROF_SUMMARY_END
            PROF_CLEAR
        }
    }

    // Second benchmark: one queue per producer/consumer pair. This must pass
    // single=false, otherwise it measures the same shared queue as above
    // rather than the SPSC setup it claims to
    for (auto p : producers) {
        printf("------------- SPSC QUEUE --------------\n");
        printf("P\t C\t Per msg\n");

        PROF_SUMMARY_START

        float runTime = doRun(p, p, messageSize, false);
        printf("%i\t %i \t %.5fus \n", p, p, runTime);
        std::cout.flush();

        PROF_SUMMARY_END
        PROF_CLEAR
    }

    return 0;
}
6 changes: 6 additions & 0 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1494,10 +1494,16 @@ void MpiWorld::allToAll(int rank,
}
}

// 30/12/21 - Probe is now broken after the switch to a different type of
// queue for local messaging. The new queues don't support, off the shelf,
// returning a reference to the first element in the queue. To restore
// support for probe we must fix the peek method in the queues.
void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status)
{
const std::shared_ptr<InMemoryMpiQueue>& queue =
getLocalQueue(sendRank, recvRank);
// 30/12/21 - Peek will throw a runtime error
std::shared_ptr<faabric::MPIMessage> m = *(queue->peek());

faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type());
Expand Down
Loading

0 comments on commit 86b629b

Please sign in to comment.