Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move snapshot client/server into snapshot module #125

Merged
merged 4 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/faabric/runner/FaabricMain.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallServer.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/state/StateServer.h>
#include <faabric/util/config.h>

Expand All @@ -28,6 +28,6 @@ class FaabricMain
private:
faabric::state::StateServer stateServer;
faabric::scheduler::FunctionCallServer functionServer;
faabric::scheduler::SnapshotServer snapshotServer;
faabric::snapshot::SnapshotServer snapshotServer;
};
}
4 changes: 2 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <faabric/scheduler/ExecGraph.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
Expand Down Expand Up @@ -187,7 +187,7 @@ class Scheduler
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::scheduler::SnapshotClient& getSnapshotClient(
faabric::snapshot::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

faabric::HostResources thisHostResources;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

namespace faabric::scheduler {
namespace faabric::snapshot {
enum SnapshotCalls
{
NoSnapshotCall = 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#pragma once

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/snapshot.h>

namespace faabric::scheduler {
namespace faabric::snapshot {

// -----------------------------------
// Mocking
Expand Down Expand Up @@ -57,6 +57,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
const std::vector<faabric::util::SnapshotDiff>& diffs);

private:
void sendHeader(faabric::scheduler::SnapshotCalls call);
void sendHeader(faabric::snapshot::SnapshotCalls call);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpointServer.h>

namespace faabric::scheduler {
namespace faabric::snapshot {
class SnapshotServer final : public faabric::transport::MessageEndpointServer
{
public:
Expand Down
4 changes: 1 addition & 3 deletions src/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ set(LIB_FILES
FunctionCallClient.cpp
FunctionCallServer.cpp
Scheduler.cpp
SnapshotServer.cpp
SnapshotClient.cpp
MpiContext.cpp
MpiMessageBuffer.cpp
MpiWorldRegistry.cpp
Expand All @@ -18,4 +16,4 @@ set(LIB_FILES

faabric_lib(scheduler "${LIB_FILES}")

target_link_libraries(scheduler flat proto snapshot state faabricmpi redis transport)
target_link_libraries(scheduler snapshot state faabricmpi redis)
5 changes: 3 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/environment.h>
#include <faabric/util/func.h>
Expand All @@ -19,6 +19,7 @@
#define FLUSH_TIMEOUT_MS 10000

using namespace faabric::util;
using namespace faabric::snapshot;

namespace faabric::scheduler {

Expand All @@ -31,7 +32,7 @@ static thread_local std::unordered_map<std::string,
functionCallClients;

static thread_local std::unordered_map<std::string,
faabric::scheduler::SnapshotClient>
faabric::snapshot::SnapshotClient>
snapshotClients;

Scheduler& getScheduler()
Expand Down
4 changes: 3 additions & 1 deletion src/snapshot/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
file(GLOB HEADERS "${FAABRIC_INCLUDE_DIR}/faabric/snapshot/*.h")

set(LIB_FILES
SnapshotClient.cpp
SnapshotRegistry.cpp
SnapshotServer.cpp
${HEADERS}
)

faabric_lib(snapshot "${LIB_FILES}")

target_link_libraries(snapshot proto util)
target_link_libraries(snapshot proto flat transport util)
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/transport/common.h>
#include <faabric/transport/macros.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>
#include <faabric/util/queue.h>
#include <faabric/util/testing.h>

namespace faabric::scheduler {
namespace faabric::snapshot {

// -----------------------------------
// Mocking
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <faabric/flat/faabric_generated.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/state/State.h>
#include <faabric/transport/common.h>
#include <faabric/transport/macros.h>
Expand All @@ -10,7 +10,7 @@

#include <sys/mman.h>

namespace faabric::scheduler {
namespace faabric::snapshot {
SnapshotServer::SnapshotServer()
: faabric::transport::MessageEndpointServer(SNAPSHOT_ASYNC_PORT,
SNAPSHOT_SYNC_PORT)
Expand All @@ -21,11 +21,11 @@ void SnapshotServer::doAsyncRecv(int header,
size_t bufferSize)
{
switch (header) {
case faabric::scheduler::SnapshotCalls::DeleteSnapshot: {
case faabric::snapshot::SnapshotCalls::DeleteSnapshot: {
this->recvDeleteSnapshot(buffer, bufferSize);
break;
}
case faabric::scheduler::SnapshotCalls::ThreadResult: {
case faabric::snapshot::SnapshotCalls::ThreadResult: {
this->recvThreadResult(buffer, bufferSize);
break;
}
Expand All @@ -40,10 +40,10 @@ std::unique_ptr<google::protobuf::Message>
SnapshotServer::doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize)
{
switch (header) {
case faabric::scheduler::SnapshotCalls::PushSnapshot: {
case faabric::snapshot::SnapshotCalls::PushSnapshot: {
return recvPushSnapshot(buffer, bufferSize);
}
case faabric::scheduler::SnapshotCalls::PushSnapshotDiffs: {
case faabric::snapshot::SnapshotCalls::PushSnapshotDiffs: {
return recvPushSnapshotDiffs(buffer, bufferSize);
}
default: {
Expand Down
2 changes: 1 addition & 1 deletion tests/dist/scheduler/test_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/bytes.h>
#include <faabric/util/config.h>
Expand Down
25 changes: 12 additions & 13 deletions tests/test/scheduler/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
Expand Down Expand Up @@ -468,7 +468,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(actualHost == otherHost);

// Check the snapshot has been pushed to the other host
auto snapPushes = faabric::scheduler::getSnapshotPushes();
auto snapPushes = faabric::snapshot::getSnapshotPushes();
REQUIRE(snapPushes.size() == 1);
REQUIRE(snapPushes.at(0).first == otherHost);

Expand All @@ -484,7 +484,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(restoreCount == 1);

// Process the thread result requests
auto results = faabric::scheduler::getThreadResults();
auto results = faabric::snapshot::getThreadResults();

for (auto& r : results) {
REQUIRE(r.first == thisHost);
Expand Down Expand Up @@ -524,8 +524,8 @@ TEST_CASE_METHOD(TestExecutorFixture,

// Note that because the results don't actually get logged on this host, we
// can't wait on them as usual.
auto actual = faabric::scheduler::getThreadResults();
REQUIRE_RETRY(actual = faabric::scheduler::getThreadResults(),
auto actual = faabric::snapshot::getThreadResults();
REQUIRE_RETRY(actual = faabric::snapshot::getThreadResults(),
actual.size() == nThreads);

std::vector<uint32_t> actualMessageIds;
Expand Down Expand Up @@ -714,11 +714,10 @@ TEST_CASE_METHOD(TestExecutorFixture,

// Results aren't set on this host as it's not the master, so we have to
// wait
REQUIRE_RETRY({},
faabric::scheduler::getThreadResults().size() == nThreads);
REQUIRE_RETRY({}, faabric::snapshot::getThreadResults().size() == nThreads);

// Check results have been sent back to the master host
auto actualResults = faabric::scheduler::getThreadResults();
auto actualResults = faabric::snapshot::getThreadResults();
REQUIRE(actualResults.size() == nThreads);

// Check only one has diffs attached
Expand Down Expand Up @@ -814,18 +813,18 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(sch.getFunctionRegisteredHosts(msg) == expectedRegistered);

// Check snapshot has been pushed
auto pushes = faabric::scheduler::getSnapshotPushes();
auto pushes = faabric::snapshot::getSnapshotPushes();
REQUIRE(pushes.at(0).first == otherHost);
REQUIRE(pushes.at(0).second.size == snapshotSize);

REQUIRE(faabric::scheduler::getSnapshotDiffPushes().empty());
REQUIRE(faabric::snapshot::getSnapshotDiffPushes().empty());

// Check that we're not registering any dirty pages on the snapshot
faabric::util::SnapshotData& snap = reg.getSnapshot(snapshotKey);
REQUIRE(snap.getDirtyPages().empty());

// Now reset snapshot pushes of all kinds
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

// Make an edit to the snapshot memory and get the expected diffs
snap.data[0] = 9;
Expand All @@ -850,10 +849,10 @@ TEST_CASE_METHOD(TestExecutorFixture,
sch.awaitThreadResult(reqB->mutable_messages()->at(0).id());

// Check the full snapshot hasn't been pushed
REQUIRE(faabric::scheduler::getSnapshotPushes().empty());
REQUIRE(faabric::snapshot::getSnapshotPushes().empty());

// Check the diffs are pushed as expected
auto diffPushes = faabric::scheduler::getSnapshotDiffPushes();
auto diffPushes = faabric::snapshot::getSnapshotDiffPushes();
REQUIRE(diffPushes.size() == 1);
REQUIRE(diffPushes.at(0).first == otherHost);
std::vector<faabric::util::SnapshotDiff> actualDiffs =
Expand Down
8 changes: 4 additions & 4 deletions tests/test/scheduler/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/environment.h>
#include <faabric/util/func.h>
Expand Down Expand Up @@ -272,7 +272,7 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]")
REQUIRE(resRequestsOne.at(0).first == otherHost);

// Check snapshots have been pushed
auto snapshotPushes = faabric::scheduler::getSnapshotPushes();
auto snapshotPushes = faabric::snapshot::getSnapshotPushes();
if (expectedSnapshot.empty()) {
REQUIRE(snapshotPushes.empty());
} else {
Expand Down Expand Up @@ -749,7 +749,7 @@ TEST_CASE_METHOD(SlowExecutorFixture,
for (auto h : expectedHosts) {
expectedDeleteRequests.push_back({ h, snapKey });
};
auto actualDeleteRequests = faabric::scheduler::getSnapshotDeletes();
auto actualDeleteRequests = faabric::snapshot::getSnapshotDeletes();

REQUIRE(actualDeleteRequests == expectedDeleteRequests);
}
Expand Down Expand Up @@ -791,7 +791,7 @@ TEST_CASE_METHOD(SlowExecutorFixture,
}

// Check the results have been pushed along with the thread result
auto actualResults = faabric::scheduler::getThreadResults();
auto actualResults = faabric::snapshot::getThreadResults();

REQUIRE(actualResults.size() == 1);
REQUIRE(actualResults.at(0).first == "otherHost");
Expand Down
8 changes: 4 additions & 4 deletions tests/test/scheduler/test_snapshot_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

#include <sys/mman.h>

#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
#include <faabric/util/gids.h>
Expand All @@ -22,8 +22,8 @@ class SnapshotClientServerFixture
, public SnapshotTestFixture
{
protected:
faabric::scheduler::SnapshotServer server;
faabric::scheduler::SnapshotClient cli;
faabric::snapshot::SnapshotServer server;
faabric::snapshot::SnapshotClient cli;

public:
SnapshotClientServerFixture()
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SchedulerTestFixture
faabric::util::setTestMode(true);

faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

sch.shutdown();
sch.addHostToGlobalSet();
Expand All @@ -80,7 +80,7 @@ class SchedulerTestFixture
faabric::util::setTestMode(true);

faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

sch.shutdown();
sch.addHostToGlobalSet();
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/system_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/MpiWorldRegistry.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/state/State.h>
#include <faabric/util/testing.h>
Expand Down Expand Up @@ -50,7 +50,7 @@ void cleanFaabric()
faabric::util::setTestMode(true);
faabric::util::setMockMode(false);
faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

// Set up dummy executor factory
std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
Expand Down