Skip to content

Commit

Permalink
Merge regions overhaul (#201)
Browse files Browse the repository at this point in the history
* Client/ server for pushing snapshot merge regions

* Reinstate clearing of merge regions

* Dist tests and test

* Add reserves where possible

* Support more merge ops

* Tests for more datatypes

* Added long type

* Add trace logging to diffs

* Remove register snapshot if exists

* Formatting

* More snapshot logging

* Add filling in gaps capacity

* Default to shared

* Use spans where possible

* Logging and reinstate MR clearing

* Add ignore diffs

* Fix bug in overflowing original snap data

* Remove unnecessary merge regions in dist tests

* More snapshot tests

* Fix uint mismatch

* Self review

* Profiling

* Use spans

* Use static initialised for resetting dirty pages
  • Loading branch information
Shillaker committed Dec 29, 2021
1 parent b6b3e3d commit 0e7a32c
Show file tree
Hide file tree
Showing 25 changed files with 1,325 additions and 540 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ project(faabric)

option(FAABRIC_WASM_BUILD "Build Faabric wasm library" OFF)
option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON)
option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF)

# Enable colorized compiler output
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
Expand All @@ -22,6 +23,11 @@ set(CMAKE_EXE_LINKER_FLAGS "-fuse-ld=lld")
# Compile commands for clang tools
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

if(${FAABRIC_SELF_TRACING})
message("-- Activated Faabric self-tracing")
add_definitions(-DTRACE_ALL=1)
endif()

# Set-up use of sanitisers
if (FAABRIC_USE_SANITISER STREQUAL "Address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
Expand Down
11 changes: 10 additions & 1 deletion include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
void pushSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void pushSnapshotUpdate(
std::string snapshotKey,
const std::shared_ptr<faabric::util::SnapshotData>& data,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void pushSnapshotDiffs(
std::string snapshotKey,
bool force,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void deleteSnapshot(const std::string& key);
Expand All @@ -49,5 +53,10 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

private:
void sendHeader(faabric::snapshot::SnapshotCalls call);

void doPushSnapshotDiffs(
const std::string& snapshotKey,
const std::shared_ptr<faabric::util::SnapshotData>& data,
const std::vector<faabric::util::SnapshotDiff>& diffs);
};
}
10 changes: 0 additions & 10 deletions include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,9 @@ class SnapshotRegistry

bool snapshotExists(const std::string& key);

void mapSnapshot(const std::string& key, uint8_t* target);

void registerSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void registerSnapshotIfNotExists(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void deleteSnapshot(const std::string& key);

size_t getSnapshotCount();
Expand All @@ -44,10 +38,6 @@ class SnapshotRegistry

int writeSnapshotToFd(const std::string& key,
faabric::util::SnapshotData& data);

void doRegisterSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data,
bool overwrite);
};

SnapshotRegistry& getSnapshotRegistry();
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
// -------------------------
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion;

MemoryRegion allocatePrivateMemory(size_t size);

MemoryRegion allocateSharedMemory(size_t size);

MemoryRegion allocateVirtualMemory(size_t size);
Expand Down
115 changes: 107 additions & 8 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ namespace faabric::util {
enum SnapshotDataType
{
Raw,
Int
Bool,
Int,
Long,
Float,
Double
};

enum SnapshotMergeOperation
Expand All @@ -27,7 +31,8 @@ enum SnapshotMergeOperation
Product,
Subtract,
Max,
Min
Min,
Ignore
};

class SnapshotDiff
Expand Down Expand Up @@ -65,14 +70,106 @@ class SnapshotMergeRegion
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

SnapshotMergeRegion() = default;

SnapshotMergeRegion(uint32_t offsetIn,
size_t lengthIn,
SnapshotDataType dataTypeIn,
SnapshotMergeOperation operationIn);

void addDiffs(std::vector<SnapshotDiff>& diffs,
const uint8_t* original,
uint32_t originalSize,
const uint8_t* updated,
uint32_t dirtyRegionStart,
uint32_t dirtyRegionEnd);
std::span<const uint8_t> originalData,
std::span<const uint8_t> updatedData,
std::pair<uint32_t, uint32_t> dirtyRange);

