Skip to content

Commit

Permalink
Snapshot and shared memory updates (#548)
Browse files Browse the repository at this point in the history
* Update to new snapshot mechanism

* Fix tests

* Refactoring to new snapshots

* Compiling

* Sync main thread after parallel section

* Adding explicit shared mem regions

* Update to match faabric

* Fixing failing tests

* Sync main thread with snapshot before executing

* Resync all diffs

* Fixing up tests and locking!

* Remove unsupported OpenMP funcs

* Formatting

* Moving on to dist tests

* Allow removing dist test server

* Default shared

* Add covid runner

* Add ignore stacks

* omp tests working

* Latest cpp container

* Update faabric and cpp

* Remove covid runner

* Improve logging on fd call

* Profiling tweaks

* PR comments

* Latest faabric

* Latest faabric
  • Loading branch information
Shillaker committed Dec 30, 2021
1 parent dcad86d commit 11fa8a0
Show file tree
Hide file tree
Showing 27 changed files with 553 additions and 430 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FAASM_VERSION=0.7.8
FAASM_CLI_IMAGE=faasm/cli:0.7.8

CPP_VERSION=0.1.0
CPP_CLI_IMAGE=faasm/cpp-sysroot:0.1.0
CPP_VERSION=0.1.1
CPP_CLI_IMAGE=faasm/cpp-sysroot:0.1.1

PYTHON_VERSION=0.1.0
PYTHON_CLI_IMAGE=faasm/cpython:0.1.0
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-20.04
container:
image: faasm/cpp-sysroot:0.1.0
image: faasm/cpp-sysroot:0.1.1
defaults:
run:
working-directory: /__w/faasm/faasm/clients/cpp
Expand Down
13 changes: 7 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_EXE_LINKER_FLAGS "-fuse-ld=lld")

# Faasm profiling
if (${FAASM_SELF_TRACING})
message("-- Activated FAASM tracing")
add_definitions(-DTRACE_ALL=1)
set(FAABRIC_SELF_TRACING 1)
endif ()

# Ensure all targets can generate readable stacktraces
add_compile_options(-fno-omit-frame-pointer)
add_link_options(-Wl,--export-dynamic)
Expand Down Expand Up @@ -155,12 +162,6 @@ else ()
endfunction()
endif ()

# Faasm profiling
if (${FAASM_SELF_TRACING})
message("-- Activated FAASM tracing")
add_definitions(-DTRACE_ALL=1)
endif ()

# LLVM config
if (${FAASM_PERF_PROFILING})
# In accordance with bin/build_llvm_perf.sh and LLVM version for WAVM
Expand Down
5 changes: 4 additions & 1 deletion deploy/dist-test/dev_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ elif [[ "$1" == "restart" ]]; then
docker-compose restart dist-test-server
elif [[ "$1" == "stop" ]]; then
docker-compose stop dist-test-server
elif [[ "$1" == "rm" ]]; then
docker-compose stop dist-test-server
docker-compose rm dist-test-server
else
echo "Unrecognised argument: $1"
echo ""
echo "Usage:"
echo ""
echo "./deploy/dist-test/dev_server.sh [restart|stop]"
echo "./deploy/dist-test/dev_server.sh [restart|stop|rm]"
exit 1
fi

Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ services:
- REDIS_QUEUE_HOST=redis-queue
- REDIS_STATE_HOST=redis-state
- LD_LIBRARY_PATH=/build/faasm/third-party/lib:/usr/local/lib
- ASAN_OPTIONS=halt_on_error=1
- LSAN_OPTIONS=suppressions=/usr/local/code/faasm/leak-sanitizer-ignorelist.txt
- TSAN_OPTIONS=halt_on_error=1:suppressions=/usr/local/code/faasm/thread-sanitizer-ignorelist.txt:history_size=7:second_deadlock_stack=1
- UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1
volumes:
- ./:/usr/local/code/faasm/
- ./dev/faasm/build/:${FAASM_BUILD_MOUNT}
Expand Down
13 changes: 7 additions & 6 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ inv python.codegen
# Set up cgroup
./bin/cgroup.sh

# If running with sanitisers, set their options
export ASAN_OPTIONS="halt_on_error=1"
export LSAN_OPTIONS="suppressions=/usr/local/code/faasm/leak-sanitizer-ignorelist.txt"
export TSAN_OPTIONS="halt_on_error=1:suppressions=/usr/local/code/faasm/thread-sanitizer-ignorelist.txt:history_size=7:second_deadlock_stack=1"
export UBSAN_OPTIONS="print_stacktrace=1:halt_on_error=1"

