From 3f694f8c26af7d4b57c6d4d8d602b68caf2ba507 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 23 Nov 2021 16:04:48 +0000 Subject: [PATCH] fix race condition in scheduler reset and add test --- src/scheduler/Executor.cpp | 9 +++++---- tests/test/scheduler/test_executor.cpp | 28 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index c084047c5..54443a0c1 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -61,15 +61,16 @@ void Executor::finish() // Shut down thread pools and wait for (int i = 0; i < threadPoolThreads.size(); i++) { - if (threadPoolThreads.at(i) == nullptr) { - continue; - } - // Send a kill message SPDLOG_TRACE("Executor {} killing thread pool {}", id, i); threadTaskQueues[i].enqueue( ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr, false, false)); + // If already killed, move to the next thread + if (threadPoolThreads.at(i) == nullptr) { + continue; + } + // Await the thread if (threadPoolThreads.at(i)->joinable()) { threadPoolThreads.at(i)->join(); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 573a08522..dc792d714 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -330,6 +330,34 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(restoreCount == 0); } +TEST_CASE_METHOD(TestExecutorFixture, + "Test executing function repeatedly and flushing", + "[executor]") +{ + std::shared_ptr req = + faabric::util::batchExecFactory("dummy", "simple", 1); + uint32_t msgId = req->messages().at(0).id(); + std::vector actualHosts; + + int numRepeats = 20; + for (int i = 0; i < numRepeats; i++) { + std::vector actualHosts = + executeWithTestExecutor(req, false); + faabric::Message result = + sch.getFunctionResult(msgId, SHORT_TEST_TIMEOUT_MS); + std::string expected = + fmt::format("Simple function {} executed", msgId); + 
REQUIRE(result.outputdata() == expected); + + // We sleep for the same timeout threads have to force a race condition + // between the scheduler's flush and the thread's own timeout cleanup + SLEEP_MS(conf.boundTimeout); + + // Flush + sch.flushLocally(); + } +} + TEST_CASE_METHOD(TestExecutorFixture, "Test executing chained functions", "[executor]")