private:
void addOverwriteDiff(std::vector<SnapshotDiff>& diffs,
std::span<const uint8_t> original,
std::span<const uint8_t> updated,
std::pair<uint32_t, uint32_t> dirtyRange);
};

/**
 * Converts the updated value at `updated` into the diff that has to be sent
 * for the given merge operation, writing the result back in place.
 *
 * Both pointers are read as a T through unalignedRead.
 *
 * - Sum: writes (updated - original), i.e. the amount to add.
 * - Subtract: writes (original - updated), i.e. the amount to subtract.
 * - Product: writes (updated / original), i.e. the factor to multiply by.
 * - Max / Min: the updated value is written back unchanged.
 *
 * @param original pointer to the original value
 * @param updated pointer to the updated value; overwritten with the diff
 * @param operation merge operation the diff is calculated for
 * @return false if original and updated are equal (nothing is written),
 * true otherwise
 * @throws std::runtime_error for any other operation, or for a Product whose
 * original value is zero
 */
template<typename T>
inline bool calculateDiffValue(const uint8_t* original,
                               uint8_t* updated,
                               SnapshotMergeOperation operation)
{
    // Cast to value
    T updatedValue = unalignedRead<T>(updated);
    T originalValue = unalignedRead<T>(original);

    // Skip if no change
    if (originalValue == updatedValue) {
        return false;
    }

    // Work out final result
    switch (operation) {
        case (SnapshotMergeOperation::Sum): {
            // Sums must send the value to be _added_, and
            // not the final result
            updatedValue -= originalValue;
            break;
        }
        case (SnapshotMergeOperation::Subtract): {
            // Subtractions must send the value to be
            // subtracted, not the result
            updatedValue = originalValue - updatedValue;
            break;
        }
        case (SnapshotMergeOperation::Product): {
            // We only get here when the values differ, so a zero original
            // means a non-zero update. No factor can express that, and for
            // integer types the division below would be undefined behaviour,
            // so fail explicitly instead.
            if (originalValue == T{}) {
                SPDLOG_ERROR(
                  "Can't calculate product diff from a zero original value");
                throw std::runtime_error("Can't calculate diff");
            }

            // Products must send the value to be
            // multiplied, not the result
            updatedValue /= originalValue;
            break;
        }
        case (SnapshotMergeOperation::Max):
        case (SnapshotMergeOperation::Min):
            // Min and max don't need to change
            break;
        default: {
            SPDLOG_ERROR("Can't calculate diff for operation: {}", operation);
            throw std::runtime_error("Can't calculate diff");
        }
    }

    unalignedWrite<T>(updatedValue, updated);

    return true;
}

/**
 * Merges a diff value into an original value and returns the result.
 *
 * Both pointers are read as a T through unalignedRead; neither is written.
 *
 * - Sum: diff + original
 * - Subtract: original - diff
 * - Product: original * diff
 * - Max / Min: the larger / smaller of original and diff
 *
 * @param original pointer to the value the diff is applied to
 * @param diff pointer to the diff value
 * @param operation merge operation to apply
 * @return the merged value
 * @throws std::runtime_error for any other operation
 */
template<typename T>
inline T applyDiffValue(const uint8_t* original,
                        const uint8_t* diff,
                        SnapshotMergeOperation operation)
{
    const auto delta = unalignedRead<T>(diff);
    const T base = unalignedRead<T>(original);

    if (operation == SnapshotMergeOperation::Sum) {
        return delta + base;
    }

    if (operation == SnapshotMergeOperation::Subtract) {
        return base - delta;
    }

    if (operation == SnapshotMergeOperation::Product) {
        return base * delta;
    }

    if (operation == SnapshotMergeOperation::Max) {
        return std::max<T>(base, delta);
    }

    if (operation == SnapshotMergeOperation::Min) {
        return std::min<T>(base, delta);
    }

    // Anything else cannot be applied to a typed value
    SPDLOG_ERROR("Can't apply merge operation: {}", operation);
    throw std::runtime_error("Can't apply merge operation");
}