# Run the tests
tests
```
Expand Down Expand Up @@ -270,6 +264,13 @@ To start the local development cluster, you can run:
./deploy/local/dev_cluster.sh
```

To run external applications and benchmarks against this cluster, you may also
need to set up your config file to point at this cluster:

```bash
inv knative.ini-file --local
```

### Making changes in your local cluster

Assuming you've changed something related to the `pool_runner` target (which is
Expand Down
2 changes: 1 addition & 1 deletion faabric
Submodule faabric updated 42 files
+0 −258 .github/workflows/sanitisers.yml
+55 −1 .github/workflows/tests.yml
+6 −0 CMakeLists.txt
+2 −0 dist-test/dev_server.sh
+2 −0 dist-test/run.sh
+5 −0 docker-compose.yml
+3 −3 include/faabric/scheduler/Scheduler.h
+16 −6 include/faabric/snapshot/SnapshotClient.h
+2 −14 include/faabric/snapshot/SnapshotRegistry.h
+0 −5 include/faabric/snapshot/SnapshotServer.h
+20 −0 include/faabric/transport/MessageEndpoint.h
+4 −4 include/faabric/util/bytes.h
+30 −1 include/faabric/util/memory.h
+199 −31 include/faabric/util/snapshot.h
+15 −6 src/flat/faabric.fbs
+4 −0 src/runner/FaabricMain.cpp
+60 −33 src/scheduler/Executor.cpp
+3 −1 src/scheduler/MpiWorld.cpp
+28 −16 src/scheduler/Scheduler.cpp
+93 −28 src/snapshot/SnapshotClient.cpp
+12 −114 src/snapshot/SnapshotRegistry.cpp
+50 −104 src/snapshot/SnapshotServer.cpp
+48 −0 src/transport/MessageEndpoint.cpp
+2 −1 src/transport/PointToPointBroker.cpp
+200 −18 src/util/memory.cpp
+618 −205 src/util/snapshot.cpp
+8 −5 tasks/dev.py
+30 −12 tests/dist/DistTestExecutor.cpp
+23 −15 tests/dist/DistTestExecutor.h
+226 −45 tests/dist/scheduler/functions.cpp
+35 −12 tests/dist/scheduler/test_snapshots.cpp
+14 −17 tests/dist/scheduler/test_threads.cpp
+6 −6 tests/dist/transport/functions.cpp
+46 −68 tests/test/scheduler/test_executor.cpp
+46 −21 tests/test/scheduler/test_scheduler.cpp
+168 −107 tests/test/snapshot/test_snapshot_client_server.cpp
+55 −46 tests/test/snapshot/test_snapshot_diffs.cpp
+33 −90 tests/test/snapshot/test_snapshot_registry.cpp
+109 −1 tests/test/transport/test_message_endpoint_client.cpp
+128 −0 tests/test/util/test_memory.cpp
+1,060 −199 tests/test/util/test_snapshot.cpp
+9 −34 tests/utils/fixtures.h
4 changes: 1 addition & 3 deletions include/faaslet/Faaslet.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Faaslet final : public faabric::scheduler::Executor
int msgIdx,
std::shared_ptr<faabric::BatchExecuteRequest> req) override;

faabric::util::SnapshotData snapshot() override;
faabric::util::MemoryView getMemoryView() override;

void restore(faabric::Message& call) override;

Expand All @@ -35,8 +35,6 @@ class Faaslet final : public faabric::scheduler::Executor
void postFinish() override;

private:
bool isIsolated = false;

std::string localResetSnapshotKey;

std::shared_ptr<isolation::NetworkNamespace> ns;
Expand Down
2 changes: 2 additions & 0 deletions include/wamr/WAMRWasmModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class WAMRWasmModule final : public WasmModule

size_t getMemorySizeBytes() override;

uint8_t* getMemoryBase() override;

size_t getMaxMemoryPages();

WASMModuleInstanceCommon* getModuleInstance();
Expand Down
28 changes: 15 additions & 13 deletions include/wasm/WasmModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#define WASM_CTORS_FUNC_NAME "__wasm_call_ctors"
#define ENTRY_FUNC_NAME "_start"

#define MAX_WASM_MEM (1024L * 1024L * 1024L * 4L)

