Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshots overhaul #176

Merged
merged 53 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4b37854
Start on changing snapshot sizes
Shillaker Nov 18, 2021
ccfe652
Resizing
Shillaker Nov 18, 2021
5645712
Up to failing test
Shillaker Nov 18, 2021
86b7e2e
Tests for memory utils
Shillaker Nov 18, 2021
692e5d8
Fixing up tests
Shillaker Nov 18, 2021
808076c
Test for expanding and remapping snapshots
Shillaker Nov 18, 2021
ae6f3ac
Formatting
Shillaker Nov 18, 2021
d82ca4a
Checking for uninitialised fds
Shillaker Nov 18, 2021
3e10dbc
Test for pushing snapshots
Shillaker Nov 18, 2021
12f0f03
Merge branch 'master' into snap-sizes
Shillaker Dec 9, 2021
c311c82
Test fixes, formatting
Shillaker Dec 9, 2021
a050378
Add target to dev.sanitise
Shillaker Dec 9, 2021
09720bf
Half-way through refactor
Shillaker Dec 10, 2021
3c8273a
Small typos etc.
Shillaker Dec 10, 2021
f1ae79c
Continued refactor
Shillaker Dec 10, 2021
1f3fc82
Compiling
Shillaker Dec 10, 2021
2267dfd
Fixing up tests
Shillaker Dec 10, 2021
261a814
Fix tests
Shillaker Dec 10, 2021
6aaab19
Formatting
Shillaker Dec 10, 2021
4559377
Fixing up dist tests
Shillaker Dec 10, 2021
a0fdd8f
Fix failing asan test
Shillaker Dec 13, 2021
40256fd
Small tidy-up
Shillaker Dec 13, 2021
2669d3d
Typos
Shillaker Dec 13, 2021
bab12f1
PR comments WIP
Shillaker Dec 13, 2021
2535a3d
Remove vector/ span duplicates
Shillaker Dec 13, 2021
81f8c53
More tests
Shillaker Dec 14, 2021
a0b36a8
Adding private and shared snapshot mappings
Shillaker Dec 14, 2021
b542697
Compilation fixes
Shillaker Dec 14, 2021
3340e63
Sketching out memory view class
Shillaker Dec 14, 2021
d337f21
Follow through with MemoryView refactor
Shillaker Dec 14, 2021
f13b403
Fixing tests
Shillaker Dec 14, 2021
f1aff36
Fixing tests
Shillaker Dec 14, 2021
d9ce863
Tidying up
Shillaker Dec 14, 2021
b0a5ba8
Move merging logic into SnapshotData
Shillaker Dec 15, 2021
2e79ad3
Master writing back snapshot changes locally
Shillaker Dec 15, 2021
ce302a0
Failing distributed reduction test
Shillaker Dec 15, 2021
ea394a5
Formatting and tests
Shillaker Dec 15, 2021
bfc6aff
Adding repeats into dist test
Shillaker Dec 15, 2021
f6a49da
Fix up distributed test
Shillaker Dec 16, 2021
a2a0a0d
Tidy-up
Shillaker Dec 16, 2021
13c30c6
Override CPU count in dist tests
Shillaker Dec 16, 2021
d304a5c
Fix race condition in master snapshot
Shillaker Dec 17, 2021
d9ab090
Added locking log statement
Shillaker Dec 17, 2021
ec3fba6
Fix scheduler test
Shillaker Dec 17, 2021
195d386
Fix straggler error in tests
Shillaker Dec 17, 2021
2632dc9
Immutable snapshot diffs with ownership
Shillaker Dec 17, 2021
77ee625
Fix up broken tests
Shillaker Dec 17, 2021
0ae55a8
Fix compile error in dist tests
Shillaker Dec 17, 2021
ca0b823
Test fix-up
Shillaker Dec 17, 2021
9dc3902
Formatting
Shillaker Dec 17, 2021
0522c8e
Fix dist tests
Shillaker Dec 17, 2021
284889e
Rename _data param
Shillaker Dec 17, 2021
fff0f06
PR comments
Shillaker Dec 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dist-test/dev_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ THIS_DIR=$(dirname $(readlink -f $0))
PROJ_ROOT=${THIS_DIR}/..
pushd ${PROJ_ROOT} > /dev/null

export OVERRIDE_CPU_COUNT=4

if [[ -z "$1" ]]; then
docker-compose up -d dist-test-server
elif [[ "$1" == "restart" ]]; then
Expand Down
2 changes: 2 additions & 0 deletions dist-test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pushd ${PROJ_ROOT} >> /dev/null
export CONAN_CACHE_MOUNT_SOURCE=$HOME/.conan/
RETURN_VAL=0

