Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshots overhaul #176

Merged
merged 53 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4b37854
Start on changing snapshot sizes
Shillaker Nov 18, 2021
ccfe652
Resizing
Shillaker Nov 18, 2021
5645712
Up to failing test
Shillaker Nov 18, 2021
86b7e2e
Tests for memory utils
Shillaker Nov 18, 2021
692e5d8
Fixing up tests
Shillaker Nov 18, 2021
808076c
Test for expanding and remapping snapshots
Shillaker Nov 18, 2021
ae6f3ac
Formatting
Shillaker Nov 18, 2021
d82ca4a
Checking for uninitialised fds
Shillaker Nov 18, 2021
3e10dbc
Test for pushing snapshots
Shillaker Nov 18, 2021
12f0f03
Merge branch 'master' into snap-sizes
Shillaker Dec 9, 2021
c311c82
Test fixes, formatting
Shillaker Dec 9, 2021
a050378
Add target to dev.sanitise
Shillaker Dec 9, 2021
09720bf
Half-way through refactor
Shillaker Dec 10, 2021
3c8273a
Small typos etc.
Shillaker Dec 10, 2021
f1ae79c
Continued refactor
Shillaker Dec 10, 2021
1f3fc82
Compiling
Shillaker Dec 10, 2021
2267dfd
Fixing up tests
Shillaker Dec 10, 2021
261a814
Fix tests
Shillaker Dec 10, 2021
6aaab19
Formatting
Shillaker Dec 10, 2021
4559377
Fixing up dist tests
Shillaker Dec 10, 2021
a0fdd8f
Fix failing asan test
Shillaker Dec 13, 2021
40256fd
Small tidy-up
Shillaker Dec 13, 2021
2669d3d
Typos
Shillaker Dec 13, 2021
bab12f1
PR comments WIP
Shillaker Dec 13, 2021
2535a3d
Remove vector/ span duplicates
Shillaker Dec 13, 2021
81f8c53
More tests
Shillaker Dec 14, 2021
a0b36a8
Adding private and shared snapshot mappings
Shillaker Dec 14, 2021
b542697
Compilation fixes
Shillaker Dec 14, 2021
3340e63
Sketching out memory view class
Shillaker Dec 14, 2021
d337f21
Follow through with MemoryView refactor
Shillaker Dec 14, 2021
f13b403
Fixing tests
Shillaker Dec 14, 2021
f1aff36
Fixing tests
Shillaker Dec 14, 2021
d9ce863
Tidying up
Shillaker Dec 14, 2021
b0a5ba8
Move merging logic into SnapshotData
Shillaker Dec 15, 2021
2e79ad3
Master writing back snapshot changes locally
Shillaker Dec 15, 2021
ce302a0
Failing distributed reduction test
Shillaker Dec 15, 2021
ea394a5
Formatting and tests
Shillaker Dec 15, 2021
bfc6aff
Adding repeats into dist test
Shillaker Dec 15, 2021
f6a49da
Fix up distributed test
Shillaker Dec 16, 2021
a2a0a0d
Tidy-up
Shillaker Dec 16, 2021
13c30c6
Override CPU count in dist tests
Shillaker Dec 16, 2021
d304a5c
Fix race condition in master snapshot
Shillaker Dec 17, 2021
d9ab090
Added locking log statement
Shillaker Dec 17, 2021
ec3fba6
Fix scheduler test
Shillaker Dec 17, 2021
195d386
Fix straggler error in tests
Shillaker Dec 17, 2021
2632dc9
Immutable snapshot diffs with ownership
Shillaker Dec 17, 2021
77ee625
Fix up broken tests
Shillaker Dec 17, 2021
0ae55a8
Fix compile error in dist tests
Shillaker Dec 17, 2021
ca0b823
Test fix-up
Shillaker Dec 17, 2021
9dc3902
Formatting
Shillaker Dec 17, 2021
0522c8e
Fix dist tests
Shillaker Dec 17, 2021
284889e
Rename _data param
Shillaker Dec 17, 2021
fff0f06
PR comments
Shillaker Dec 20, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/sanitisers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
${{ runner.os }}-
# --- Build with thread sanitising
- name: "Build tests with address sanitising"
run: inv dev.sanitise Address --noclean
run: inv dev.sanitise Address faabric_tests --noclean
Copy link
Collaborator Author

