Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshots overhaul #176

Merged
merged 53 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4b37854
Start on changing snapshot sizes
Shillaker Nov 18, 2021
ccfe652
Resizing
Shillaker Nov 18, 2021
5645712
Up to failing test
Shillaker Nov 18, 2021
86b7e2e
Tests for memory utils
Shillaker Nov 18, 2021
692e5d8
Fixing up tests
Shillaker Nov 18, 2021
808076c
Test for expanding and remapping snapshots
Shillaker Nov 18, 2021
ae6f3ac
Formatting
Shillaker Nov 18, 2021
d82ca4a
Checking for uninitialised fds
Shillaker Nov 18, 2021
3e10dbc
Test for pushing snapshots
Shillaker Nov 18, 2021
12f0f03
Merge branch 'master' into snap-sizes
Shillaker Dec 9, 2021
c311c82
Test fixes, formatting
Shillaker Dec 9, 2021
a050378
Add target to dev.sanitise
Shillaker Dec 9, 2021
09720bf
Half-way through refactor
Shillaker Dec 10, 2021
3c8273a
Small typos etc.
Shillaker Dec 10, 2021
f1ae79c
Continued refactor
Shillaker Dec 10, 2021
1f3fc82
Compiling
Shillaker Dec 10, 2021
2267dfd
Fixing up tests
Shillaker Dec 10, 2021
261a814
Fix tests
Shillaker Dec 10, 2021
6aaab19
Formatting
Shillaker Dec 10, 2021
4559377
Fixing up dist tests
Shillaker Dec 10, 2021
a0fdd8f
Fix failing asan test
Shillaker Dec 13, 2021
40256fd
Small tidy-up
Shillaker Dec 13, 2021
2669d3d
Typos
Shillaker Dec 13, 2021
bab12f1
PR comments WIP
Shillaker Dec 13, 2021
2535a3d
Remove vector/ span duplicates
Shillaker Dec 13, 2021
81f8c53
More tests
Shillaker Dec 14, 2021
a0b36a8
Adding private and shared snapshot mappings
Shillaker Dec 14, 2021
b542697
Compilation fixes
Shillaker Dec 14, 2021
3340e63
Sketching out memory view class
Shillaker Dec 14, 2021
d337f21
Follow through with MemoryView refactor
Shillaker Dec 14, 2021
f13b403
Fixing tests
Shillaker Dec 14, 2021
f1aff36
Fixing tests
Shillaker Dec 14, 2021
d9ce863
Tidying up
Shillaker Dec 14, 2021
b0a5ba8
Move merging logic into SnapshotData
Shillaker Dec 15, 2021
2e79ad3
Master writing back snapshot changes locally
Shillaker Dec 15, 2021
ce302a0
Failing distributed reduction test
Shillaker Dec 15, 2021
ea394a5
Formatting and tests
Shillaker Dec 15, 2021
bfc6aff
Adding repeats into dist test
Shillaker Dec 15, 2021
f6a49da
Fix up distributed test
Shillaker Dec 16, 2021
a2a0a0d
Tidy-up
Shillaker Dec 16, 2021
13c30c6
Override CPU count in dist tests
Shillaker Dec 16, 2021
d304a5c
Fix race condition in master snapshot
Shillaker Dec 17, 2021
d9ab090
Added locking log statement
Shillaker Dec 17, 2021
ec3fba6
Fix scheduler test
Shillaker Dec 17, 2021
195d386
Fix straggler error in tests
Shillaker Dec 17, 2021
2632dc9
Immutable snapshot diffs with ownership
Shillaker Dec 17, 2021
77ee625
Fix up broken tests
Shillaker Dec 17, 2021
0ae55a8
Fix compile error in dist tests
Shillaker Dec 17, 2021
ca0b823
Test fix-up
Shillaker Dec 17, 2021
9dc3902
Formatting
Shillaker Dec 17, 2021
0522c8e
Fix dist tests
Shillaker Dec 17, 2021
284889e
Rename _data param
Shillaker Dec 17, 2021
fff0f06
PR comments
Shillaker Dec 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Executor