export OVERRIDE_CPU_COUNT=4

# Run the test server in the background
docker-compose \
up \
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ services:
- LOG_LEVEL=debug
- REDIS_STATE_HOST=redis
- REDIS_QUEUE_HOST=redis
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
- ASAN_OPTIONS=verbosity=1:halt_on_error=1
- TSAN_OPTIONS=halt_on_error=1:suppressions=/code/faabric/thread-sanitizer-ignorelist.txt:history_size=7:second_deadlock_stack=1
- UBSAN_OPTIONS="print_stacktrace=1:halt_on_error=1
depends_on:
- redis

Expand All @@ -34,6 +38,7 @@ services:
- LOG_LEVEL=debug
- REDIS_STATE_HOST=redis
- REDIS_QUEUE_HOST=redis
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
command: ./bin/faabric_dist_test_server
depends_on:
- redis
6 changes: 3 additions & 3 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class ExecutorTask
ExecutorTask(int messageIndexIn,
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
std::shared_ptr<std::atomic<int>> batchCounterIn,
bool needsSnapshotPushIn,
bool needsSnapshotSyncIn,
bool skipResetIn);

std::shared_ptr<faabric::BatchExecuteRequest> req;
std::shared_ptr<std::atomic<int>> batchCounter;
int messageIndex = 0;
bool needsSnapshotPush = false;
bool needsSnapshotSync = false;
bool skipReset = false;
};

Expand Down Expand Up @@ -69,7 +69,7 @@ class Executor

void releaseClaim();

virtual faabric::util::SnapshotData snapshot();
virtual faabric::util::MemoryView getMemoryView();

protected:
virtual void restore(faabric::Message& msg);
Expand Down
13 changes: 7 additions & 6 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace faabric::snapshot {
// Mocking
// -----------------------------------

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
std::vector<
std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>>
getSnapshotPushes();

std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>>
Expand All @@ -35,12 +36,12 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
explicit SnapshotClient(const std::string& hostIn);

void pushSnapshot(const std::string& key,
int32_t groupId,
const faabric::util::SnapshotData& data);
std::shared_ptr<faabric::util::SnapshotData> data);

void pushSnapshotDiffs(std::string snapshotKey,
int32_t groupId,
std::vector<faabric::util::SnapshotDiff> diffs);
void pushSnapshotDiffs(
std::string snapshotKey,
bool force,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void deleteSnapshot(const std::string& key);

Expand Down
18 changes: 8 additions & 10 deletions include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ class SnapshotRegistry

void mapSnapshot(const std::string& key, uint8_t* target);

void takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable = true);
void registerSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void takeSnapshotIfNotExists(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable = true);
void registerSnapshotIfNotExists(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void deleteSnapshot(const std::string& key);

Expand All @@ -46,10 +45,9 @@ class SnapshotRegistry
int writeSnapshotToFd(const std::string& key,
faabric::util::SnapshotData& data);

void doTakeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
bool locallyRestorable,
bool overwrite);
void doRegisterSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data,
bool overwrite);
};

SnapshotRegistry& getSnapshotRegistry();
Expand Down
5 changes: 0 additions & 5 deletions include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
public:
SnapshotServer();

// Returns how many diffs have been applied since started, useful for
// testing
size_t diffsApplied() const;

protected:
void doAsyncRecv(int header,
const uint8_t* buffer,
Expand All @@ -40,6 +36,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer

private:
faabric::transport::PointToPointBroker& broker;
std::atomic_size_t diffsAppliedCounter = 0;
};
}
8 changes: 4 additions & 4 deletions include/faabric/util/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ int safeCopyToBuffer(const uint8_t* dataIn,
int bufferLen);

template<class T>
T unalignedRead(const std::byte* bytes)
T unalignedRead(const uint8_t* bytes)
{
T value;
std::copy_n(bytes, sizeof(T), reinterpret_cast<std::byte*>(&value));
std::copy_n(bytes, sizeof(T), reinterpret_cast<uint8_t*>(&value));
return value;
}

template<class T>
void unalignedWrite(const T& value, std::byte* destination)
void unalignedWrite(const T& value, uint8_t* destination)
{
std::copy_n(
reinterpret_cast<const std::byte*>(&value), sizeof(T), destination);
reinterpret_cast<const uint8_t*>(&value), sizeof(T), destination);
}

template<class T>
Expand Down
29 changes: 28 additions & 1 deletion include/faabric/util/memory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <span>
#include <string>
#include <unistd.h>
#include <vector>

Expand All @@ -22,7 +26,7 @@ struct AlignedChunk

static const long HOST_PAGE_SIZE = sysconf(_SC_PAGESIZE);