@Shillaker Shillaker Dec 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a target to the dev.sanitise task rather than assuming it's always the tests.

- name: "Run tests with address sanitising on and print to stderr to file"
run: ./bin/faabric_tests
working-directory: /build/faabric/static
Expand Down Expand Up @@ -149,7 +149,7 @@ jobs:
${{ runner.os }}-
# --- Build with thread sanitising
- name: "Build tests with thread sanitising"
run: inv dev.sanitise Thread --noclean
run: inv dev.sanitise Thread faabric_tests --noclean
# --- Tests ---
- name: "Run tests with thread sanitising on and print stderr to file"
run: ./bin/faabric_tests
Expand Down Expand Up @@ -200,7 +200,7 @@ jobs:
${{ runner.os }}-
# --- Build with thread sanitising
- name: "Build tests with undefined sanitising"
run: inv dev.sanitise Undefined --noclean
run: inv dev.sanitise Undefined faabric_tests --noclean
# --- Tests ---
- name: "Run tests with undefined sanitising on and print stderr to file"
run: ./bin/faabric_tests
Expand Down Expand Up @@ -251,7 +251,7 @@ jobs:
${{ runner.os }}-
# --- Build with thread sanitising
- name: "Build tests with leak sanitising"
run: inv dev.sanitise Leak --noclean
run: inv dev.sanitise Leak faabric_tests --noclean
# --- Tests ---
- name: "Run tests with leak sanitising on and print stderr to file"
run: ./bin/faabric_tests
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Executor

void releaseClaim();

virtual faabric::util::SnapshotData snapshot();
virtual std::shared_ptr<faabric::util::SnapshotData> snapshot();

protected:
virtual void restore(faabric::Message& msg);
Expand Down
5 changes: 3 additions & 2 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace faabric::snapshot {
// Mocking
// -----------------------------------

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
std::vector<
std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>>
getSnapshotPushes();

std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>>
Expand All @@ -36,7 +37,7 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

void pushSnapshot(const std::string& key,
int32_t groupId,
const faabric::util::SnapshotData& data);
std::shared_ptr<faabric::util::SnapshotData> data);

void pushSnapshotDiffs(std::string snapshotKey,
int32_t groupId,
Expand Down
18 changes: 8 additions & 10 deletions include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ class SnapshotRegistry

void mapSnapshot(const std::string& key, uint8_t* target);

void takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable = true);
void registerSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void takeSnapshotIfNotExists(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable = true);
void registerSnapshotIfNotExists(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void deleteSnapshot(const std::string& key);

Expand All @@ -46,10 +45,9 @@ class SnapshotRegistry
int writeSnapshotToFd(const std::string& key,
faabric::util::SnapshotData& data);

void doTakeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable,
bool overwrite);
void doRegisterSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data,
bool overwrite);
};

SnapshotRegistry& getSnapshotRegistry();
Expand Down
25 changes: 25 additions & 0 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unistd.h>
#include <vector>

Expand Down Expand Up @@ -41,4 +44,26 @@ std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages);

std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
int nPages);
// -------------------------
// Allocation
// -------------------------

typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>>
OwnedMmapRegion;

OwnedMmapRegion allocateSharedMemory(size_t size);

OwnedMmapRegion allocatePrivateMemory(size_t size);

OwnedMmapRegion allocateVirtualMemory(size_t size);

void claimVirtualMemory(uint8_t* start, size_t size);

void mapMemory(uint8_t* target, size_t size, int fd);

int writeMemoryToFd(const uint8_t* source,
size_t size,
const std::string& fdLabel);