class SnapshotData
{
public:
Expand Down Expand Up @@ -100,14 +197,16 @@ class SnapshotData

std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize);

void mapToMemory(uint8_t* target);
void mapToMemory(std::span<uint8_t> target);

void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation,
bool overwrite = false);

void fillGapsWithOverwriteRegions();

void clearMergeRegions();

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();
Expand Down
19 changes: 14 additions & 5 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
// Wire representation of a single snapshot merge region. Carried in the
// merge_regions vector of SnapshotPushRequest and SnapshotDiffPushRequest.
table SnapshotMergeRegionRequest {
// Start of the region (presumably a byte offset into the snapshot - confirm)
offset:int;
// Size of the region (presumably in bytes - confirm)
length:ulong;
// NOTE(review): looks like a SnapshotDataType value sent as an int - confirm
data_type:int;
// NOTE(review): looks like a SnapshotMergeOperation value sent as an int - confirm
merge_op:int;
}

table SnapshotPushRequest {
key:string;
maxSize:ulong;
max_size:ulong;
contents:[ubyte];
merge_regions:[SnapshotMergeRegionRequest];
}

// Request carrying the key of a snapshot; presumably the snapshot to delete,
// going by the table name - confirm against the server handler.
table SnapshotDeleteRequest {
key:string;
}

table SnapshotDiffChunk {
table SnapshotDiffRequest {
offset:int;
dataType:int;
mergeOp:int;
data_type:int;
merge_op:int;
data:[ubyte];
}

table SnapshotDiffPushRequest {
key:string;
force:bool;
chunks:[SnapshotDiffChunk];
merge_regions:[SnapshotMergeRegionRequest];
diffs:[SnapshotDiffRequest];
}

table ThreadResultRequest {
Expand Down
4 changes: 4 additions & 0 deletions src/runner/FaabricMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void FaabricMain::startBackground()
// Crash handler
faabric::util::setUpCrashHandler();

PROF_BEGIN

// Start basics
startRunner();

Expand All @@ -39,6 +41,8 @@ void FaabricMain::startBackground()

// Work sharing
startFunctionCallServer();

PROF_SUMMARY
}

void FaabricMain::startRunner()
Expand Down
15 changes: 10 additions & 5 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,15 @@ void Executor::threadPoolThread(int threadPoolIdx)
SPDLOG_TRACE("Diffing memory with pre-execution snapshot for {}",
msg.snapshotkey());

// If we're on master, we write the diffs straight to the snapshot
// otherwise we push them to the master.
// Fill gaps with overwrites
snap->fillGapsWithOverwriteRegions();

// Work out the diffs
std::vector<faabric::util::SnapshotDiff> diffs =
funcMemory.diffWithSnapshot(snap);

// On master we queue the diffs locally directly, on a remote host
// we push them back to master
if (isMaster) {
SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} on "
"master (group {})",
Expand All @@ -314,11 +318,12 @@ void Executor::threadPoolThread(int threadPoolIdx)
snap->queueDiffs(diffs);
} else {
sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking on non-master
faabric::util::resetDirtyTracking();
}

// Reset dirty page tracking
faabric::util::resetDirtyTracking();

// Clear merge regions
SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey());
snap->clearMergeRegions();
}
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapMemView.getDirtyRegions();

c.pushSnapshotDiffs(snapshotKey, true, snapshotDiffs);
c.pushSnapshotUpdate(snapshotKey, snap, snapshotDiffs);
} else {
c.pushSnapshot(snapshotKey, snap);
pushedSnapshotsMap[snapshotKey].insert(host);
Expand Down Expand Up @@ -916,7 +916,7 @@ void Scheduler::pushSnapshotDiffs(
}

SnapshotClient& c = getSnapshotClient(msg.masterhost());
c.pushSnapshotDiffs(snapKey, false, diffs);
c.pushSnapshotDiffs(snapKey, diffs);
}

void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue)
Expand Down
Loading

0 comments on commit 0e7a32c

Please sign in to comment.