void releaseClaim();

virtual faabric::util::SnapshotData snapshot();
virtual faabric::util::SnapshotData& snapshot();

protected:
virtual void restore(faabric::Message& msg);
Expand All @@ -80,6 +80,8 @@ class Executor

uint32_t threadPoolSize = 0;

faabric::util::SnapshotData _snapshot;

private:
std::atomic<bool> claimed = false;

Expand Down
5 changes: 3 additions & 2 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace faabric::snapshot {
// Mocking
// -----------------------------------

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
std::vector<std::pair<std::string, faabric::util::SnapshotData>>&
getSnapshotPushes();

std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>>
Expand All @@ -36,7 +36,8 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

void pushSnapshot(const std::string& key,
int32_t groupId,
const faabric::util::SnapshotData& data);
const faabric::util::SnapshotData& data,
size_t maxSize = 0);

void pushSnapshotDiffs(std::string snapshotKey,
int32_t groupId,
Expand Down
8 changes: 5 additions & 3 deletions include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ class SnapshotRegistry
void mapSnapshot(const std::string& key, uint8_t* target);

void takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
faabric::util::SnapshotData& data,
bool locallyRestorable = true);

void takeSnapshotIfNotExists(const std::string& key,
faabric::util::SnapshotData data,
faabric::util::SnapshotData& data,
bool locallyRestorable = true);

void deleteSnapshot(const std::string& key);

void changeSnapshotSize(const std::string& key, size_t newSize);

size_t getSnapshotCount();

void clear();
Expand All @@ -47,7 +49,7 @@ class SnapshotRegistry
faabric::util::SnapshotData& data);

void doTakeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
const faabric::util::SnapshotData& data,
bool locallyRestorable,
bool overwrite);
};
Expand Down
22 changes: 22 additions & 0 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <string>
#include <unistd.h>
#include <vector>

Expand Down Expand Up @@ -41,4 +42,25 @@ std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages);

std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
int nPages);
// -------------------------
// Allocation
// -------------------------

void deallocateMemory(uint8_t* memory, size_t size);

void deallocatePages(uint8_t* memory, int nPages);

uint8_t* allocateSharedMemory(size_t size);

uint8_t* allocatePrivateMemory(size_t size);

uint8_t* allocateVirtualMemory(size_t size);

void claimVirtualMemory(uint8_t* start, size_t size);

void mapMemory(uint8_t* target, size_t size, int fd);

int writeMemoryToFd(uint8_t* source, size_t size, const std::string& fdLabel);

void appendDataToFd(int fd, size_t oldSize, size_t newSize, uint8_t* newData);
}
30 changes: 27 additions & 3 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>

#define FOUR_GB (size_t)(1024L * 1024L * 1024L * 4L)

namespace faabric::util {

enum SnapshotDataType
Expand Down Expand Up @@ -73,13 +75,20 @@ class SnapshotMergeRegion
class SnapshotData
{
public:
size_t size = 0;
uint8_t* data = nullptr;
int fd = 0;
size_t size = 0;

SnapshotData() = default;

std::vector<SnapshotDiff> getDirtyPages();
SnapshotData(const SnapshotData&);

SnapshotData& operator=(const SnapshotData&) = delete;

~SnapshotData();

bool isRestorable();

std::vector<SnapshotDiff> getDirtyRegions();

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);
Expand All @@ -90,7 +99,22 @@ class SnapshotData
SnapshotMergeOperation operation,
bool overwrite = false);

void clearMergeRegions();

void setSnapshotSize(size_t newSize);

void updateFd();

void writeToFd(const std::string& fdLabel);

void mapToMemory(uint8_t* target);

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();

private:
int fd = 0;
size_t fdSize = 0;