void appendDataToFd(int fd, size_t oldSize, size_t newSize, uint8_t* newData);
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
}
56 changes: 53 additions & 3 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <vector>

#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>

namespace faabric::util {

Expand Down Expand Up @@ -74,12 +76,45 @@ class SnapshotData
{
public:
size_t size = 0;
uint8_t* data = nullptr;
int fd = 0;
size_t maxSize = 0;

SnapshotData() = default;

std::vector<SnapshotDiff> getDirtyPages();
explicit SnapshotData(size_t sizeIn);

explicit SnapshotData(std::vector<uint8_t> dataIn);

SnapshotData(size_t sizeIn, size_t maxSizeIn);

SnapshotData(uint8_t* dataIn, size_t sizeIn);

SnapshotData(uint8_t* dataIn, size_t sizeIn, size_t maxSizeIn);
Shillaker marked this conversation as resolved.
Show resolved Hide resolved

SnapshotData(const SnapshotData&) = delete;

SnapshotData& operator=(const SnapshotData&) = delete;

~SnapshotData();

bool isRestorable();

void makeRestorable(const std::string& fdLabel);

void setSnapshotSize(size_t newSize);

void copyInData(std::vector<uint8_t> buffer, uint32_t offset = 0);
eigenraven marked this conversation as resolved.
Show resolved Hide resolved

void copyInData(uint8_t* buffer, size_t bufferSize, uint32_t offset = 0);

const uint8_t* getDataPtr(uint32_t offset = 0);

uint8_t* getMutableDataPtr(uint32_t offset = 0);

std::vector<uint8_t> getDataCopy();

std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize);

std::vector<SnapshotDiff> getDirtyRegions();

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);
Expand All @@ -90,7 +125,22 @@ class SnapshotData
SnapshotMergeOperation operation,
bool overwrite = false);

void clearMergeRegions();

void mapToMemory(uint8_t* target);

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();

private:
int fd = 0;
size_t fdSize = 0;

std::shared_mutex snapMx;

bool owner = false;
uint8_t* rawData = nullptr;
OwnedMmapRegion ownedData = nullptr;
Shillaker marked this conversation as resolved.
Show resolved Hide resolved

// Note - we care about the order of this map, as we iterate through it
// in order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
Expand Down
1 change: 1 addition & 0 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
table SnapshotPushRequest {
key:string;
groupid:int;
maxSize:ulong;
contents:[ubyte];
}

Expand Down
19 changes: 13 additions & 6 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Work out if we should skip the reset after this batch. This only needs to
// happen when we're executing threads on the master host, in which case the
// original function call will cause a reset
bool skipReset = isMaster && isThreads;

// TODO - should this be on master or all hosts?
// bool skipReset = isMaster && isThreads;
bool skipReset = isThreads;

// Iterate through and invoke tasks. By default, we allocate tasks
// one-to-one with thread pool threads. Only once the pool is exhausted do
Expand Down Expand Up @@ -292,13 +295,18 @@ void Executor::threadPoolThread(int threadPoolIdx)
msg.snapshotkey());

std::vector<faabric::util::SnapshotDiff> diffs =
snapshotPreExecution->getChangeDiffs(snapshotPostExecution.data,
snapshotPostExecution.size);
snapshotPreExecution->getChangeDiffs(
snapshotPostExecution->getDataPtr(),
snapshotPostExecution->size);

sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking now that we've pushed the diffs
faabric::util::resetDirtyTracking();

// Clear any merge regions
SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey());
snapshotPreExecution->clearMergeRegions();
}

// If this batch is finished, reset the executor and release its claim.
Expand Down Expand Up @@ -403,11 +411,10 @@ void Executor::postFinish() {}

void Executor::reset(faabric::Message& msg) {}

faabric::util::SnapshotData Executor::snapshot()
std::shared_ptr<faabric::util::SnapshotData> Executor::snapshot()
{
SPDLOG_WARN("Executor has not implemented snapshot method");
faabric::util::SnapshotData d;
return d;
return nullptr;
}

