Skip to content

Commit

Permalink
add shutdown method and call it from mpiworld's destroy()
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed May 27, 2021
1 parent d502e61 commit ceb549a
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
4 changes: 2 additions & 2 deletions include/faabric/scheduler/MpiThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ class MpiAsyncThreadPool
public:
explicit MpiAsyncThreadPool(int nThreads);

~MpiAsyncThreadPool();
void shutdown();

int size;

std::shared_ptr<MpiReqQueue> getMpiReqQueue();

private:
std::vector<std::thread> threadPool;
std::atomic<bool> shutdown;
std::atomic<bool> isShutdown;

std::shared_ptr<MpiReqQueue> localReqQueue;

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class MpiWorld

void destroy();

void closeFunctionCallClients();
void shutdownThreadPool();

void enqueueMessage(faabric::MPIMessage& msg);

Expand Down
10 changes: 5 additions & 5 deletions src/scheduler/MpiThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace faabric::scheduler {
MpiAsyncThreadPool::MpiAsyncThreadPool(int nThreads)
: size(nThreads)
, shutdown(false)
, isShutdown(false)
{
faabric::util::getLogger()->debug(
"Starting an MpiAsyncThreadPool of size {}", nThreads);
Expand All @@ -19,7 +19,7 @@ MpiAsyncThreadPool::MpiAsyncThreadPool(int nThreads)
}
}

MpiAsyncThreadPool::~MpiAsyncThreadPool()
void MpiAsyncThreadPool::shutdown()
{
faabric::util::getLogger()->debug("Shutting down MpiAsyncThreadPool");

Expand All @@ -39,7 +39,7 @@ void MpiAsyncThreadPool::entrypoint(int i)
{
faabric::scheduler::ReqQueueType req;

while (!this->shutdown) {
while (!this->isShutdown) {
req = getMpiReqQueue()->dequeue();

int id = std::get<0>(req);
Expand All @@ -51,8 +51,8 @@ void MpiAsyncThreadPool::entrypoint(int i)
// The shutdown tuple includes a TLS cleanup function that we run
// _once per thread_ and exit
func();
if (!this->shutdown) {
this->shutdown = true;
if (!this->isShutdown) {
this->isShutdown = true;
}
break;
}
Expand Down
6 changes: 4 additions & 2 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void MpiWorld::destroy()
{
// Destroy once per host
if (!isDestroyed.test_and_set()) {
closeFunctionCallClients();
shutdownThreadPool();

// Note - we are deliberately not deleting the KV in the global state
// TODO - find a way to do this only from the master client
Expand All @@ -149,7 +149,7 @@ void MpiWorld::destroy()
}
}

void MpiWorld::closeFunctionCallClients()
void MpiWorld::shutdownThreadPool()
{
// When shutting down the thread pool, we also make sure we clean all thread
// local state by sending a clear message to the queue. Currently, we only
Expand All @@ -162,6 +162,8 @@ void MpiWorld::closeFunctionCallClients()
std::move(p)));
}

threadPool->shutdown();

// Lastly clean the main thread as well
closeThreadLocalClients();
}
Expand Down

0 comments on commit ceb549a

Please sign in to comment.