From 4930c61bc537e3c238d6c6428a74d9f626bbd639 Mon Sep 17 00:00:00 2001 From: Carlos Date: Wed, 24 Nov 2021 15:32:59 +0100 Subject: [PATCH] Fix race condition in scheduler reset (#179) * fix race condition in scheduler reset and add test * pr comments --- src/scheduler/Executor.cpp | 9 ++++---- tests/test/scheduler/test_executor.cpp | 29 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index c084047c5..54443a0c1 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -61,15 +61,16 @@ void Executor::finish() // Shut down thread pools and wait for (int i = 0; i < threadPoolThreads.size(); i++) { - if (threadPoolThreads.at(i) == nullptr) { - continue; - } - // Send a kill message SPDLOG_TRACE("Executor {} killing thread pool {}", id, i); threadTaskQueues[i].enqueue( ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr, false, false)); + // If already killed, move to the next thread + if (threadPoolThreads.at(i) == nullptr) { + continue; + } + // Await the thread if (threadPoolThreads.at(i)->joinable()) { threadPoolThreads.at(i)->join(); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 573a08522..89aa2c753 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -330,6 +330,35 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(restoreCount == 0); } +TEST_CASE_METHOD(TestExecutorFixture, + "Test executing function repeatedly and flushing", + "[executor]") +{ + // Set the bound timeout to something short so the test runs fast + conf.boundTimeout = 100; + + int numRepeats = 20; + for (int i = 0; i < numRepeats; i++) { + std::shared_ptr req = + faabric::util::batchExecFactory("dummy", "simple", 1); + uint32_t msgId = req->messages().at(0).id(); + + executeWithTestExecutor(req, false); + faabric::Message result = + sch.getFunctionResult(msgId, SHORT_TEST_TIMEOUT_MS); + std::string expected = + fmt::format("Simple function {} executed", msgId); + REQUIRE(result.outputdata() == expected); + + // We sleep for the same timeout threads have, to force a race condition + // between the scheduler's flush and the thread's own cleanup timeout + SLEEP_MS(conf.boundTimeout); + + // Flush + sch.flushLocally(); + } +} + TEST_CASE_METHOD(TestExecutorFixture, "Test executing chained functions", "[executor]")