Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge regions overhaul #201

Merged
merged 25 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
void pushSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

// Pushes a set of diffs for the snapshot identified by snapshotKey, together
// with the snapshot data object itself. The implementation reads the merge
// regions from `data` and forwards everything to doPushSnapshotDiffs.
//
// snapshotKey: key of the snapshot being updated
// data: snapshot whose merge regions accompany the diffs
// diffs: the diffs to send
void pushSnapshotUpdate(
std::string snapshotKey,
std::shared_ptr<faabric::util::SnapshotData> data,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void pushSnapshotDiffs(
std::string snapshotKey,
bool force,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void deleteSnapshot(const std::string& key);
Expand All @@ -49,5 +53,10 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

private:
void sendHeader(faabric::snapshot::SnapshotCalls call);

// Shared implementation behind pushSnapshotUpdate and pushSnapshotDiffs.
//
// snapshotKey: key of the snapshot the diffs apply to
// data: may be nullptr (pushSnapshotDiffs passes nullptr); when non-null its
//       merge regions are added to the request and the request's force flag
//       is set to true, otherwise no merge regions are sent and force is false
// diffs: the diffs to serialise into the request
void doPushSnapshotDiffs(
const std::string& snapshotKey,
std::shared_ptr<faabric::util::SnapshotData> data,
const std::vector<faabric::util::SnapshotDiff>& diffs);
};
}
19 changes: 14 additions & 5 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
// Wire representation of a single snapshot merge region.
// The client fills one of these per entry returned by getMergeRegions()
// (offset, length, dataType, operation); the server passes the fields to
// addMergeRegion(), casting data_type to SnapshotDataType and merge_op to
// SnapshotMergeOperation.
table SnapshotMergeRegionRequest {
// Offset of the region (NOTE(review): presumably a byte offset into the
// snapshot data - confirm)
offset:int;
// Length of the region
length:ulong;
// Integer value of the region's SnapshotDataType
data_type:int;
// Integer value of the region's SnapshotMergeOperation
merge_op:int;
}

table SnapshotPushRequest {
key:string;
maxSize:ulong;
max_size:ulong;
contents:[ubyte];
merge_regions:[SnapshotMergeRegionRequest];
}

table SnapshotDeleteRequest {
key:string;
}

table SnapshotDiffChunk {
table SnapshotDiffRequest {
offset:int;
dataType:int;
mergeOp:int;
data_type:int;
merge_op:int;
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
data:[ubyte];
}

table SnapshotDiffPushRequest {
key:string;
force:bool;
chunks:[SnapshotDiffChunk];
merge_regions:[SnapshotMergeRegionRequest];
diffs:[SnapshotDiffRequest];
}

table ThreadResultRequest {
Expand Down
4 changes: 1 addition & 3 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,12 @@ void Executor::threadPoolThread(int threadPoolIdx)

snap->queueDiffs(diffs);
} else {
// Push diffs back to master
sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking on non-master
faabric::util::resetDirtyTracking();
}

SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey());
snap->clearMergeRegions();
}

// If this batch is finished, reset the executor and release its claim.
Expand Down
5 changes: 3 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapMemView.getDirtyRegions();

c.pushSnapshotDiffs(snapshotKey, true, snapshotDiffs);
c.pushSnapshotUpdate(
snapshotKey, std::move(snap), snapshotDiffs);
} else {
c.pushSnapshot(snapshotKey, snap);
pushedSnapshotsMap[snapshotKey].insert(host);
Expand Down Expand Up @@ -916,7 +917,7 @@ void Scheduler::pushSnapshotDiffs(
}

SnapshotClient& c = getSnapshotClient(msg.masterhost());
c.pushSnapshotDiffs(snapKey, false, diffs);
c.pushSnapshotDiffs(snapKey, diffs);
}

void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue)
Expand Down
84 changes: 70 additions & 14 deletions src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,51 +93,107 @@ void SnapshotClient::pushSnapshot(
// Set up the main request
// TODO - avoid copying data here?
flatbuffers::FlatBufferBuilder mb;

std::vector<flatbuffers::Offset<SnapshotMergeRegionRequest>>
mrsFbVector;
for (const auto& m : data->getMergeRegions()) {
auto mr = CreateSnapshotMergeRegionRequest(mb,
m.second.offset,
m.second.length,
m.second.dataType,
m.second.operation);
mrsFbVector.push_back(mr);
}

auto keyOffset = mb.CreateString(key);
auto dataOffset =
mb.CreateVector<uint8_t>(data->getDataPtr(), data->getSize());
auto mrsOffset = mb.CreateVector(mrsFbVector);
auto requestOffset = CreateSnapshotPushRequest(
mb, keyOffset, data->getMaxSize(), dataOffset);
mb, keyOffset, data->getMaxSize(), dataOffset, mrsOffset);
mb.Finish(requestOffset);

// Send it
SEND_FB_MSG(SnapshotCalls::PushSnapshot, mb)
}
}

// Pushes an update for an existing snapshot: logs the key, target host, number
// of diffs and number of merge regions, then delegates to doPushSnapshotDiffs
// with the snapshot data attached. Passing non-null data makes
// doPushSnapshotDiffs include the merge regions and set force = true on the
// request.
//
// snapshotKey: key of the snapshot being updated
// data: snapshot whose merge regions are sent alongside the diffs
// diffs: the diffs to send
void SnapshotClient::pushSnapshotUpdate(
std::string snapshotKey,
std::shared_ptr<faabric::util::SnapshotData> data,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
// NOTE(review): data is dereferenced in the log arguments below, so callers
// presumably must never pass nullptr here - confirm
SPDLOG_DEBUG("Pushing update to snapshot {} to {} ({} diffs, {} regions)",
snapshotKey,
host,
diffs.size(),
data->getMergeRegions().size());

// Non-null data selects the "update" path in the shared implementation
doPushSnapshotDiffs(snapshotKey, data, diffs);
}

