From 2c942623c2e0d021f2d07580746568f086278eea Mon Sep 17 00:00:00 2001 From: Simon Shillaker <554768+Shillaker@users.noreply.github.com> Date: Thu, 10 Jun 2021 10:22:11 +0200 Subject: [PATCH] Pass non-const messages to executors (#109) * Pass non-const messages to executors * Fixing mpi-native and dist tests executors * Fix example executor * Add error message rather than assertion * Revert error message for checking MPI --- examples/server.cpp | 5 ++--- include/faabric/mpi-native/MpiExecutor.h | 5 ++--- include/faabric/scheduler/ExecutorFactory.h | 3 +-- include/faabric/scheduler/Scheduler.h | 8 ++++---- src/mpi_native/MpiExecutor.cpp | 2 +- src/scheduler/Executor.cpp | 8 ++++---- src/scheduler/Scheduler.cpp | 4 ++-- tests/dist/DistTestExecutor.cpp | 6 +++--- tests/dist/DistTestExecutor.h | 6 +++--- tests/test/scheduler/test_executor.cpp | 7 +++---- tests/test/scheduler/test_scheduler.cpp | 5 ++--- tests/utils/DummyExecutor.cpp | 2 +- tests/utils/DummyExecutor.h | 2 +- tests/utils/DummyExecutorFactory.cpp | 2 +- tests/utils/DummyExecutorFactory.h | 3 +-- 15 files changed, 31 insertions(+), 37 deletions(-) diff --git a/examples/server.cpp b/examples/server.cpp index f9702d43c..d8bfc6611 100644 --- a/examples/server.cpp +++ b/examples/server.cpp @@ -8,7 +8,7 @@ using namespace faabric::scheduler; class ExampleExecutor : public Executor { public: - ExampleExecutor(const faabric::Message& msg) + ExampleExecutor(faabric::Message& msg) : Executor(msg) {} @@ -31,8 +31,7 @@ class ExampleExecutor : public Executor class ExampleExecutorFactory : public ExecutorFactory { protected: - std::shared_ptr createExecutor( - const faabric::Message& msg) override + std::shared_ptr createExecutor(faabric::Message& msg) override { return std::make_shared(msg); } diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index 3c9426be9..3cc012c5a 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -11,7 +11,7 @@ namespace faabric::mpi_native { class MpiExecutor final : public Executor { public: - explicit MpiExecutor(const faabric::Message& msg); + explicit MpiExecutor(faabric::Message& msg); int32_t executeTask( int threadPoolIdx, @@ -22,8 +22,7 @@ class MpiExecutor final : public Executor class MpiExecutorFactory : public ExecutorFactory { protected: - std::shared_ptr createExecutor( - const faabric::Message& msg) override + std::shared_ptr createExecutor(faabric::Message& msg) override { return std::make_unique(msg); } diff --git a/include/faabric/scheduler/ExecutorFactory.h b/include/faabric/scheduler/ExecutorFactory.h index 1d3a58876..b06f0e36d 100644 --- a/include/faabric/scheduler/ExecutorFactory.h +++ b/include/faabric/scheduler/ExecutorFactory.h @@ -9,8 +9,7 @@ class ExecutorFactory public: virtual ~ExecutorFactory(){}; - virtual std::shared_ptr createExecutor( - const faabric::Message& msg) = 0; + virtual std::shared_ptr createExecutor(faabric::Message& msg) = 0; }; void setExecutorFactory(std::shared_ptr fac); diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 85ecb75c5..f8396e1e5 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -28,7 +28,7 @@ class Executor public: std::string id; - explicit Executor(const faabric::Message& msg); + explicit Executor(faabric::Message& msg); virtual ~Executor(); @@ -39,7 +39,7 @@ class Executor virtual void flush(); - virtual void reset(const faabric::Message& msg); + virtual void reset(faabric::Message& msg); virtual int32_t executeTask( int threadPoolIdx, @@ -55,7 +55,7 @@ class Executor virtual faabric::util::SnapshotData snapshot(); protected: - virtual void restore(const faabric::Message& msg); + virtual void restore(faabric::Message& msg); virtual void postFinish(); @@ -215,7 +215,7 @@ class Scheduler std::vector getUnregisteredHosts(const std::string& funcStr, bool noCache = false); - std::shared_ptr claimExecutor(const faabric::Message& msg); + std::shared_ptr claimExecutor(faabric::Message& msg); faabric::HostResources getHostResources(const std::string& host); diff --git a/src/mpi_native/MpiExecutor.cpp b/src/mpi_native/MpiExecutor.cpp index c4d2b025e..36b255056 100644 --- a/src/mpi_native/MpiExecutor.cpp +++ b/src/mpi_native/MpiExecutor.cpp @@ -5,7 +5,7 @@ namespace faabric::mpi_native { faabric::Message* executingCall; int mpiFunc(); -MpiExecutor::MpiExecutor(const faabric::Message& msg) +MpiExecutor::MpiExecutor(faabric::Message& msg) : Executor(msg){}; int32_t MpiExecutor::executeTask( diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 18b152317..360e859ea 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -15,7 +15,7 @@ namespace faabric::scheduler { // TODO - avoid the copy of the message here? -Executor::Executor(const faabric::Message& msg) +Executor::Executor(faabric::Message& msg) : boundMessage(msg) , threadPoolSize(faabric::util::getUsableCores()) , threadPoolThreads(threadPoolSize) @@ -98,7 +98,7 @@ void Executor::executeTasks(std::vector msgIdxs, // Restore if necessary. If we're executing threads on the master host we // assume we don't need to restore, but for everything else we do. If we've // already restored from this snapshot, we don't do so again. - const faabric::Message& firstMsg = req->messages().at(0); + faabric::Message& firstMsg = req->mutable_messages()->at(0); std::string snapshotKey = firstMsg.snapshotkey(); std::string thisHost = faabric::util::getSystemConfig().endpointHost; @@ -318,7 +318,7 @@ void Executor::postFinish() {} void Executor::flush() {} -void Executor::reset(const faabric::Message& msg) {} +void Executor::reset(faabric::Message& msg) {} faabric::util::SnapshotData Executor::snapshot() { @@ -328,7 +328,7 @@ faabric::util::SnapshotData Executor::snapshot() return d; } -void Executor::restore(const faabric::Message& msg) +void Executor::restore(faabric::Message& msg) { faabric::util::getLogger()->warn( "Executor has not implemented restore method"); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index e91bab795..297242c02 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -208,7 +208,7 @@ std::vector Scheduler::callFunctions( // Note, we assume all the messages are for the same function and have the // same master host - const faabric::Message& firstMsg = req->messages().at(0); + faabric::Message& firstMsg = req->mutable_messages()->at(0); std::string funcStr = faabric::util::funcToString(firstMsg, false); std::string masterHost = firstMsg.masterhost(); if (masterHost.empty()) { @@ -617,7 +617,7 @@ Scheduler::getRecordedMessagesShared() return recordedMessagesShared; } -std::shared_ptr Scheduler::claimExecutor(const faabric::Message& msg) +std::shared_ptr Scheduler::claimExecutor(faabric::Message& msg) { std::string funcStr = faabric::util::funcToString(msg, false); diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index 52c5c6774..661309b86 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -34,7 +34,7 @@ ExecutorFunction getDistTestExecutorCallback(const faabric::Message& msg) return executorFunctions[key]; } -DistTestExecutor::DistTestExecutor(const faabric::Message& msg) +DistTestExecutor::DistTestExecutor(faabric::Message& msg) : Executor(msg) {} @@ -61,7 +61,7 @@ faabric::util::SnapshotData DistTestExecutor::snapshot() return snap; } -void DistTestExecutor::restore(const faabric::Message& msg) +void DistTestExecutor::restore(faabric::Message& msg) { // Initialise the dummy memory and map to snapshot faabric::snapshot::SnapshotRegistry& reg = @@ -77,7 +77,7 @@ void DistTestExecutor::restore(const faabric::Message& msg) } std::shared_ptr DistTestExecutorFactory::createExecutor( - const faabric::Message& msg) + faabric::Message& msg) { return std::make_shared(msg); } diff --git a/tests/dist/DistTestExecutor.h b/tests/dist/DistTestExecutor.h index 4cf732740..7eecd73fc 100644 --- a/tests/dist/DistTestExecutor.h +++ b/tests/dist/DistTestExecutor.h @@ -20,7 +20,7 @@ void registerDistTestExecutorCallback(const char* user, class DistTestExecutor final : public faabric::scheduler::Executor { public: - DistTestExecutor(const faabric::Message& msg); + DistTestExecutor(faabric::Message& msg); ~DistTestExecutor(); @@ -35,13 +35,13 @@ class DistTestExecutor final : public faabric::scheduler::Executor size_t snapshotSize = 0; protected: - void restore(const faabric::Message& msg) override; + void restore(faabric::Message& msg) override; }; class DistTestExecutorFactory : public faabric::scheduler::ExecutorFactory { protected: std::shared_ptr createExecutor( - const faabric::Message& msg) override; + faabric::Message& msg) override; }; } diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 259820baf..a00b43e5d 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -28,7 +28,7 @@ std::atomic restoreCount = 0; class TestExecutor final : public Executor { public: - TestExecutor(const faabric::Message& msg) + TestExecutor(faabric::Message& msg) : Executor(msg) {} @@ -44,7 +44,7 @@ class TestExecutor final : public Executor } } - void restore(const faabric::Message& msg) + void restore(faabric::Message& msg) { restoreCount += 1; @@ -218,8 +218,7 @@ class TestExecutor final : public Executor class TestExecutorFactory : public ExecutorFactory { protected: - std::shared_ptr createExecutor( - const faabric::Message& msg) override + std::shared_ptr createExecutor(faabric::Message& msg) override { return std::make_shared(msg); } diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index beeb88769..4bfea9203 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -20,7 +20,7 @@ namespace tests { class SlowExecutor final : public Executor { public: - SlowExecutor(const faabric::Message& msg) + SlowExecutor(faabric::Message& msg) : Executor(msg) {} @@ -43,8 +43,7 @@ class SlowExecutor final : public Executor class SlowExecutorFactory : public ExecutorFactory { protected: - std::shared_ptr createExecutor( - const faabric::Message& msg) override + std::shared_ptr createExecutor(faabric::Message& msg) override { return std::make_shared(msg); } diff --git a/tests/utils/DummyExecutor.cpp b/tests/utils/DummyExecutor.cpp index bc0bef818..200a22e99 100644 --- a/tests/utils/DummyExecutor.cpp +++ b/tests/utils/DummyExecutor.cpp @@ -6,7 +6,7 @@ namespace faabric::scheduler { -DummyExecutor::DummyExecutor(const faabric::Message& msg) +DummyExecutor::DummyExecutor(faabric::Message& msg) : Executor(msg) {} diff --git a/tests/utils/DummyExecutor.h b/tests/utils/DummyExecutor.h index c8c40e108..1773fe999 100644 --- a/tests/utils/DummyExecutor.h +++ b/tests/utils/DummyExecutor.h @@ -7,7 +7,7 @@ namespace faabric::scheduler { class DummyExecutor final : public Executor { public: - DummyExecutor(const faabric::Message& msg); + DummyExecutor(faabric::Message& msg); ~DummyExecutor() override; diff --git a/tests/utils/DummyExecutorFactory.cpp b/tests/utils/DummyExecutorFactory.cpp index e928539c5..f5663fdf8 100644 --- a/tests/utils/DummyExecutorFactory.cpp +++ b/tests/utils/DummyExecutorFactory.cpp @@ -4,7 +4,7 @@ namespace faabric::scheduler { std::shared_ptr DummyExecutorFactory::createExecutor( - const faabric::Message& msg) + faabric::Message& msg) { return std::make_shared(msg); } diff --git a/tests/utils/DummyExecutorFactory.h b/tests/utils/DummyExecutorFactory.h index 2b8bfbf14..b44e50e80 100644 --- a/tests/utils/DummyExecutorFactory.h +++ b/tests/utils/DummyExecutorFactory.h @@ -7,7 +7,6 @@ namespace faabric::scheduler { class DummyExecutorFactory : public ExecutorFactory { protected: - std::shared_ptr createExecutor( - const faabric::Message& msg) override; + std::shared_ptr createExecutor(faabric::Message& msg) override; }; }