Skip to content

Commit

Permalink
Pass non-const messages to executors (#109)
Browse files Browse the repository at this point in the history
* Pass non-const messages to executors

* Fixing mpi-native and dist tests executors

* Fix example executor

* Add error message rather than assertion

* Revert error message for checking MPI
  • Loading branch information
Shillaker committed Jun 10, 2021
1 parent d6dda2c commit 2c94262
Show file tree
Hide file tree
Showing 15 changed files with 31 additions and 37 deletions.
5 changes: 2 additions & 3 deletions examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ using namespace faabric::scheduler;
class ExampleExecutor : public Executor
{
public:
ExampleExecutor(const faabric::Message& msg)
ExampleExecutor(faabric::Message& msg)
: Executor(msg)
{}

Expand All @@ -31,8 +31,7 @@ class ExampleExecutor : public Executor
class ExampleExecutorFactory : public ExecutorFactory
{
protected:
std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) override
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override
{
return std::make_shared<ExampleExecutor>(msg);
}
Expand Down
5 changes: 2 additions & 3 deletions include/faabric/mpi-native/MpiExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace faabric::mpi_native {
class MpiExecutor final : public Executor
{
public:
explicit MpiExecutor(const faabric::Message& msg);
explicit MpiExecutor(faabric::Message& msg);

int32_t executeTask(
int threadPoolIdx,
Expand All @@ -22,8 +22,7 @@ class MpiExecutor final : public Executor
class MpiExecutorFactory : public ExecutorFactory
{
protected:
std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) override
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override
{
return std::make_unique<MpiExecutor>(msg);
}
Expand Down
3 changes: 1 addition & 2 deletions include/faabric/scheduler/ExecutorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ class ExecutorFactory
public:
virtual ~ExecutorFactory(){};

virtual std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) = 0;
virtual std::shared_ptr<Executor> createExecutor(faabric::Message& msg) = 0;
};

void setExecutorFactory(std::shared_ptr<ExecutorFactory> fac);
Expand Down
8 changes: 4 additions & 4 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Executor
public:
std::string id;

explicit Executor(const faabric::Message& msg);
explicit Executor(faabric::Message& msg);

virtual ~Executor();

Expand All @@ -39,7 +39,7 @@ class Executor

virtual void flush();

virtual void reset(const faabric::Message& msg);
virtual void reset(faabric::Message& msg);

virtual int32_t executeTask(
int threadPoolIdx,
Expand All @@ -55,7 +55,7 @@ class Executor
virtual faabric::util::SnapshotData snapshot();

protected:
virtual void restore(const faabric::Message& msg);
virtual void restore(faabric::Message& msg);

virtual void postFinish();

Expand Down Expand Up @@ -215,7 +215,7 @@ class Scheduler
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

std::shared_ptr<Executor> claimExecutor(const faabric::Message& msg);
std::shared_ptr<Executor> claimExecutor(faabric::Message& msg);

faabric::HostResources getHostResources(const std::string& host);

Expand Down
2 changes: 1 addition & 1 deletion src/mpi_native/MpiExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace faabric::mpi_native {
faabric::Message* executingCall;
int mpiFunc();

MpiExecutor::MpiExecutor(const faabric::Message& msg)
MpiExecutor::MpiExecutor(faabric::Message& msg)
: Executor(msg){};

int32_t MpiExecutor::executeTask(
Expand Down
8 changes: 4 additions & 4 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace faabric::scheduler {

// TODO - avoid the copy of the message here?
Executor::Executor(const faabric::Message& msg)
Executor::Executor(faabric::Message& msg)
: boundMessage(msg)
, threadPoolSize(faabric::util::getUsableCores())
, threadPoolThreads(threadPoolSize)
Expand Down Expand Up @@ -98,7 +98,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Restore if necessary. If we're executing threads on the master host we
// assume we don't need to restore, but for everything else we do. If we've
// already restored from this snapshot, we don't do so again.
const faabric::Message& firstMsg = req->messages().at(0);
faabric::Message& firstMsg = req->mutable_messages()->at(0);
std::string snapshotKey = firstMsg.snapshotkey();
std::string thisHost = faabric::util::getSystemConfig().endpointHost;

Expand Down Expand Up @@ -318,7 +318,7 @@ void Executor::postFinish() {}

void Executor::flush() {}

void Executor::reset(const faabric::Message& msg) {}
void Executor::reset(faabric::Message& msg) {}

faabric::util::SnapshotData Executor::snapshot()
{
Expand All @@ -328,7 +328,7 @@ faabric::util::SnapshotData Executor::snapshot()
return d;
}

void Executor::restore(const faabric::Message& msg)
void Executor::restore(faabric::Message& msg)
{
faabric::util::getLogger()->warn(
"Executor has not implemented restore method");
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ std::vector<std::string> Scheduler::callFunctions(

// Note, we assume all the messages are for the same function and have the
// same master host
const faabric::Message& firstMsg = req->messages().at(0);
faabric::Message& firstMsg = req->mutable_messages()->at(0);
std::string funcStr = faabric::util::funcToString(firstMsg, false);
std::string masterHost = firstMsg.masterhost();
if (masterHost.empty()) {
Expand Down Expand Up @@ -617,7 +617,7 @@ Scheduler::getRecordedMessagesShared()
return recordedMessagesShared;
}

std::shared_ptr<Executor> Scheduler::claimExecutor(const faabric::Message& msg)
std::shared_ptr<Executor> Scheduler::claimExecutor(faabric::Message& msg)
{
std::string funcStr = faabric::util::funcToString(msg, false);

Expand Down
6 changes: 3 additions & 3 deletions tests/dist/DistTestExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ExecutorFunction getDistTestExecutorCallback(const faabric::Message& msg)
return executorFunctions[key];
}

DistTestExecutor::DistTestExecutor(const faabric::Message& msg)
DistTestExecutor::DistTestExecutor(faabric::Message& msg)
: Executor(msg)
{}

Expand All @@ -61,7 +61,7 @@ faabric::util::SnapshotData DistTestExecutor::snapshot()
return snap;
}

void DistTestExecutor::restore(const faabric::Message& msg)
void DistTestExecutor::restore(faabric::Message& msg)
{
// Initialise the dummy memory and map to snapshot
faabric::snapshot::SnapshotRegistry& reg =
Expand All @@ -77,7 +77,7 @@ void DistTestExecutor::restore(const faabric::Message& msg)
}

std::shared_ptr<Executor> DistTestExecutorFactory::createExecutor(
const faabric::Message& msg)
faabric::Message& msg)
{
return std::make_shared<DistTestExecutor>(msg);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/dist/DistTestExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void registerDistTestExecutorCallback(const char* user,
class DistTestExecutor final : public faabric::scheduler::Executor
{
public:
DistTestExecutor(const faabric::Message& msg);
DistTestExecutor(faabric::Message& msg);

~DistTestExecutor();

Expand All @@ -35,13 +35,13 @@ class DistTestExecutor final : public faabric::scheduler::Executor
size_t snapshotSize = 0;

protected:
void restore(const faabric::Message& msg) override;
void restore(faabric::Message& msg) override;
};

class DistTestExecutorFactory : public faabric::scheduler::ExecutorFactory
{
protected:
std::shared_ptr<faabric::scheduler::Executor> createExecutor(
const faabric::Message& msg) override;
faabric::Message& msg) override;
};
}
7 changes: 3 additions & 4 deletions tests/test/scheduler/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::atomic<int> restoreCount = 0;
class TestExecutor final : public Executor
{
public:
TestExecutor(const faabric::Message& msg)
TestExecutor(faabric::Message& msg)
: Executor(msg)
{}

Expand All @@ -44,7 +44,7 @@ class TestExecutor final : public Executor
}
}

void restore(const faabric::Message& msg)
void restore(faabric::Message& msg)
{
restoreCount += 1;

Expand Down Expand Up @@ -218,8 +218,7 @@ class TestExecutor final : public Executor
class TestExecutorFactory : public ExecutorFactory
{
protected:
std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) override
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override
{
return std::make_shared<TestExecutor>(msg);
}
Expand Down
5 changes: 2 additions & 3 deletions tests/test/scheduler/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace tests {
class SlowExecutor final : public Executor
{
public:
SlowExecutor(const faabric::Message& msg)
SlowExecutor(faabric::Message& msg)
: Executor(msg)
{}

Expand All @@ -43,8 +43,7 @@ class SlowExecutor final : public Executor
class SlowExecutorFactory : public ExecutorFactory
{
protected:
std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) override
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override
{
return std::make_shared<SlowExecutor>(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/DummyExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace faabric::scheduler {

DummyExecutor::DummyExecutor(const faabric::Message& msg)
DummyExecutor::DummyExecutor(faabric::Message& msg)
: Executor(msg)
{}

Expand Down
2 changes: 1 addition & 1 deletion tests/utils/DummyExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace faabric::scheduler {
class DummyExecutor final : public Executor
{
public:
DummyExecutor(const faabric::Message& msg);
DummyExecutor(faabric::Message& msg);

~DummyExecutor() override;

Expand Down
2 changes: 1 addition & 1 deletion tests/utils/DummyExecutorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace faabric::scheduler {

std::shared_ptr<Executor> DummyExecutorFactory::createExecutor(
const faabric::Message& msg)
faabric::Message& msg)
{
return std::make_shared<DummyExecutor>(msg);
}
Expand Down
3 changes: 1 addition & 2 deletions tests/utils/DummyExecutorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace faabric::scheduler {
class DummyExecutorFactory : public ExecutorFactory
{
protected:
std::shared_ptr<Executor> createExecutor(
const faabric::Message& msg) override;
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override;
};
}

0 comments on commit 2c94262

Please sign in to comment.