void SnapshotClient::pushSnapshotDiffs(
std::string snapshotKey,
bool force,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}",
diffs.size(),
snapshotKey,
host);

doPushSnapshotDiffs(snapshotKey, nullptr, diffs);
}

void SnapshotClient::doPushSnapshotDiffs(
const std::string& snapshotKey,
std::shared_ptr<faabric::util::SnapshotData> data,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
snapshotDiffPushes.emplace_back(host, diffs);
} else {
SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}",
diffs.size(),
snapshotKey,
host);

flatbuffers::FlatBufferBuilder mb;

// Create objects for all the chunks
std::vector<flatbuffers::Offset<SnapshotDiffChunk>> diffsFbVector;
// Create objects for all the diffs
std::vector<flatbuffers::Offset<SnapshotDiffRequest>> diffsFbVector;
for (const auto& d : diffs) {
std::span<const uint8_t> diffData = d.getData();
auto dataOffset =
mb.CreateVector<uint8_t>(diffData.data(), diffData.size());

auto chunk = CreateSnapshotDiffChunk(
auto diff = CreateSnapshotDiffRequest(
mb, d.getOffset(), d.getDataType(), d.getOperation(), dataOffset);
diffsFbVector.push_back(chunk);
diffsFbVector.push_back(diff);
}

// If we have snapshot data, we need to include the merge regions and
// force too.
std::vector<flatbuffers::Offset<SnapshotMergeRegionRequest>>
mrsFbVector;
bool force = false;
if (data != nullptr) {
for (const auto& m : data->getMergeRegions()) {
auto mr = CreateSnapshotMergeRegionRequest(mb,
m.second.offset,
m.second.length,
m.second.dataType,
m.second.operation);
mrsFbVector.push_back(mr);
}

force = true;
} else {
force = false;
}

// Set up the request
auto keyOffset = mb.CreateString(snapshotKey);
auto diffsOffset = mb.CreateVector(diffsFbVector);
auto requestOffset =
CreateSnapshotDiffPushRequest(mb, keyOffset, force, diffsOffset);
auto mrsOffset = mb.CreateVector(mrsFbVector);

auto requestOffset = CreateSnapshotDiffPushRequest(
mb, keyOffset, force, mrsOffset, diffsOffset);

mb.Finish(requestOffset);

SEND_FB_MSG(SnapshotCalls::PushSnapshotDiffs, mb);
Expand Down
44 changes: 30 additions & 14 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,28 @@ std::unique_ptr<google::protobuf::Message> SnapshotServer::recvPushSnapshot(
SPDLOG_DEBUG("Receiving snapshot {} (size {}, max {})",
r->key()->c_str(),
r->contents()->size(),
r->maxSize());
r->max_size());

faabric::snapshot::SnapshotRegistry& reg =
faabric::snapshot::getSnapshotRegistry();

// Set up the snapshot
size_t snapSize = r->contents()->size();
std::string snapKey = r->key()->str();
auto d = std::make_shared<SnapshotData>(
std::span((uint8_t*)r->contents()->Data(), snapSize), r->maxSize());
auto snap = std::make_shared<SnapshotData>(
std::span((uint8_t*)r->contents()->Data(), snapSize), r->max_size());

// Add the merge regions
for (const auto* mr : *r->merge_regions()) {
snap->addMergeRegion(
mr->offset(),
mr->length(),
static_cast<SnapshotDataType>(mr->data_type()),
static_cast<SnapshotMergeOperation>(mr->merge_op()));
}

// Register snapshot
reg.registerSnapshot(snapKey, d);
reg.registerSnapshot(snapKey, snap);

// Send response
return std::make_unique<faabric::EmptyResponse>();
Expand All @@ -114,31 +123,38 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize)
flatbuffers::GetRoot<SnapshotDiffPushRequest>(buffer);

SPDLOG_DEBUG(
"Applying {} diffs to snapshot {}", r->chunks()->size(), r->key()->str());
"Applying {} diffs to snapshot {}", r->diffs()->size(), r->key()->str());

// Get the snapshot
faabric::snapshot::SnapshotRegistry& reg =
faabric::snapshot::getSnapshotRegistry();
auto snap = reg.getSnapshot(r->key()->str());

// Convert chunks to snapshot diff objects
// Convert diffs to snapshot diff objects
std::vector<SnapshotDiff> diffs;
diffs.reserve(r->chunks()->size());
for (const auto* chunk : *r->chunks()) {
diffs.reserve(r->diffs()->size());
for (const auto* diff : *r->diffs()) {
diffs.emplace_back(
static_cast<SnapshotDataType>(chunk->dataType()),
static_cast<SnapshotMergeOperation>(chunk->mergeOp()),
chunk->offset(),
std::span<const uint8_t>(chunk->data()->data(),
chunk->data()->size()));
static_cast<SnapshotDataType>(diff->data_type()),
static_cast<SnapshotMergeOperation>(diff->merge_op()),
diff->offset(),
std::span<const uint8_t>(diff->data()->data(), diff->data()->size()));
}

// Queue on the snapshot
snap->queueDiffs(diffs);

// Write if necessary
// Write diffs and set merge regions if necessary
if (r->force()) {
snap->writeQueuedDiffs();

for (const auto* mr : *r->merge_regions()) {
snap->addMergeRegion(
mr->offset(),
mr->length(),
static_cast<SnapshotDataType>(mr->data_type()),
static_cast<SnapshotMergeOperation>(mr->merge_op()));
}
}

// Send response
Expand Down
Loading