diff --git a/include/faabric/scheduler/ExecutorContext.h b/include/faabric/scheduler/ExecutorContext.h new file mode 100644 index 000000000..0af381a91 --- /dev/null +++ b/include/faabric/scheduler/ExecutorContext.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include + +namespace faabric::scheduler { + +/** + * Globally-accessible wrapper that allows executing applications to query + * their execution context. The context is thread-local, so applications can + * query which specific message they are executing. + */ +class ExecutorContext +{ + public: + ExecutorContext(Executor* executorIn, + std::shared_ptr reqIn, + int msgIdx); + + static bool isSet(); + + static void set(Executor* executorIn, + std::shared_ptr reqIn, + int msgIdxIn); + + static void unset(); + + static std::shared_ptr get(); + + Executor* getExecutor() { return executor; } + + std::shared_ptr getBatchRequest() + { + return req; + } + + faabric::Message& getMsg() + { + if (req == nullptr) { + throw std::runtime_error( + "Getting message when no request set in context"); + } + return req->mutable_messages()->at(msgIdx); + } + + int getMsgIdx() { return msgIdx; } + + private: + Executor* executor = nullptr; + std::shared_ptr req = nullptr; + int msgIdx = 0; +}; +} diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 00f2736b0..75a7f19ae 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -38,13 +38,11 @@ class ExecutorTask ExecutorTask(int messageIndexIn, std::shared_ptr reqIn, - std::shared_ptr> batchCounterIn, - bool skipResetIn); + std::shared_ptr> batchCounterIn); std::shared_ptr req; std::shared_ptr> batchCounter; int messageIndex = 0; - bool skipReset = false; }; class Executor @@ -122,10 +120,6 @@ class Executor void threadPoolThread(int threadPoolIdx); }; -Executor* getExecutingExecutor(); - -void setExecutingExecutor(Executor* exec); - class Scheduler { public: diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt index f1ced9340..9e86b8f82 100644 --- a/src/scheduler/CMakeLists.txt +++ b/src/scheduler/CMakeLists.txt @@ -1,5 +1,6 @@ faabric_lib(scheduler ExecGraph.cpp + ExecutorContext.cpp ExecutorFactory.cpp Executor.cpp FunctionCallClient.cpp diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 6e3a85cf9..efbd76120 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -31,26 +32,12 @@ namespace faabric::scheduler { -static thread_local Executor* executingExecutor = nullptr; - -Executor* getExecutingExecutor() -{ - return executingExecutor; -} - -void setExecutingExecutor(Executor* exec) -{ - executingExecutor = exec; -} - ExecutorTask::ExecutorTask(int messageIndexIn, std::shared_ptr reqIn, - std::shared_ptr> batchCounterIn, - bool skipResetIn) + std::shared_ptr> batchCounterIn) : req(std::move(reqIn)) , batchCounter(std::move(batchCounterIn)) , messageIndex(messageIndexIn) - , skipReset(skipResetIn) {} // TODO - avoid the copy of the message here? @@ -87,7 +74,7 @@ void Executor::finish() // Send a kill message SPDLOG_TRACE("Executor {} killing thread pool {}", id, i); threadTaskQueues[i].enqueue( - ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr, false)); + ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr)); faabric::util::UniqueLock threadsLock(threadsMutex); // Copy shared_ptr to avoid racing @@ -277,11 +264,6 @@ void Executor::executeTasks(std::vector msgIdxs, // Set up shared counter for this batch of tasks auto batchCounter = std::make_shared>(msgIdxs.size()); - // Work out if we should skip the reset after this batch. This happens - // for threads, as they will be restored from their respective snapshot - // on the next execution. - bool skipReset = isThreads; - // Iterate through and invoke tasks. By default, we allocate tasks // one-to-one with thread pool threads. Only once the pool is exhausted // do we start overloading @@ -328,7 +310,7 @@ void Executor::executeTasks(std::vector msgIdxs, // Enqueue the task threadTaskQueues[threadPoolIdx].enqueue( - ExecutorTask(msgIdx, req, batchCounter, skipReset)); + ExecutorTask(msgIdx, req, batchCounter)); // Lazily create the thread if (threadPoolThreads.at(threadPoolIdx) == nullptr) { @@ -452,8 +434,8 @@ void Executor::threadPoolThread(int threadPoolIdx) isThreads, msg.groupid()); - // Set executing executor - setExecutingExecutor(this); + // Set up context + ExecutorContext::set(this, task.req, task.messageIndex); // Execute the task int32_t returnValue; @@ -488,6 +470,9 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.set_outputdata(errorMessage); } + // Unset context + ExecutorContext::unset(); + // Handle thread-local diffing for every thread if (doDirtyTracking) { // Stop dirty tracking @@ -590,7 +575,9 @@ void Executor::threadPoolThread(int threadPoolIdx) // claim. Note that we have to release the claim _after_ resetting, // otherwise the executor won't be ready for reuse if (isLastInBatch) { - if (task.skipReset) { + // Threads skip the reset as they will be restored from their + // respective snapshot on the next execution. + if (isThreads) { SPDLOG_TRACE("Skipping reset for {}", faabric::util::funcToString(msg, true)); } else { diff --git a/src/scheduler/ExecutorContext.cpp b/src/scheduler/ExecutorContext.cpp new file mode 100644 index 000000000..ffc71c03a --- /dev/null +++ b/src/scheduler/ExecutorContext.cpp @@ -0,0 +1,41 @@ +#include + +namespace faabric::scheduler { + +static thread_local std::shared_ptr context = nullptr; + +ExecutorContext::ExecutorContext( + Executor* executorIn, + std::shared_ptr reqIn, + int msgIdxIn) + : executor(executorIn) + , req(reqIn) + , msgIdx(msgIdxIn) +{} + +bool ExecutorContext::isSet() +{ + return context != nullptr; +} + +void ExecutorContext::set(Executor* executorIn, + std::shared_ptr reqIn, + int appIdxIn) +{ + context = std::make_shared(executorIn, reqIn, appIdxIn); +} + +void ExecutorContext::unset() +{ + context = nullptr; +} + +std::shared_ptr ExecutorContext::get() +{ + if (context == nullptr) { + SPDLOG_ERROR("No executor context set"); + throw std::runtime_error("No executor context set"); + } + return context; +} +} diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 1d58d0477..fe73ed54d 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -225,6 +226,36 @@ int32_t TestExecutor::executeTask( return 20; } + if (msg.function() == "context-check") { + std::shared_ptr ctx = + faabric::scheduler::ExecutorContext::get(); + if (ctx == nullptr) { + SPDLOG_ERROR("Executor context is null"); + return 999; + } + + if (ctx->getExecutor() != this) { + SPDLOG_ERROR("Executor not equal to this one"); + return 999; + } + + if (ctx->getBatchRequest()->id() != reqOrig->id()) { + SPDLOG_ERROR("Context request does not match ({} != {})", + ctx->getBatchRequest()->id(), + reqOrig->id()); + return 999; + } + + if (ctx->getMsgIdx() != msgIdx) { + SPDLOG_ERROR("Context message idx does not match ({} != {})", + ctx->getMsgIdx(), + msgIdx); + return 999; + } + + return 123; + } + if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) { SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id()); return msg.id() / 100; @@ -1029,4 +1060,23 @@ TEST_CASE_METHOD(TestExecutorFixture, setMockMode(false); } + +TEST_CASE_METHOD(TestExecutorFixture, + "Test executor sees context", + "[executor]") +{ + int nMessages = 5; + std::shared_ptr req = + faabric::util::batchExecFactory("dummy", "context-check", nMessages); + int expectedResult = 123; + + sch.callFunctions(req); + + for (int i = 0; i < nMessages; i++) { + faabric::Message res = + sch.getFunctionResult(req->messages().at(i).id(), 2000); + + REQUIRE(res.returnvalue() == expectedResult); + } +} } diff --git a/tests/test/scheduler/test_executor_context.cpp b/tests/test/scheduler/test_executor_context.cpp new file mode 100644 index 000000000..245931ff1 --- /dev/null +++ b/tests/test/scheduler/test_executor_context.cpp @@ -0,0 +1,66 @@ +#include + +#include "faabric_utils.h" + +#include +#include + +using namespace faabric::scheduler; + +namespace tests { + +TEST_CASE_METHOD(ExecutorContextTestFixture, + "Test executor context", + "[scheduler]") +{ + REQUIRE(!ExecutorContext::isSet()); + + // Getting with no context should fail + REQUIRE_THROWS(ExecutorContext::get()); + + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + + std::shared_ptr fac = + std::make_shared(); + auto exec = fac->createExecutor(msg); + + auto req = faabric::util::batchExecFactory("foo", "bar", 5); + + SECTION("Set both executor and request") + { + ExecutorContext::set(exec.get(), req, 3); + + std::shared_ptr ctx = ExecutorContext::get(); + REQUIRE(ctx->getExecutor() == exec.get()); + REQUIRE(ctx->getBatchRequest() == req); + REQUIRE(ctx->getMsgIdx() == 3); + REQUIRE(ctx->getMsg().id() == req->mutable_messages()->at(3).id()); + } + + SECTION("Just set executor") + { + ExecutorContext::set(exec.get(), nullptr, 0); + + std::shared_ptr ctx = ExecutorContext::get(); + REQUIRE(ctx->getExecutor() == exec.get()); + REQUIRE(ctx->getBatchRequest() == nullptr); + REQUIRE(ctx->getMsgIdx() == 0); + + REQUIRE_THROWS(ctx->getMsg()); + } + + SECTION("Just set request") + { + ExecutorContext::set(nullptr, req, 3); + + std::shared_ptr ctx = ExecutorContext::get(); + REQUIRE(ctx->getExecutor() == nullptr); + REQUIRE(ctx->getBatchRequest() == req); + REQUIRE(ctx->getMsgIdx() == 3); + REQUIRE(ctx->getMsg().id() == req->mutable_messages()->at(3).id()); + } + + ExecutorContext::unset(); + REQUIRE_THROWS(ExecutorContext::get()); +} +} diff --git a/tests/utils/DummyExecutorFactory.h b/tests/utils/DummyExecutorFactory.h index afbd05ea6..10d95086c 100644 --- a/tests/utils/DummyExecutorFactory.h +++ b/tests/utils/DummyExecutorFactory.h @@ -11,9 +11,9 @@ class DummyExecutorFactory : public ExecutorFactory int getFlushCount(); - protected: std::shared_ptr createExecutor(faabric::Message& msg) override; + protected: void flushHost() override; private: diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index dec642afa..2ce347889 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -2,7 +2,9 @@ #include +#include #include +#include #include #include #include @@ -14,13 +16,14 @@ #include #include #include +#include #include #include #include +#include #include #include "DummyExecutorFactory.h" -#include "faabric/util/scheduling.h" namespace tests { class RedisTestFixture @@ -336,6 +339,40 @@ class PointToPointClientServerFixture faabric::transport::PointToPointServer server; }; +class ExecutorContextTestFixture +{ + public: + ExecutorContextTestFixture() {} + + ~ExecutorContextTestFixture() + { + faabric::scheduler::ExecutorContext::unset(); + } + + /** + * Creates a batch request and sets up the associated context + */ + std::shared_ptr setUpContext( + const std::string& user, + const std::string& func, + int nMsgs = 1) + { + auto req = faabric::util::batchExecFactory(user, func, nMsgs); + + setUpContext(req); + + return req; + } + + /** + * Sets up context for the given batch request + */ + void setUpContext(std::shared_ptr req) + { + faabric::scheduler::ExecutorContext::set(nullptr, req, 0); + } +}; + #define TEST_EXECUTOR_DEFAULT_MEMORY_SIZE (10 * faabric::util::HOST_PAGE_SIZE) class TestExecutor final : public faabric::scheduler::Executor