void Executor::restore(faabric::Message& msg)
Expand Down
19 changes: 10 additions & 9 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ void Scheduler::addHostToGlobalSet()

void Scheduler::resetThreadLocalCache()
{
auto tid = (pid_t)syscall(SYS_gettid);
SPDLOG_DEBUG("Resetting scheduler thread-local cache for thread {}", tid);
SPDLOG_DEBUG("Resetting scheduler thread-local cache");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread ID is included in all log statements by default so this was unnecessary.


functionCallClients.clear();
snapshotClients.clear();
Expand Down Expand Up @@ -504,22 +503,22 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
// if so, just push the diffs
if (pushedSnapshotsMap[snapshotKey].contains(host)) {
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapshotData->getDirtyPages();
snapshotData->getDirtyRegions();
c.pushSnapshotDiffs(
snapshotKey, firstMsg.groupid(), snapshotDiffs);
} else {
c.pushSnapshot(snapshotKey, firstMsg.groupid(), *snapshotData);
c.pushSnapshot(snapshotKey, firstMsg.groupid(), snapshotData);
pushedSnapshotsMap[snapshotKey].insert(host);
}
}
}

// Now reset the dirty page tracking just before we start executing
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}", funcStr);
SPDLOG_DEBUG("Resetting dirty tracking before executing {}", funcStr);
faabric::util::resetDirtyTracking();

// -------------------------------------------
// EXECTUION
// EXECUTION
// -------------------------------------------

// Records for tests - copy messages before execution to avoid racing on msg
Expand Down Expand Up @@ -785,9 +784,11 @@ std::shared_ptr<Executor> Scheduler::claimExecutor(

// We have no warm executors available, so scale up
if (claimed == nullptr) {
int nExecutors = thisExecutors.size();
SPDLOG_DEBUG(
"Scaling {} from {} -> {}", funcStr, nExecutors, nExecutors + 1);
SPDLOG_DEBUG("Scaling {} from {} -> {}",
funcStr,
thisExecutors.size(),
thisExecutors.size() + 1);

// Spinning up a new executor can be lengthy, allow other things
// to run in parallel
schedulerLock.unlock();
Expand Down
25 changes: 15 additions & 10 deletions src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace faabric::snapshot {

static std::mutex mockMutex;

static std::vector<std::pair<std::string, faabric::util::SnapshotData>>
static std::vector<
std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>>
snapshotPushes;

static std::vector<
Expand All @@ -26,7 +27,8 @@ static std::vector<std::pair<std::string, std::string>> snapshotDeletes;
static std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
threadResults;

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
std::vector<
std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>>
getSnapshotPushes()
{
faabric::util::UniqueLock lock(mockMutex);
Expand Down Expand Up @@ -71,28 +73,31 @@ SnapshotClient::SnapshotClient(const std::string& hostIn)
SNAPSHOT_SYNC_PORT)
{}

void SnapshotClient::pushSnapshot(const std::string& key,
int groupId,
const faabric::util::SnapshotData& data)
void SnapshotClient::pushSnapshot(
const std::string& key,
int groupId,
std::shared_ptr<faabric::util::SnapshotData> data)
{
if (data.size == 0) {
if (data->size == 0) {
SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host);
throw std::runtime_error("Pushing snapshot with zero size");
}

SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data.size);
SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data->size);

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);

snapshotPushes.emplace_back(host, data);
} else {
// Set up the main request
// TODO - avoid copying data here
flatbuffers::FlatBufferBuilder mb;
auto keyOffset = mb.CreateString(key);
auto dataOffset = mb.CreateVector<uint8_t>(data.data, data.size);
auto requestOffset =
CreateSnapshotPushRequest(mb, keyOffset, groupId, dataOffset);
auto dataOffset =
mb.CreateVector<uint8_t>(data->getDataPtr(), data->size);
auto requestOffset = CreateSnapshotPushRequest(
mb, keyOffset, groupId, data->maxSize, dataOffset);
mb.Finish(requestOffset);

// Send it
Expand Down
Loading