Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XOR diffing #224

Merged
merged 2 commits into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SystemConfig

// Dirty tracking
std::string dirtyTrackingMode;
std::string diffingMode;

SystemConfig();

Expand Down
11 changes: 7 additions & 4 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ enum SnapshotDataType

enum SnapshotMergeOperation
{
Overwrite,
Bytewise,
XOR,
Sum,
Product,
Subtract,
Expand Down Expand Up @@ -58,7 +59,7 @@ class SnapshotDiff

private:
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
SnapshotMergeOperation operation = SnapshotMergeOperation::Bytewise;
uint32_t offset = 0;
std::vector<uint8_t> data;
};
Expand All @@ -79,7 +80,7 @@ class SnapshotMergeRegion
uint32_t offset = 0;
size_t length = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
SnapshotMergeOperation operation = SnapshotMergeOperation::Bytewise;

SnapshotMergeRegion() = default;

Expand Down Expand Up @@ -233,7 +234,7 @@ class SnapshotData
SnapshotDataType dataType,
SnapshotMergeOperation operation);

void fillGapsWithOverwriteRegions();
void fillGapsWithBytewiseRegions();

void clearMergeRegions();

Expand Down Expand Up @@ -285,6 +286,8 @@ class SnapshotData

void writeData(std::span<const uint8_t> buffer, uint32_t offset = 0);

void xorData(std::span<const uint8_t> buffer, uint32_t offset = 0);

void checkWriteExtension(std::span<const uint8_t> buffer, uint32_t offset);
};

Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
std::vector<char> dirtyRegions = tracker.getBothDirtyPages(memView);

// Apply changes to snapshot
snap->fillGapsWithOverwriteRegions();
snap->fillGapsWithBytewiseRegions();
std::vector<faabric::util::SnapshotDiff> updates =
snap->diffWithDirtyRegions(memView, dirtyRegions);

Expand Down Expand Up @@ -622,7 +622,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
std::string mainThreadSnapKey =
faabric::util::getMainThreadSnapshotKey(msg);
auto snap = reg.getSnapshot(mainThreadSnapKey);
snap->fillGapsWithOverwriteRegions();
snap->fillGapsWithBytewiseRegions();

// Compare snapshot with all dirty regions for this executor
std::vector<faabric::util::SnapshotDiff> diffs;
Expand Down
1 change: 1 addition & 0 deletions src/util/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void SystemConfig::initialise()

// Dirty tracking
dirtyTrackingMode = getEnvVar("DIRTY_TRACKING_MODE", "segfault");
diffingMode = getEnvVar("DIFFING_MODE", "xor");
}