bool isPageAligned(void* ptr);
bool isPageAligned(const void* ptr);

size_t getRequiredHostPages(size_t nBytes);

Expand All @@ -41,4 +45,27 @@ std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages);

std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
int nPages);

// -------------------------
// Allocation
// -------------------------
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion;

MemoryRegion allocateSharedMemory(size_t size);

MemoryRegion allocateVirtualMemory(size_t size);

void claimVirtualMemory(std::span<uint8_t> region);

void mapMemoryPrivate(std::span<uint8_t> target, int fd);

void mapMemoryShared(std::span<uint8_t> target, int fd);

void resizeFd(int fd, size_t size);

void writeToFd(int fd, off_t offset, std::span<const uint8_t> data);

int createFd(size_t size, const std::string& fdLabel);

void appendDataToFd(int fd, std::span<uint8_t> data);
}
117 changes: 93 additions & 24 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <span>
#include <string>
#include <vector>

#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>

namespace faabric::util {

Expand All @@ -30,28 +33,28 @@ enum SnapshotMergeOperation
class SnapshotDiff
{
public:
const uint8_t* data = nullptr;
size_t size = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
uint32_t offset = 0;

bool noChange = false;

SnapshotDiff() = default;

SnapshotDiff(SnapshotDataType dataTypeIn,
SnapshotMergeOperation operationIn,
uint32_t offsetIn,
const uint8_t* dataIn,
size_t sizeIn)
{
dataType = dataTypeIn;
operation = operationIn;
offset = offsetIn;
data = dataIn;
size = sizeIn;
}
std::span<const uint8_t> dataIn);

SnapshotDataType getDataType() const { return dataType; }

SnapshotMergeOperation getOperation() const { return operation; }

uint32_t getOffset() const { return offset; }

std::span<const uint8_t> getData() const { return data; }

std::vector<uint8_t> getDataCopy() const;

private:
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
uint32_t offset = 0;
std::vector<uint8_t> data;
};

class SnapshotMergeRegion
Expand All @@ -73,27 +76,93 @@ class SnapshotMergeRegion
class SnapshotData
{
public:
size_t size = 0;
uint8_t* data = nullptr;
int fd = 0;

SnapshotData() = default;

std::vector<SnapshotDiff> getDirtyPages();
explicit SnapshotData(size_t sizeIn);

SnapshotData(size_t sizeIn, size_t maxSizeIn);

explicit SnapshotData(std::span<const uint8_t> dataIn);

SnapshotData(std::span<const uint8_t> dataIn, size_t maxSizeIn);

SnapshotData(const SnapshotData&) = delete;

SnapshotData& operator=(const SnapshotData&) = delete;

~SnapshotData();

void copyInData(std::span<const uint8_t> buffer, uint32_t offset = 0);

const uint8_t* getDataPtr(uint32_t offset = 0);

std::vector<uint8_t> getDataCopy();

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);
std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize);

void mapToMemory(uint8_t* target);

void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation,
bool overwrite = false);

void clearMergeRegions();

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();

size_t getQueuedDiffsCount();

void queueDiffs(std::span<SnapshotDiff> diffs);

void writeQueuedDiffs();

size_t getSize() const { return size; }

size_t getMaxSize() const { return maxSize; }

private:
size_t size = 0;
size_t maxSize = 0;

int fd = -1;

std::shared_mutex snapMx;

MemoryRegion data = nullptr;

std::vector<SnapshotDiff> queuedDiffs;

// Note - we care about the order of this map, as we iterate through it
// in order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;

uint8_t* validatedOffsetPtr(uint32_t offset);

void mapToMemory(uint8_t* target, bool shared);

void writeData(std::span<const uint8_t> buffer, uint32_t offset = 0);
};

class MemoryView
{
public:
// Note - this object is just a view of a section of memory, and does not
// own the underlying data
MemoryView() = default;

explicit MemoryView(std::span<const uint8_t> dataIn);

std::vector<SnapshotDiff> getDirtyRegions();

std::vector<SnapshotDiff> diffWithSnapshot(
std::shared_ptr<SnapshotData> snap);

std::span<const uint8_t> getData() { return data; }

private:
std::span<const uint8_t> data;
};

std::string snapshotDataTypeStr(SnapshotDataType dt);
Expand Down
4 changes: 2 additions & 2 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
table SnapshotPushRequest {
key:string;
groupid:int;
maxSize:ulong;
contents:[ubyte];
}

Expand All @@ -17,7 +17,7 @@ table SnapshotDiffChunk {

table SnapshotDiffPushRequest {
key:string;
groupid:int;
force:bool;
chunks:[SnapshotDiffChunk];
}

Expand Down
Loading