// Note - we care about the order of this map, as we iterate through it
// in order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
Expand Down
1 change: 1 addition & 0 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
table SnapshotPushRequest {
key:string;
groupid:int;
maxSize:ulong;
contents:[ubyte];
}

Expand Down
12 changes: 8 additions & 4 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Work out if we should skip the reset after this batch. This only needs to
// happen when we're executing threads on the master host, in which case the
// original function call will cause a reset
bool skipReset = isMaster && isThreads;
// bool skipReset = isMaster && isThreads;
bool skipReset = isThreads;

// Iterate through and invoke tasks. By default, we allocate tasks
// one-to-one with thread pool threads. Only once the pool is exhausted do
Expand Down Expand Up @@ -299,6 +300,10 @@ void Executor::threadPoolThread(int threadPoolIdx)

// Reset dirty page tracking now that we've pushed the diffs
faabric::util::resetDirtyTracking();

// Clear any merge regions
SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey());
snapshotPreExecution->clearMergeRegions();
}

// If this batch is finished, reset the executor and release its claim.
Expand Down Expand Up @@ -403,11 +408,10 @@ void Executor::postFinish() {}

void Executor::reset(faabric::Message& msg) {}

faabric::util::SnapshotData Executor::snapshot()
faabric::util::SnapshotData& Executor::snapshot()
{
SPDLOG_WARN("Executor has not implemented snapshot method");
faabric::util::SnapshotData d;
return d;
return _snapshot;
}

void Executor::restore(faabric::Message& msg)
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
// if so, just push the diffs
if (pushedSnapshotsMap[snapshotKey].contains(host)) {
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapshotData->getDirtyPages();
snapshotData->getDirtyRegions();
c.pushSnapshotDiffs(
snapshotKey, firstMsg.groupid(), snapshotDiffs);
} else {
Expand All @@ -515,11 +515,11 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
}

// Now reset the dirty page tracking just before we start executing
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}", funcStr);
SPDLOG_DEBUG("Resetting dirty tracking before executing {}", funcStr);
faabric::util::resetDirtyTracking();

// -------------------------------------------
// EXECTUION
// EXECUTION
// -------------------------------------------

// Records for tests - copy messages before execution to avoid racing on msg
Expand Down
10 changes: 6 additions & 4 deletions src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static std::vector<std::pair<std::string, std::string>> snapshotDeletes;
static std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
threadResults;

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
std::vector<std::pair<std::string, faabric::util::SnapshotData>>&
getSnapshotPushes()
{
faabric::util::UniqueLock lock(mockMutex);
Expand Down Expand Up @@ -73,7 +73,8 @@ SnapshotClient::SnapshotClient(const std::string& hostIn)

void SnapshotClient::pushSnapshot(const std::string& key,
int groupId,
const faabric::util::SnapshotData& data)
const faabric::util::SnapshotData& data,
size_t maxSize)
{
if (data.size == 0) {
SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host);
Expand All @@ -84,15 +85,16 @@ void SnapshotClient::pushSnapshot(const std::string& key,

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);

snapshotPushes.emplace_back(host, data);
} else {
// Set up the main request
// TODO - avoid copying data here
flatbuffers::FlatBufferBuilder mb;
auto keyOffset = mb.CreateString(key);
auto dataOffset = mb.CreateVector<uint8_t>(data.data, data.size);
auto requestOffset =
CreateSnapshotPushRequest(mb, keyOffset, groupId, dataOffset);
auto requestOffset = CreateSnapshotPushRequest(
mb, keyOffset, groupId, maxSize, dataOffset);
mb.Finish(requestOffset);

// Send it
Expand Down
88 changes: 18 additions & 70 deletions src/snapshot/SnapshotRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,45 +33,26 @@ bool SnapshotRegistry::snapshotExists(const std::string& key)
void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target)
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
{
auto d = getSnapshot(key);

if (!faabric::util::isPageAligned((void*)target)) {
SPDLOG_ERROR(
"Mapping snapshot {} to non page-aligned address {}", key, target);
throw std::runtime_error(
"Mapping snapshot to non page-aligned address");
}

if (d->fd == 0) {
SPDLOG_ERROR("Attempting to map non-restorable snapshot");
throw std::runtime_error("Mapping non-restorable snapshot");
}

void* mmapRes =
mmap(target, d->size, PROT_WRITE, MAP_PRIVATE | MAP_FIXED, d->fd, 0);

if (mmapRes == MAP_FAILED) {
SPDLOG_ERROR(
"mmapping snapshot failed: {} ({})", errno, ::strerror(errno));
throw std::runtime_error("mmapping snapshot failed");
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is now in the SnapshotData class.

d->mapToMemory(target);
}

void SnapshotRegistry::takeSnapshotIfNotExists(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable)
void SnapshotRegistry::takeSnapshotIfNotExists(
const std::string& key,
faabric::util::SnapshotData& data,
bool locallyRestorable)
{
doTakeSnapshot(key, data, locallyRestorable, false);
}

void SnapshotRegistry::takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
faabric::util::SnapshotData& data,
bool locallyRestorable)
{
doTakeSnapshot(key, data, locallyRestorable, true);
}