int SystemConfig::getSystemConfIntParam(const char* name,
Expand Down
104 changes: 78 additions & 26 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void diffArrayRegions(std::vector<SnapshotDiff>& snapshotDiffs,
// Finished on byte before
diffInProgress = false;
snapshotDiffs.emplace_back(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
diffStart,
b.subspan(diffStart, i - diffStart));
}
Expand All @@ -56,7 +56,7 @@ void diffArrayRegions(std::vector<SnapshotDiff>& snapshotDiffs,
// If we finish with a diff in progress, add it
if (diffInProgress) {
snapshotDiffs.emplace_back(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
diffStart,
b.subspan(diffStart, endOffset - diffStart));
}
Expand Down Expand Up @@ -153,6 +153,22 @@ void SnapshotData::writeData(std::span<const uint8_t> buffer, uint32_t offset)
trackedChanges.emplace_back(offset, regionEnd);
}

// XORs the bytes of `buffer` into the snapshot data beginning at `offset`,
// i.e. each affected byte becomes (existing byte ^ incoming byte), and then
// records the touched range [offset, offset + buffer.size()) in
// trackedChanges.
//
// Throws std::runtime_error if the range would extend past the snapshot's
// current `size`; unlike a write, an XOR never extends the snapshot.
//
// NOTE(review): no lock is taken here; presumably the caller is expected to
// hold the snapshot lock already - confirm against call sites.
void SnapshotData::xorData(std::span<const uint8_t> buffer, uint32_t offset)
{
    size_t xorEnd = offset + buffer.size();
    if (xorEnd > size) {
        SPDLOG_ERROR(
          "XORing snapshot data exceeding size: {} > {}", xorEnd, size);
        throw std::runtime_error("XORing data exceeding size");
    }

    // Combine the incoming bytes with the existing ones in place
    uint8_t* target = validatedOffsetPtr(offset);
    for (const uint8_t incoming : buffer) {
        *target = incoming ^ *target;
        ++target;
    }

    trackedChanges.emplace_back(offset, xorEnd);
}

const uint8_t* SnapshotData::getDataPtr(uint32_t offset)
{
faabric::util::SharedLock lock(snapMx);
Expand Down Expand Up @@ -206,16 +222,26 @@ void SnapshotData::addMergeRegion(uint32_t offset,
mergeRegions.emplace_back(offset, length, dataType, operation);
}

void SnapshotData::fillGapsWithOverwriteRegions()
void SnapshotData::fillGapsWithBytewiseRegions()
{
faabric::util::FullLock lock(snapMx);

SnapshotMergeOperation fillType;
SystemConfig& conf = faabric::util::getSystemConfig();
if (conf.diffingMode == "bytewise") {
fillType = SnapshotMergeOperation::Bytewise;
} else if (conf.diffingMode == "xor") {
fillType = SnapshotMergeOperation::XOR;
} else {
SPDLOG_ERROR("Unsupported diffing mode: {}", conf.diffingMode);
throw std::runtime_error("Unsupported diffing mode");
}

// If there are no merge regions, just add one big one (note: zero length
// means fill all space)
if (mergeRegions.empty()) {
SPDLOG_TRACE("Filling gap with single overwrite merge region");
mergeRegions.emplace_back(
0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite);
SPDLOG_TRACE("Filling gap with single bytewise merge region");
mergeRegions.emplace_back(0, 0, SnapshotDataType::Raw, fillType);

return;
}
Expand All @@ -236,14 +262,12 @@ void SnapshotData::fillGapsWithOverwriteRegions()

uint32_t regionLen = r.offset - lastRegionEnd;

SPDLOG_TRACE("Filling gap with overwrite merge region {}-{}",
SPDLOG_TRACE("Filling gap with bytewise merge region {}-{}",
lastRegionEnd,
lastRegionEnd + regionLen);

mergeRegions.emplace_back(lastRegionEnd,
regionLen,
SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite);
mergeRegions.emplace_back(
lastRegionEnd, regionLen, SnapshotDataType::Raw, fillType);

lastRegionEnd = r.offset + r.length;
}
Expand All @@ -253,10 +277,8 @@ void SnapshotData::fillGapsWithOverwriteRegions()
lastRegionEnd);

// Add a final region at the end of the snapshot
mergeRegions.emplace_back(lastRegionEnd,
0,
SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite);
mergeRegions.emplace_back(
lastRegionEnd, 0, SnapshotDataType::Raw, fillType);
}
}

Expand Down Expand Up @@ -324,11 +346,15 @@ int SnapshotData::writeQueuedDiffs()