namespace wasm {

// Note - avoid a zero default on the thread request type otherwise it can
Expand Down Expand Up @@ -126,33 +128,34 @@ class WasmModule
virtual uint8_t* getMemoryBase();

// ----- Snapshot/ restore -----
faabric::util::SnapshotData getSnapshotData();
std::shared_ptr<faabric::util::SnapshotData> getSnapshotData();

faabric::util::MemoryView getMemoryView();

std::string createAppSnapshot(const faabric::Message& msg);
std::string getOrCreateAppSnapshot(const faabric::Message& msg,
bool update);

void deleteAppSnapshot(const faabric::Message& msg);

std::string snapshot(bool locallyRestorable = true);

void syncAppSnapshot(const faabric::Message& msg);

void restore(const std::string& snapshotKey);

// ----- Threading -----
void queuePthreadCall(threads::PthreadCall call);

int awaitPthreadCall(const faabric::Message* msg, int pthreadPtr);

void setUpOpenMPMergeRegions(const faabric::Message& msg,
std::shared_ptr<threads::Level> ompLevel);

void setUpPthreadMergeRegions(const faabric::Message& msg,
std::shared_ptr<threads::Level> ompLevel);

std::vector<uint32_t> getThreadStacks();

// ----- Debugging -----
virtual void printDebugInfo();

protected:
std::shared_mutex moduleMutex;

std::atomic<uint32_t> currentBrk = 0;

std::string boundUser;
Expand All @@ -169,10 +172,6 @@ class WasmModule
int threadPoolSize = 0;
std::vector<uint32_t> threadStacks;

std::shared_mutex moduleMemoryMutex;
std::mutex modulePthreadsMutex;
std::mutex moduleStateMutex;

// Argc/argv
unsigned int argc;
std::vector<std::string> argv;
Expand All @@ -183,6 +182,7 @@ class WasmModule
std::unordered_map<int32_t, uint32_t> pthreadPtrsToChainedCalls;

// Shared memory regions
std::shared_mutex sharedMemWasmPtrsMutex;
std::unordered_map<std::string, uint32_t> sharedMemWasmPtrs;

int getStdoutFd();
Expand All @@ -193,7 +193,9 @@ class WasmModule
virtual void doBindToFunction(faabric::Message& msg, bool cache);

// Snapshots
void snapshotWithKey(const std::string& snapKey, bool locallyRestorable);
void snapshotWithKey(const std::string& snapKey);

void ignoreThreadStacksInSnapshot(const std::string& snapKey);

// Threads
void createThreadStacks();
Expand Down
11 changes: 6 additions & 5 deletions include/wavm/WAVMWasmModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ std::vector<uint8_t> wavmCodegen(std::vector<uint8_t>& wasmBytes,
template<class T>
T unalignedWavmRead(WAVM::Runtime::Memory* memory, WAVM::Uptr offset)
{
const std::byte* bytes =
WAVM::Runtime::memoryArrayPtr<std::byte>(memory, offset, sizeof(T));
const uint8_t* bytes =
WAVM::Runtime::memoryArrayPtr<uint8_t>(memory, offset, sizeof(T));
return faabric::util::unalignedRead<T>(bytes);
}

Expand All @@ -34,8 +34,8 @@ void unalignedWavmWrite(const T& value,
WAVM::Runtime::Memory* memory,
WAVM::Uptr offset)
{
std::byte* bytes =
WAVM::Runtime::memoryArrayPtr<std::byte>(memory, offset, sizeof(T));
uint8_t* bytes =
WAVM::Runtime::memoryArrayPtr<uint8_t>(memory, offset, sizeof(T));
faabric::util::unalignedWrite<T>(value, bytes);
}

Expand Down Expand Up @@ -232,7 +232,8 @@ class WAVMModuleCache
std::pair<wasm::WAVMWasmModule&, faabric::util::SharedLock> getCachedModule(
faabric::Message& msg);

void initialiseCachedModule(faabric::Message& msg);
std::string registerResetSnapshot(wasm::WasmModule& module,
faabric::Message& msg);

void clear();

Expand Down
26 changes: 12 additions & 14 deletions src/faaslet/Faaslet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
#include <faabric/util/func.h>
#include <faabric/util/gids.h>
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>
Expand All @@ -26,6 +27,8 @@
#include <storage/FileSystem.h>
#endif

static thread_local bool threadIsIsolated = false;

using namespace isolation;

namespace faaslet {
Expand Down Expand Up @@ -84,12 +87,7 @@ Faaslet::Faaslet(faabric::Message& msg)
// (currently only supported in WAVM)
if (conf.wasmVm == "wavm") {
localResetSnapshotKey =
faabric::util::funcToString(msg, false) + "_reset";
faabric::util::SnapshotData snapData = module->getSnapshotData();

faabric::snapshot::SnapshotRegistry& snapReg =
faabric::snapshot::getSnapshotRegistry();
snapReg.takeSnapshotIfNotExists(localResetSnapshotKey, snapData, true);
wasm::getWAVMModuleCache().registerResetSnapshot(*module, msg);
}
}

Expand All @@ -100,7 +98,11 @@ int32_t Faaslet::executeTask(int threadPoolIdx,
// Lazily setup Faaslet isolation.
// This has to be done within the same thread as the execution (hence we
// leave it until just before execution).
if (!isIsolated) {
// Because this is a thread-specific operation we don't need any
// synchronisation here, and rely on the cgroup and network namespace
// operations being thread-safe.

if (!threadIsIsolated) {
// Add this thread to the cgroup
CGroup cgroup(BASE_CGROUP_NAME);
cgroup.addCurrentThread();
Expand All @@ -109,7 +111,7 @@ int32_t Faaslet::executeTask(int threadPoolIdx,
ns = claimNetworkNamespace();
ns->addCurrentThread();

isIsolated = true;
threadIsIsolated = true;
}

int32_t returnValue = module->executeTask(threadPoolIdx, msgIdx, req);
Expand All @@ -130,9 +132,9 @@ void Faaslet::postFinish()
}
}

faabric::util::SnapshotData Faaslet::snapshot()
faabric::util::MemoryView Faaslet::getMemoryView()
{
return module->getSnapshotData();
return module->getMemoryView();
}

void Faaslet::restore(faabric::Message& msg)
Expand All @@ -143,15 +145,11 @@ void Faaslet::restore(faabric::Message& msg)
// Restore from snapshot if necessary
if (conf.wasmVm == "wavm") {
if (!snapshotKey.empty() && !msg.issgx()) {
PROF_START(snapshotOverride)

SPDLOG_DEBUG("Restoring {} from snapshot {} before execution",
id,
snapshotKey);

module->restore(snapshotKey);

PROF_END(snapshotOverride)
}
}
}
Expand Down
19 changes: 12 additions & 7 deletions src/runner/func_runner.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "faabric/util/environment.h"
#include <conf/FaasmConfig.h>
#include <faaslet/Faaslet.h>
#include <storage/FileLoader.h>
Expand Down Expand Up @@ -47,13 +48,13 @@ int doRunner(int argc, char* argv[])
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
conf::FaasmConfig& faasmConf = conf::getFaasmConfig();

// Set short timeouts to die quickly
conf.boundTimeout = 60000;
conf.globalMessageTimeout = 60000;
faasmConf.chainedCallTimeout = 60000;
// Set timeout to ensure longer functions can finish
conf.boundTimeout = 120000;
conf.globalMessageTimeout = 120000;
faasmConf.chainedCallTimeout = 120000;

// Make sure we have enough space for chained calls
int nThreads = 10;
int nThreads = std::min<int>(faabric::util::getUsableCores(), 10);
faabric::HostResources res;
res.set_slots(nThreads);
faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler();
Expand Down Expand Up @@ -98,7 +99,7 @@ int doRunner(int argc, char* argv[])
m.startRunner();

// Submit the invocation
PROF_START(roundTrip)
PROF_START(FunctionExec)
sch.callFunctions(req);

// Await the result
Expand All @@ -109,7 +110,7 @@ int doRunner(int argc, char* argv[])
throw std::runtime_error("Executing function failed");
}

PROF_END(roundTrip)
PROF_END(FunctionExec)

m.shutdown();

Expand All @@ -121,10 +122,14 @@ int main(int argc, char* argv[])
storage::initFaasmS3();
faabric::transport::initGlobalMessageContext();

PROF_BEGIN

// WARNING: All 0MQ-related operations must take place in a self-contined
// scope to ensure all sockets are destructed before closing the context.
int result = doRunner(argc, argv);

PROF_SUMMARY

faabric::transport::closeGlobalMessageContext();
storage::shutdownFaasmS3();
return result;
Expand Down
Loading

0 comments on commit 11fa8a0

Please sign in to comment.