Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions around executor reuse #120

Merged
merged 5 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,19 @@ void Executor::threadPoolThread(int threadPoolIdx)
sch.setFunctionResult(msg);
}

// Reset and release claim. Note that we have to release the claim
// _after_ resetting, once the executor is ready to be reused
// If this batch is finished, reset and release the claim on this
// executor.
// Note that we have to release the claim _after_ resetting; otherwise
// the executor won't be ready for reuse.
if (oldTaskCount == 1) {
reset(msg);
releaseClaim();
}

// Vacate the slot for this task in the scheduler. This must be done
// after releasing the claim and resetting the module to avoid a race
// condition in the scheduler that allows the host to overcommit.
sch.vacateSlot();
}

if (selfShutdown) {
Expand Down
6 changes: 0 additions & 6 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,6 @@ void Scheduler::flushLocally()

void Scheduler::setFunctionResult(faabric::Message& msg)
{
// Vacate the slot taken by this function
vacateSlot();

redis::Redis& redis = redis::Redis::getQueue();

// Record which host did the execution
Expand Down Expand Up @@ -733,9 +730,6 @@ void Scheduler::setThreadResult(
int32_t returnValue,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
// Vacate the slot taken by this thread
vacateSlot();

bool isMaster = msg.masterhost() == conf.endpointHost;

if (isMaster) {
Expand Down
5 changes: 0 additions & 5 deletions tests/dist/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@ class DistTestsFixture
, public ConfTestFixture
, public SnapshotTestFixture
{
protected:
std::set<std::string> otherHosts;
Copy link
Collaborator Author

@Shillaker Shillaker Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated, but I noticed otherHosts isn't actually used anywhere so I removed it as part of this PR. Happy to move elsewhere but it's just 5 lines in this one file.


public:
DistTestsFixture()
{
// Get other hosts
std::string thisHost = conf.endpointHost;
otherHosts = sch.getAvailableHosts();
otherHosts.erase(thisHost);

// Set up executor
std::shared_ptr<tests::DistTestExecutorFactory> fac =
Expand Down
47 changes: 47 additions & 0 deletions tests/test/scheduler/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ class SlowExecutorFixture
};
};

// Test fixture that installs a DummyExecutorFactory as the executor factory
// when it is constructed, and installs a fresh DummyExecutorFactory again
// when it is destroyed.
class DummyExecutorFixture
  : public RedisTestFixture
  , public SchedulerTestFixture
  , public ConfTestFixture
{
  public:
    DummyExecutorFixture() { installDummyFactory(); }

    ~DummyExecutorFixture() { installDummyFactory(); }

  private:
    // Creates a new DummyExecutorFactory and hands it to setExecutorFactory.
    // Shared by set-up and tear-down, which perform the same step.
    void installDummyFactory()
    {
        std::shared_ptr<ExecutorFactory> dummyFactory =
          std::make_shared<DummyExecutorFactory>();
        setExecutorFactory(dummyFactory);
    }
};

TEST_CASE_METHOD(SlowExecutorFixture, "Test scheduler clear-up", "[scheduler]")
{
faabric::util::setMockMode(true);
Expand Down Expand Up @@ -783,4 +804,30 @@ TEST_CASE_METHOD(SlowExecutorFixture,
std::get<3>(actualTuple);
REQUIRE(actualDiffs.size() == diffs.size());
}

// Runs two consecutive batches of two calls to the same function and checks
// that the number of executors reported for that function is 2 after the
// first batch and is still 2 after the second batch.
TEST_CASE_METHOD(DummyExecutorFixture, "Test executor reuse", "[scheduler]")
{
    // All four messages target the same user/function pair
    faabric::Message firstA = faabric::util::messageFactory("foo", "bar");
    faabric::Message firstB = faabric::util::messageFactory("foo", "bar");
    faabric::Message secondA = faabric::util::messageFactory("foo", "bar");
    faabric::Message secondB = faabric::util::messageFactory("foo", "bar");

    // Submit both messages of a batch before waiting on either result, so
    // the two calls are in flight at the same time
    const auto runBatch = [&](faabric::Message& one, faabric::Message& two) {
        sch.callFunction(one);
        sch.callFunction(two);
        sch.getFunctionResult(one.id(), SHORT_TEST_TIMEOUT_MS);
        sch.getFunctionResult(two.id(), SHORT_TEST_TIMEOUT_MS);
    };

    // First batch: expect one executor per concurrent call
    runBatch(firstA, firstB);
    REQUIRE(sch.getFunctionExecutorCount(firstA) == 2);

    // Second batch: the executor count must not have changed
    runBatch(secondA, secondB);
    REQUIRE(sch.getFunctionExecutorCount(firstA) == 2);
}
}