void SnapshotRegistry::doTakeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
const faabric::util::SnapshotData& data,
bool locallyRestorable,
bool overwrite)
{
Expand All @@ -94,13 +75,13 @@ void SnapshotRegistry::doTakeSnapshot(const std::string& key,

// Note - we only preserve the snapshot in the in-memory file, and do not
// take ownership for the original data referenced in SnapshotData
auto shared_data =
auto sharedData =
std::make_shared<faabric::util::SnapshotData>(std::move(data));
snapshotMap[key] = shared_data;
snapshotMap[key] = sharedData;

// Write to fd to be locally restorable
if (locallyRestorable) {
writeSnapshotToFd(key, *shared_data);
sharedData->writeToFd(key);
}
}

Expand All @@ -112,17 +93,16 @@ void SnapshotRegistry::deleteSnapshot(const std::string& key)
return;
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
}

auto d = snapshotMap[key];
snapshotMap.erase(key);
}

// Note - the data referenced by the SnapshotData object is not owned by the
// snapshot registry so we don't delete it here. We only remove the file
// descriptor used for mapping memory
if (d->fd > 0) {
::close(d->fd);
d->fd = 0;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing the file descriptor is also handled in SnapshotData now.

void SnapshotRegistry::changeSnapshotSize(const std::string& key,
size_t newSize)
{
faabric::util::FullLock lock(snapshotsMx);

snapshotMap.erase(key);
auto d = getSnapshot(key);
d->setSnapshotSize(newSize);
}

size_t SnapshotRegistry::getSnapshotCount()
Expand All @@ -140,38 +120,6 @@ SnapshotRegistry& getSnapshotRegistry()
void SnapshotRegistry::clear()
{
faabric::util::FullLock lock(snapshotsMx);
for (auto p : snapshotMap) {
if (p.second->fd > 0) {
::close(p.second->fd);
}
}

snapshotMap.clear();
}

int SnapshotRegistry::writeSnapshotToFd(const std::string& key,
faabric::util::SnapshotData& data)
{
int fd = ::memfd_create(key.c_str(), 0);

// Make the fd big enough
int ferror = ::ftruncate(fd, data.size);
if (ferror) {
SPDLOG_ERROR("ferror call failed with error {}", ferror);
throw std::runtime_error("Failed writing memory to fd (ftruncate)");
}

// Write the data
ssize_t werror = ::write(fd, data.data, data.size);
if (werror == -1) {
SPDLOG_ERROR("Write call failed with error {}", werror);
throw std::runtime_error("Failed writing memory to fd (write)");
}

// Record the fd
data.fd = fd;

SPDLOG_DEBUG("Wrote snapshot {} to fd {}", key, fd);
return fd;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to SnapshotData.

}
}
Loading