Skip to content

Commit

Permalink
Scheduler hints and fixes for distributed coordination bugs (#169)
Browse files Browse the repository at this point in the history
* Started fixes for locks

* Clearing up groups

* Experiments with waiters/ maps

* Add test for flag waiter

* Avoid scheduling multiple threads on same thread as 0th group

* Overloading thread pool

* Moved unrelated changes to separate PR

* Add locks.cpp

* Only lock on group when it exists

* Add custom merges

* Overhaul of snapshot diffing

* Move spurious logging statement

* More snapshotting fixes

* Rearrange scheduler logic

* Fix indexing in scheduler loop

* Avoid pushing snapshots to master host

* Remove last snapshot stuff

* Add scheduler hints

* Fix bug in host ordering

* Sort out reusing ptp messages

* Naming

* Fixing up tests

* Continuing test fixes

* Remove ignore regions

* Add distributed locking test

* Formatting

* Fix dist tests and remove unnecessary unit test

* Clearer error message when insufficient pool threads

* Bump cores in failing tests

* Factor out scheduler decision making

* Fix locking bug
  • Loading branch information
Shillaker committed Nov 10, 2021
1 parent f7a910b commit 990c640
Show file tree
Hide file tree
Showing 25 changed files with 1,553 additions and 1,076 deletions.
25 changes: 16 additions & 9 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ class Executor
uint32_t threadPoolSize = 0;

private:
std::string lastSnapshot;

std::atomic<bool> claimed = false;

std::mutex threadsMutex;
std::vector<std::shared_ptr<std::thread>> threadPoolThreads;
std::vector<std::shared_ptr<std::thread>> deadThreads;
std::set<int> availablePoolThreads;

std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

Expand All @@ -105,6 +104,10 @@ class Scheduler
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal = false);

faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& hint);

void reset();

void resetThreadLocalCache();
Expand Down Expand Up @@ -204,6 +207,8 @@ class Scheduler
std::promise<std::unique_ptr<faabric::Message>>>
localResults;

std::unordered_map<std::string, std::set<std::string>> pushedSnapshotsMap;

std::mutex localResultsMutex;

// ---- Clients ----
Expand All @@ -226,20 +231,22 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::util::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal);

faabric::util::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
faabric::util::FullLock& lock);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

int scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot);

// ---- Accounting and debugging ----
std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
Expand Down
21 changes: 13 additions & 8 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/transport/PointToPointClient.h>
#include <faabric/util/config.h>
#include <faabric/util/locks.h>
#include <faabric/util/scheduling.h>

#include <condition_variable>
Expand All @@ -26,10 +27,16 @@ class PointToPointGroup
public:
static std::shared_ptr<PointToPointGroup> getGroup(int groupId);

static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId);

static bool groupExists(int groupId);

static void addGroup(int appId, int groupId, int groupSize);

static void addGroupIfNotExists(int appId, int groupId, int groupSize);

static void clearGroup(int groupId);

static void clear();

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn);
Expand Down Expand Up @@ -77,10 +84,6 @@ class PointToPointGroup
std::queue<int> lockWaiters;

void notifyLocked(int groupIdx);

void masterLock(int groupIdx, bool recursive);

void masterUnlock(int groupIdx, bool recursive);
};

class PointToPointBroker
Expand Down Expand Up @@ -108,21 +111,23 @@ class PointToPointBroker

std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx);

void clearGroup(int groupId);

void clear();

void resetThreadLocalCache();

private:
faabric::util::SystemConfig& conf;

std::shared_mutex brokerMutex;

std::unordered_map<int, std::set<int>> groupIdIdxsMap;
std::unordered_map<std::string, std::string> mappings;

std::unordered_map<int, bool> groupMappingsFlags;
std::unordered_map<int, std::mutex> groupMappingMutexes;
std::unordered_map<int, std::condition_variable> groupMappingCvs;
std::unordered_map<int, faabric::util::FlagWaiter> groupFlags;

faabric::util::SystemConfig& conf;
faabric::util::FlagWaiter& getGroupFlag(int groupId);
};

PointToPointBroker& getPointToPointBroker();
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/transport/PointToPointServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class PointToPointServer final : public MessageEndpointServer
PointToPointServer();

private:
PointToPointBroker& reg;
PointToPointBroker& broker;

void doAsyncRecv(int header,
const uint8_t* buffer,
Expand Down
23 changes: 23 additions & 0 deletions include/faabric/util/locks.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
#pragma once

#include <faabric/util/logging.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>

#define DEFAULT_FLAG_WAIT_MS 10000

namespace faabric::util {
typedef std::unique_lock<std::mutex> UniqueLock;
typedef std::unique_lock<std::shared_mutex> FullLock;
typedef std::shared_lock<std::shared_mutex> SharedLock;

class FlagWaiter
{
public:
FlagWaiter(int timeoutMsIn = DEFAULT_FLAG_WAIT_MS);

void waitOnFlag();

void setFlag(bool value);

private:
int timeoutMs;

std::mutex flagMx;
std::condition_variable cv;
std::atomic<bool> flag;
};
}
37 changes: 19 additions & 18 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include <faabric/util/logging.h>
#include <faabric/util/macros.h>

namespace faabric::util {

Expand All @@ -19,22 +20,13 @@ enum SnapshotDataType
enum SnapshotMergeOperation
{
Overwrite,
Ignore,
Sum,
Product,
Subtract,
Max,
Min
};

struct SnapshotMergeRegion
{
uint32_t offset = 0;
size_t length = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
};

class SnapshotDiff
{
public:
Expand All @@ -44,6 +36,8 @@ class SnapshotDiff
size_t size = 0;
const uint8_t* data = nullptr;

bool noChange = false;

SnapshotDiff() = default;

SnapshotDiff(SnapshotDataType dataTypeIn,
Expand All @@ -58,13 +52,19 @@ class SnapshotDiff
data = dataIn;
size = sizeIn;
}
};

SnapshotDiff(uint32_t offsetIn, const uint8_t* dataIn, size_t sizeIn)
{
offset = offsetIn;
data = dataIn;
size = sizeIn;
}
class SnapshotMergeRegion
{
public:
uint32_t offset = 0;
size_t length = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

void addDiffs(std::vector<SnapshotDiff>& diffs,
const uint8_t* original,
const uint8_t* updated);
};

class SnapshotData
Expand All @@ -84,11 +84,12 @@ class SnapshotData
void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation);
SnapshotMergeOperation operation,
bool overwrite = false);

private:
// Note - we care about the order of this map, as we iterate through it in
// order of offsets
// Note - we care about the order of this map, as we iterate through it
// in order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
};

Expand Down
Loading

0 comments on commit 990c640

Please sign in to comment.