continue;
}
if (diff.getOperation() ==
faabric::util::SnapshotMergeOperation::Overwrite) {

if (diff.getOperation() ==
faabric::util::SnapshotMergeOperation::Bytewise) {
writeData(diff.getData(), diff.getOffset());
continue;
}

if (diff.getOperation() == faabric::util::SnapshotMergeOperation::XOR) {
xorData(diff.getData(), diff.getOffset());
continue;
}

Expand Down Expand Up @@ -434,7 +460,7 @@ std::vector<faabric::util::SnapshotDiff> SnapshotData::getTrackedChanges()
for (auto [regionBegin, regionEnd] : trackedChanges) {
SPDLOG_TRACE("Snapshot dirty {}-{}", regionBegin, regionEnd);
diffs.emplace_back(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
regionBegin,
d.subspan(regionBegin, regionEnd - regionBegin));
}
Expand All @@ -451,15 +477,15 @@ std::vector<faabric::util::SnapshotDiff> SnapshotData::diffWithDirtyRegions(
PROF_START(DiffWithSnapshot)
std::vector<faabric::util::SnapshotDiff> diffs;

// Always add an overwrite region that covers any extension of the
// Always add a bytewise region that covers any extension of the
// updated data
if (updated.size() > size) {
PROF_START(ExtensionDiff)
SPDLOG_TRACE(
"Adding diff to extend snapshot from {} to {}", size, updated.size());
size_t extensionLen = updated.size() - size;
diffs.emplace_back(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
size,
updated.subspan(size, extensionLen));
PROF_END(ExtensionDiff)
Expand Down Expand Up @@ -538,8 +564,8 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op)
case (SnapshotMergeOperation::Min): {
return "Min";
}
case (SnapshotMergeOperation::Overwrite): {
return "Overwrite";
case (SnapshotMergeOperation::Bytewise): {
return "Bytewise";
}
case (SnapshotMergeOperation::Product): {
return "Product";
Expand All @@ -550,6 +576,9 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op)
case (SnapshotMergeOperation::Sum): {
return "Sum";
}
case (SnapshotMergeOperation::XOR): {
return "XOR";
}
default: {
SPDLOG_ERROR("Cannot convert snapshot merge op to string: {}", op);
throw std::runtime_error("Cannot convert merge op to string");
Expand Down Expand Up @@ -622,7 +651,11 @@ void SnapshotMergeRegion::addDiffs(std::vector<SnapshotDiff>& diffs,
}
startPage += std::distance(startIt, foundIt);

if (operation == SnapshotMergeOperation::Overwrite) {
// Bytewise and XOR both deal with overwriting bytes without any
// other logic. Bytewise emits only the runs of modified bytes,
// whereas XOR emits the XOR of the whole dirty page with the original.
if (operation == SnapshotMergeOperation::Bytewise ||
operation == SnapshotMergeOperation::XOR) {
// Iterate through pages
for (int p = startPage; p < endPage; p++) {
// Skip if page not dirty
Expand All @@ -639,11 +672,30 @@ void SnapshotMergeRegion::addDiffs(std::vector<SnapshotDiff>& diffs,

SPDLOG_TRACE("Checking page {} {}-{}", p, startByte, endByte);

diffArrayRegions(
diffs, startByte, endByte, originalData, updatedData);
if (operation == SnapshotMergeOperation::Bytewise) {
diffArrayRegions(
diffs, startByte, endByte, originalData, updatedData);
} else {
uint32_t rangeSize = endByte - startByte;
std::transform(originalData.begin() + startByte,
originalData.begin() + startByte + rangeSize,
updatedData.begin() + startByte,
updatedData.begin() + startByte,
std::bit_xor<uint8_t>());

SPDLOG_TRACE("Adding {} XOR merge: {}-{}",
snapshotDataTypeStr(dataType),
startByte,
startByte + rangeSize);

diffs.emplace_back(dataType,
operation,
startByte,
updatedData.subspan(startByte, rangeSize));
}
}

// This is the end of the overwrite diff
// This is the end of the XOR/bytewise diff
return;
}

Expand Down
18 changes: 15 additions & 3 deletions tests/test/scheduler/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,10 @@ TEST_CASE_METHOD(TestExecutorFixture,
{
int nThreads = 4;

SECTION("XOR diffs") { conf.diffingMode = "xor"; }

SECTION("Bytewise diffs") { conf.diffingMode = "bytewise"; }

// Sanity check memory size
REQUIRE(TEST_EXECUTOR_DEFAULT_MEMORY_SIZE > nThreads * HOST_PAGE_SIZE);

Expand Down Expand Up @@ -691,9 +695,17 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(diffList.at(i).getOffset() ==
i * faabric::util::HOST_PAGE_SIZE);

std::vector<uint8_t> expected = { (uint8_t)(i + 1),
(uint8_t)(i + 2),
(uint8_t)(i + 3) };
std::vector<uint8_t> expected;
if (conf.diffingMode == "xor") {
// In XOR mode we'll get the whole page back with a modification at
// the start
expected = std::vector<uint8_t>(HOST_PAGE_SIZE, 0);
expected[0] = i + 1;
expected[1] = i + 2;
expected[2] = i + 3;
} else {
expected = { (uint8_t)(i + 1), (uint8_t)(i + 2), (uint8_t)(i + 3) };
}

std::vector<uint8_t> actual = diffList.at(i).getDataCopy();

Expand Down
16 changes: 8 additions & 8 deletions tests/test/snapshot/test_snapshot_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture,
// Add merge regions to one
std::vector<SnapshotMergeRegion> mergeRegions = {
{ 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum },
{ 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite }
{ 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Bytewise }
};

for (const auto& m : mergeRegions) {
Expand Down Expand Up @@ -167,7 +167,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture,

std::vector<SnapshotMergeRegion> mergeRegions = {
{ 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum },
{ 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite }
{ 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Bytewise }
};

for (const auto& m : mergeRegions) {
Expand All @@ -183,12 +183,12 @@ TEST_CASE_METHOD(SnapshotClientServerFixture,
REQUIRE(snap->getQueuedDiffsCount() == 0);

SnapshotDiff diffA1(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
offsetA1,
diffDataA1);

SnapshotDiff diffA2(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
offsetA2,
diffDataA2);

Expand All @@ -207,17 +207,17 @@ TEST_CASE_METHOD(SnapshotClientServerFixture,
std::vector<uint8_t> diffDataB3 = { 1, 1, 2, 2, 3, 3, 4, 4 };

SnapshotDiff diffB1(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
offsetB1,
diffDataB1);

SnapshotDiff diffB2(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
offsetB2,
diffDataB2);

SnapshotDiff diffB3(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
SnapshotMergeOperation::Bytewise,
offsetB3,
diffDataB3);

Expand Down Expand Up @@ -328,7 +328,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture,
std::vector<uint8_t> diffData;
std::vector<uint8_t> expectedData;

SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
SnapshotMergeOperation operation = SnapshotMergeOperation::Bytewise;
SnapshotDataType dataType = SnapshotDataType::Raw;

SECTION("Integer")
Expand Down
Loading