diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index d721adf03..0b1b41e74 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -99,7 +100,7 @@ class Scheduler void callFunction(faabric::Message& msg, bool forceLocal = false); - std::vector callFunctions( + faabric::util::SchedulingDecision callFunctions( std::shared_ptr req, bool forceLocal = false); @@ -233,7 +234,7 @@ class Scheduler int scheduleFunctionsOnHost( const std::string& host, std::shared_ptr req, - std::vector& records, + faabric::util::SchedulingDecision& decision, int offset, faabric::util::SnapshotData* snapshot); }; diff --git a/include/faabric/transport/PointToPointBroker.h b/include/faabric/transport/PointToPointBroker.h index 19717867e..ebdd1866f 100644 --- a/include/faabric/transport/PointToPointBroker.h +++ b/include/faabric/transport/PointToPointBroker.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -17,11 +18,13 @@ class PointToPointBroker std::string getHostForReceiver(int appId, int recvIdx); - void setHostForReceiver(int appId, int recvIdx, const std::string& host); + std::set setUpLocalMappingsFromSchedulingDecision( + const faabric::util::SchedulingDecision& decision); - void broadcastMappings(int appId); + void setAndSendMappingsFromSchedulingDecision( + const faabric::util::SchedulingDecision& decision); - void sendMappings(int appId, const std::string& host); + void waitForMappingsOnThisHost(int appId); std::set getIdxsRegisteredForApp(int appId); @@ -43,6 +46,10 @@ class PointToPointBroker std::unordered_map> appIdxs; std::unordered_map mappings; + std::unordered_map appMappingsFlags; + std::unordered_map appMappingMutexes; + std::unordered_map appMappingCvs; + std::shared_ptr getClient(const std::string& host); faabric::scheduler::Scheduler& sch; diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h new file mode 100644 index 000000000..670067345 --- /dev/null +++ b/include/faabric/util/scheduling.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include + +#include + +namespace faabric::util { + +class SchedulingDecision +{ + public: + static SchedulingDecision fromPointToPointMappings( + faabric::PointToPointMappings& mappings); + + SchedulingDecision(uint32_t appIdIn); + + uint32_t appId = 0; + + int32_t nFunctions = 0; + + std::vector messageIds; + + std::vector hosts; + + std::vector appIdxs; + + std::string returnHost; + + void addMessage(const std::string& host, const faabric::Message& msg); + + void addMessage(const std::string& host, int32_t messageId, int32_t appIdx); +}; +} diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 999570e61..026ad14f5 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -214,10 +214,13 @@ message PointToPointMessage { } message PointToPointMappings { + int32 appId = 1; + message PointToPointMapping { - int32 appId = 1; - int32 recvIdx = 2; - string host = 3; + string host = 1; + int32 messageId = 2; + int32 recvIdx = 3; } - repeated PointToPointMapping mappings = 1; + + repeated PointToPointMapping mappings = 2; } diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 83d5223c3..79908a3ae 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include // Each MPI rank runs in a separate thread, thus we use TLS to maintain the @@ -199,7 +200,8 @@ void MpiWorld::create(const faabric::Message& call, int newId, int newSize) std::vector executedAt; if (size > 1) { // Send the init messages (note that message i corresponds to rank i+1) - executedAt = sch.callFunctions(req); + faabric::util::SchedulingDecision decision = sch.callFunctions(req); + executedAt = decision.hosts; } assert(executedAt.size() == size - 1); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index bc341c931..e65267cd9 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -202,14 +203,13 @@ void Scheduler::notifyExecutorShutdown(Executor* exec, } } -std::vector Scheduler::callFunctions( +faabric::util::SchedulingDecision Scheduler::callFunctions( std::shared_ptr req, bool forceLocal) { // Extract properties of the request int nMessages = req->messages_size(); bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - std::vector executed(nMessages); // Note, we assume all the messages are for the same function and have the // same master host @@ -222,6 +222,9 @@ std::vector Scheduler::callFunctions( throw std::runtime_error("Message with no master host"); } + // Set up scheduling decision + SchedulingDecision decision(firstMsg.appid()); + // TODO - more granular locking, this is incredibly conservative faabric::util::FullLock lock(mx); @@ -233,14 +236,15 @@ std::vector Scheduler::callFunctions( "Forwarding {} {} back to master {}", nMessages, funcStr, masterHost); getFunctionCallClient(masterHost).executeFunctions(req); - return executed; + decision.returnHost = masterHost; + return decision; } if (forceLocal) { // We're forced to execute locally here so we do all the messages for (int i = 0; i < nMessages; i++) { localMessageIdxs.emplace_back(i); - executed.at(i) = thisHost; + decision.addMessage(thisHost, req->messages().at(i)); } } else { // At this point we know we're the master host, and we've not been @@ -315,7 +319,7 @@ std::vector Scheduler::callFunctions( "Executing {}/{} {} locally", nLocally, nMessages, funcStr); for (int i = 0; i < nLocally; i++) { localMessageIdxs.emplace_back(i); - executed.at(i) = thisHost; + decision.addMessage(thisHost, req->messages().at(i)); } } @@ -325,7 +329,7 @@ std::vector Scheduler::callFunctions( // Schedule first to already registered hosts for (const auto& h : thisRegisteredHosts) { int nOnThisHost = scheduleFunctionsOnHost( - h, req, executed, offset, &snapshotData); + h, req, decision, offset, &snapshotData); offset += nOnThisHost; if (offset >= nMessages) { @@ -347,7 +351,7 @@ std::vector Scheduler::callFunctions( // Schedule functions on the host int nOnThisHost = scheduleFunctionsOnHost( - h, req, executed, offset, &snapshotData); + h, req, decision, offset, &snapshotData); // Register the host if it's exected a function if (nOnThisHost > 0) { @@ -372,7 +376,7 @@ std::vector Scheduler::callFunctions( for (; offset < nMessages; offset++) { localMessageIdxs.emplace_back(offset); - executed.at(offset) = thisHost; + decision.addMessage(thisHost, req->messages().at(offset)); } } @@ -438,7 +442,7 @@ std::vector Scheduler::callFunctions( // Records for tests if (faabric::util::isTestMode()) { for (int i = 0; i < nMessages; i++) { - std::string executedHost = executed.at(i); + std::string executedHost = decision.hosts.at(i); faabric::Message msg = req->messages().at(i); // Log results if in test mode @@ -451,7 +455,7 @@ std::vector Scheduler::callFunctions( } } - return executed; + return decision; } std::vector Scheduler::getUnregisteredHosts( @@ -498,7 +502,7 @@ void Scheduler::broadcastSnapshotDelete(const faabric::Message& msg, int Scheduler::scheduleFunctionsOnHost( const std::string& host, std::shared_ptr req, - std::vector& records, + SchedulingDecision& decision, int offset, faabric::util::SnapshotData* snapshot) { @@ -532,7 +536,7 @@ int Scheduler::scheduleFunctionsOnHost( auto* newMsg = hostRequest->add_messages(); *newMsg = req->messages().at(i); newMsg->set_executeslocally(false); - records.at(i) = host; + decision.addMessage(host, req->messages().at(i)); } SPDLOG_DEBUG( diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp index b5bd600df..5a9b35f60 100644 --- a/src/transport/PointToPointBroker.cpp +++ b/src/transport/PointToPointBroker.cpp @@ -6,6 +6,8 @@ #include #include +#define MAPPING_TIMEOUT_MS 10000 + namespace faabric::transport { // NOTE: Keeping 0MQ sockets in TLS is usually a bad idea, as they _must_ be @@ -55,68 +57,106 @@ std::string PointToPointBroker::getHostForReceiver(int appId, int recvIdx) return mappings[key]; } -void PointToPointBroker::setHostForReceiver(int appId, - int recvIdx, - const std::string& host) +std::set +PointToPointBroker::setUpLocalMappingsFromSchedulingDecision( + const faabric::util::SchedulingDecision& decision) { - faabric::util::FullLock lock(brokerMutex); + int appId = decision.appId; + std::set hosts; - SPDLOG_TRACE( - "Setting point-to-point mapping {}:{} to {}", appId, recvIdx, host); + { + faabric::util::FullLock lock(brokerMutex); - // Record this index for this app - appIdxs[appId].insert(recvIdx); + // Set up the mappings + for (int i = 0; i < decision.nFunctions; i++) { + int recvIdx = decision.appIdxs.at(i); + const std::string& host = decision.hosts.at(i); - // Add host mapping - std::string key = getPointToPointKey(appId, recvIdx); - mappings[key] = host; -} + SPDLOG_DEBUG("Setting point-to-point mapping {}:{} to {}", + appId, + recvIdx, + host); -void PointToPointBroker::broadcastMappings(int appId) -{ - auto& sch = faabric::scheduler::getScheduler(); + // Record this index for this app + appIdxs[appId].insert(recvIdx); - // TODO seems excessive to broadcast to all hosts, could we perhaps use the - // set of registered hosts? - std::set hosts = sch.getAvailableHosts(); + // Add host mapping + std::string key = getPointToPointKey(appId, recvIdx); + mappings[key] = host; - faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); - - for (const auto& host : hosts) { - // Skip this host - if (host == conf.endpointHost) { - continue; + // If it's not this host, add to set of returned hosts + if (host != faabric::util::getSystemConfig().endpointHost) { + hosts.insert(host); + } } + } + + { + // Lock this app + std::unique_lock lock(appMappingMutexes[appId]); - sendMappings(appId, host); + // Enable the app + appMappingsFlags[appId] = true; + + // Notify waiters + appMappingCvs[appId].notify_all(); } + + return hosts; } -void PointToPointBroker::sendMappings(int appId, const std::string& host) +void PointToPointBroker::setAndSendMappingsFromSchedulingDecision( + const faabric::util::SchedulingDecision& decision) { - faabric::util::SharedLock lock(brokerMutex); + int appId = decision.appId; - faabric::PointToPointMappings msg; + // Set up locally + std::set otherHosts = + setUpLocalMappingsFromSchedulingDecision(decision); - std::set& indexes = appIdxs[appId]; + // Send out to other hosts + for (const auto& host : otherHosts) { + faabric::PointToPointMappings msg; + msg.set_appid(appId); + + std::set& indexes = appIdxs[appId]; - for (auto i : indexes) { - std::string key = getPointToPointKey(appId, i); - std::string host = mappings[key]; + for (int i = 0; i < decision.nFunctions; i++) { + auto* mapping = msg.add_mappings(); + mapping->set_host(decision.hosts.at(i)); + mapping->set_messageid(decision.messageIds.at(i)); + mapping->set_recvidx(decision.appIdxs.at(i)); + } - auto* mapping = msg.add_mappings(); - mapping->set_appid(appId); - mapping->set_recvidx(i); - mapping->set_host(host); + SPDLOG_DEBUG("Sending {} point-to-point mappings for {} to {}", + indexes.size(), + appId, + host); + + auto cli = getClient(host); + cli->sendMappings(msg); } +} + +void PointToPointBroker::waitForMappingsOnThisHost(int appId) +{ + // Check if it's been enabled + if (!appMappingsFlags[appId]) { + // Lock this app + std::unique_lock lock(appMappingMutexes[appId]); + + // Wait for app to be enabled + auto timePoint = std::chrono::system_clock::now() + + std::chrono::milliseconds(MAPPING_TIMEOUT_MS); - SPDLOG_DEBUG("Sending {} point-to-point mappings for {} to {}", - indexes.size(), - appId, - host); + if (!appMappingCvs[appId].wait_until(lock, timePoint, [this, appId] { + return appMappingsFlags[appId]; + })) { - auto cli = getClient(host); - cli->sendMappings(msg); + SPDLOG_ERROR("Timed out waiting for app mappings {}", appId); + throw std::runtime_error("Timed out waiting for app mappings"); + } + } } std::set PointToPointBroker::getIdxsRegisteredForApp(int appId) @@ -131,6 +171,8 @@ void PointToPointBroker::sendMessage(int appId, const uint8_t* buffer, size_t bufferSize) { + waitForMappingsOnThisHost(appId); + std::string host = getHostForReceiver(appId, recvIdx); if (host == faabric::util::getSystemConfig().endpointHost) { @@ -212,6 +254,10 @@ void PointToPointBroker::clear() appIdxs.clear(); mappings.clear(); + + appMappingMutexes.clear(); + appMappingsFlags.clear(); + appMappingCvs.clear(); } void PointToPointBroker::resetThreadLocalCache() diff --git a/src/transport/PointToPointServer.cpp b/src/transport/PointToPointServer.cpp index 581167a0c..d037bc67f 100644 --- a/src/transport/PointToPointServer.cpp +++ b/src/transport/PointToPointServer.cpp @@ -64,9 +64,10 @@ std::unique_ptr PointToPointServer::doRecvMappings( { PARSE_MSG(faabric::PointToPointMappings, buffer, bufferSize) - for (const auto& m : msg.mappings()) { - reg.setHostForReceiver(m.appid(), m.recvidx(), m.host()); - } + faabric::util::SchedulingDecision decision = + faabric::util::SchedulingDecision::fromPointToPointMappings(msg); + + reg.setUpLocalMappingsFromSchedulingDecision(decision); return std::make_unique(); } diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index f42be3fab..dd93029fc 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -20,6 +20,7 @@ set(LIB_FILES network.cpp queue.cpp random.cpp + scheduling.cpp snapshot.cpp state.cpp string_tools.cpp diff --git a/src/util/scheduling.cpp b/src/util/scheduling.cpp new file mode 100644 index 000000000..42c8e1bed --- /dev/null +++ b/src/util/scheduling.cpp @@ -0,0 +1,36 @@ +#include + +namespace faabric::util { +SchedulingDecision::SchedulingDecision(uint32_t appIdIn) + : appId(appIdIn) +{} + +void SchedulingDecision::addMessage(const std::string& host, + const faabric::Message& msg) +{ + addMessage(host, msg.id(), msg.appindex()); +} + +void SchedulingDecision::addMessage(const std::string& host, + int32_t messageId, + int32_t appIdx) +{ + nFunctions++; + + hosts.emplace_back(host); + messageIds.emplace_back(messageId); + appIdxs.emplace_back(appIdx); +} + +SchedulingDecision SchedulingDecision::fromPointToPointMappings( + faabric::PointToPointMappings& mappings) +{ + SchedulingDecision decision(mappings.appid()); + + for (const auto& m : mappings.mappings()) { + decision.addMessage(m.host(), m.messageid(), m.recvidx()); + } + + return decision; +} +} diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index d9f16a00f..48b767b0e 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -1,7 +1,7 @@ -#include "faabric_utils.h" #include #include "DistTestExecutor.h" +#include "faabric_utils.h" #include "init.h" #include @@ -128,7 +128,8 @@ int handleFakeDiffsThreadedFunction( // Dispatch the message, expecting them all to execute on other hosts std::string thisHost = faabric::util::getSystemConfig().endpointHost; faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); - std::vector executedHosts = sch.callFunctions(req); + + std::vector executedHosts = sch.callFunctions(req).hosts; bool rightHosts = true; for (auto& h : executedHosts) { diff --git a/tests/dist/scheduler/test_funcs.cpp b/tests/dist/scheduler/test_funcs.cpp index 3b8ad2a53..f4fe5445e 100644 --- a/tests/dist/scheduler/test_funcs.cpp +++ b/tests/dist/scheduler/test_funcs.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace tests { @@ -23,12 +24,26 @@ TEST_CASE_METHOD(DistTestsFixture, res.set_slots(nLocalSlots); sch.setThisHostResources(res); + std::string thisHost = conf.endpointHost; + std::string otherHost = getWorkerIP(); + // Set up the messages std::shared_ptr req = faabric::util::batchExecFactory("funcs", "simple", nFuncs); + // Set up the expectation + faabric::util::SchedulingDecision expectedDecision( + req->messages().at(0).appid()); + expectedDecision.addMessage(thisHost, req->messages().at(0)); + expectedDecision.addMessage(thisHost, req->messages().at(1)); + expectedDecision.addMessage(otherHost, req->messages().at(2)); + expectedDecision.addMessage(otherHost, req->messages().at(3)); + // Call the functions - sch.callFunctions(req); + faabric::util::SchedulingDecision actualDecision = sch.callFunctions(req); + + // Check decision is as expected + checkSchedulingDecisionEquality(actualDecision, expectedDecision); // Check functions executed on this host for (int i = 0; i < nLocalSlots; i++) { diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index a930af7a7..67e141947 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -1,6 +1,6 @@ -#include "faabric_utils.h" #include +#include "faabric_utils.h" #include "fixtures.h" #include "init.h" @@ -14,6 +14,7 @@ #include #include #include +#include #include namespace tests { @@ -53,7 +54,8 @@ TEST_CASE_METHOD(DistTestsFixture, sch.setThisHostResources(res); std::vector expectedHosts = { getWorkerIP() }; - std::vector executedHosts = sch.callFunctions(req); + faabric::util::SchedulingDecision decision = sch.callFunctions(req); + std::vector executedHosts = decision.hosts; REQUIRE(expectedHosts == executedHosts); int actualResult = sch.awaitThreadResult(m.id()); @@ -94,7 +96,8 @@ TEST_CASE_METHOD(DistTestsFixture, sch.setThisHostResources(res); std::vector expectedHosts = { getMasterIP() }; - std::vector executedHosts = sch.callFunctions(req); + faabric::util::SchedulingDecision decision = sch.callFunctions(req); + std::vector executedHosts = decision.hosts; REQUIRE(expectedHosts == executedHosts); faabric::Message actualResult = sch.getFunctionResult(m.id(), 10000); diff --git a/tests/dist/transport/functions.cpp b/tests/dist/transport/functions.cpp index f94a80a9b..82bc540d3 100644 --- a/tests/dist/transport/functions.cpp +++ b/tests/dist/transport/functions.cpp @@ -1,7 +1,7 @@ -#include "faabric_utils.h" #include #include "DistTestExecutor.h" +#include "faabric_utils.h" #include "init.h" #include @@ -25,20 +25,6 @@ int handlePointToPointFunction( faabric::transport::PointToPointBroker& broker = faabric::transport::getPointToPointBroker(); - // Start by receiving a kick-off message from the master (to make sure the - // mappings have been broadcasted) - std::vector kickOffData = - broker.recvMessage(msg.appid(), 0, appIdx); - - // Check data received - std::vector expectedKickOffData = { 0, 1, 2 }; - if (kickOffData != expectedKickOffData) { - SPDLOG_ERROR("Point-to-point kick-off not as expected {} != {}", - formatByteArrayToIntString(kickOffData), - formatByteArrayToIntString(expectedKickOffData)); - return 1; - } - // Send to next index in ring and recv from previous in ring. uint8_t minIdx = 1; uint8_t maxIdx = 3; diff --git a/tests/dist/transport/test_point_to_point.cpp b/tests/dist/transport/test_point_to_point.cpp index 495e78dac..3e2f01832 100644 --- a/tests/dist/transport/test_point_to_point.cpp +++ b/tests/dist/transport/test_point_to_point.cpp @@ -1,6 +1,6 @@ -#include "faabric_utils.h" #include +#include "faabric_utils.h" #include "fixtures.h" #include "init.h" @@ -9,6 +9,7 @@ #include #include #include +#include namespace tests { @@ -46,32 +47,24 @@ TEST_CASE_METHOD(DistTestsFixture, // Set up individual messages // Note that this thread is acting as app index 0 + faabric::util::SchedulingDecision expectedDecision(appId); for (int i = 0; i < nFuncs; i++) { faabric::Message& msg = req->mutable_messages()->at(i); msg.set_appindex(i + 1); - // Register function locations to where we assume they'll be executed - // (we'll confirm this is the case after scheduling) - broker.setHostForReceiver( - msg.appid(), msg.appindex(), expectedHosts.at(i)); + // Add to expected decision + expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i)); } // Call the functions - std::vector actualHosts = sch.callFunctions(req); - REQUIRE(actualHosts == expectedHosts); - - // Broadcast mappings to other hosts - broker.broadcastMappings(appId); + faabric::util::SchedulingDecision actualDecision = sch.callFunctions(req); + checkSchedulingDecisionEquality(actualDecision, expectedDecision); - // Send kick-off message to all functions - std::vector kickOffData = { 0, 1, 2 }; - for (int i = 0; i < nFuncs; i++) { - broker.sendMessage( - appId, 0, i + 1, kickOffData.data(), kickOffData.size()); - } + // Set up point-to-point mappings + broker.setAndSendMappingsFromSchedulingDecision(actualDecision); - // Check other functions executed successfully + // Check functions executed successfully for (int i = 0; i < nFuncs; i++) { faabric::Message& m = req->mutable_messages()->at(i); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index c4fe6566b..3925d44ac 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -277,7 +277,7 @@ class TestExecutorFixture conf.overrideCpuCount = 10; conf.boundTimeout = SHORT_TEST_TIMEOUT_MS; - return sch.callFunctions(req, forceLocal); + return sch.callFunctions(req, forceLocal).hosts; } private: diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 62919831d..6f59af05c 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -1,6 +1,7 @@ +#include + #include "DummyExecutorFactory.h" #include "faabric_utils.h" -#include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include using namespace faabric::scheduler; @@ -249,13 +251,14 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") faabric::scheduler::queueResourceResponse(otherHost, otherResources); // Set up the messages - std::vector expectedHostsOne; std::shared_ptr reqOne = faabric::util::batchExecFactory("foo", "bar", nCallsOne); reqOne->set_type(execMode); reqOne->set_subtype(expectedSubType); reqOne->set_contextdata(expectedContextData); + faabric::util::SchedulingDecision expectedDecisionOne( + reqOne->messages().at(0).appid()); for (int i = 0; i < nCallsOne; i++) { // Set snapshot key faabric::Message& msg = reqOne->mutable_messages()->at(i); @@ -267,14 +270,16 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") // Expect this host to handle up to its number of cores bool isThisHost = i < thisCores; if (isThisHost) { - expectedHostsOne.push_back(thisHost); + expectedDecisionOne.addMessage(thisHost, msg); } else { - expectedHostsOne.push_back(otherHost); + expectedDecisionOne.addMessage(otherHost, msg); } } // Schedule the functions - std::vector actualHostsOne = sch.callFunctions(reqOne); + faabric::util::SchedulingDecision actualDecisionOne = + sch.callFunctions(reqOne); + checkSchedulingDecisionEquality(actualDecisionOne, expectedDecisionOne); // Check resource requests have been made to other host auto resRequestsOne = faabric::scheduler::getResourceRequests(); @@ -293,9 +298,6 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") REQUIRE(pushedSnapshot.second.data == snapshot.data); } - // Check scheduled on expected hosts - REQUIRE(actualHostsOne == expectedHostsOne); - // Check the executor counts on this host faabric::Message m = reqOne->messages().at(0); faabric::HostResources res = sch.getThisHostResources(); @@ -332,28 +334,30 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") faabric::scheduler::queueResourceResponse(otherHost, otherResources); // Now schedule a second batch and check they're all sent to the other host - std::vector expectedHostsTwo; std::shared_ptr reqTwo = faabric::util::batchExecFactory("foo", "bar", nCallsTwo); + faabric::util::SchedulingDecision expectedDecisionTwo( + reqTwo->messages().at(0).appid()); for (int i = 0; i < nCallsTwo; i++) { faabric::Message& msg = reqTwo->mutable_messages()->at(i); msg.set_snapshotkey(expectedSnapshot); - expectedHostsTwo.push_back(otherHost); + expectedDecisionTwo.addMessage(otherHost, msg); } // Create the batch request reqTwo->set_type(execMode); // Schedule the functions - std::vector actualHostsTwo = sch.callFunctions(reqTwo); + faabric::util::SchedulingDecision actualDecisionTwo = + sch.callFunctions(reqTwo); // Check resource request made again auto resRequestsTwo = faabric::scheduler::getResourceRequests(); REQUIRE(resRequestsTwo.size() == 1); REQUIRE(resRequestsTwo.at(0).first == otherHost); - // Check scheduled on expected hosts - REQUIRE(actualHostsTwo == expectedHostsTwo); + // Check scheduling decision + checkSchedulingDecisionEquality(actualDecisionTwo, expectedDecisionTwo); // Check no other functions have been scheduled on this host REQUIRE(sch.getRecordedMessagesLocal().size() == thisCores); @@ -429,21 +433,23 @@ TEST_CASE_METHOD(SlowExecutorFixture, std::shared_ptr req = faabric::util::batchExecFactory("foo", "bar", nCalls); req->set_type(execMode); + + faabric::util::SchedulingDecision expectedDecision( + req->messages().at(0).appid()); for (int i = 0; i < nCalls; i++) { faabric::Message& msg = req->mutable_messages()->at(i); msg.set_snapshotkey(expectedSnapshot); + + if (i == 1 || i == 2) { + expectedDecision.addMessage(otherHost, msg); + } else { + expectedDecision.addMessage(thisHost, msg); + } } // Submit the request - std::vector executedHosts = sch.callFunctions(req); - - // Check list of executed hosts - std::vector expectedHosts = - std::vector(nCalls, thisHost); - expectedHosts.at(1) = otherHost; - expectedHosts.at(2) = otherHost; - - REQUIRE(executedHosts == expectedHosts); + faabric::util::SchedulingDecision decision = sch.callFunctions(req); + checkSchedulingDecisionEquality(decision, expectedDecision); // Check status of local queueing int expectedLocalCalls = nCalls - 2; @@ -704,9 +710,9 @@ TEST_CASE_METHOD(SlowExecutorFixture, faabric::util::batchExecFactory("blah", "foo", 1); req->mutable_messages()->at(0).set_masterhost(otherHost); - std::vector expectedHosts = { "" }; - std::vector executedHosts = sch.callFunctions(req); - REQUIRE(executedHosts == expectedHosts); + faabric::util::SchedulingDecision decision = sch.callFunctions(req); + REQUIRE(decision.hosts.empty()); + REQUIRE(decision.returnHost == otherHost); // Check forwarded to master auto actualReqs = faabric::scheduler::getBatchRequests(); diff --git a/tests/test/transport/test_point_to_point.cpp b/tests/test/transport/test_point_to_point.cpp index a4ce27d3b..0e02b131c 100644 --- a/tests/test/transport/test_point_to_point.cpp +++ b/tests/test/transport/test_point_to_point.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include using namespace faabric::transport; using namespace faabric::util; @@ -33,145 +35,6 @@ class PointToPointClientServerFixture faabric::transport::PointToPointServer server; }; -TEST_CASE_METHOD(PointToPointClientServerFixture, - "Test set and get point-to-point hosts", - "[transport][ptp]") -{ - // Note - deliberately overlap app indexes to make sure app id counts - int appIdA = 123; - int appIdB = 345; - int idxA1 = 0; - int idxB1 = 2; - int idxA2 = 10; - int idxB2 = 10; - - std::string hostA = "host-a"; - std::string hostB = "host-b"; - std::string hostC = "host-c"; - - REQUIRE_THROWS(broker.getHostForReceiver(appIdA, idxA1)); - REQUIRE_THROWS(broker.getHostForReceiver(appIdA, idxA2)); - REQUIRE_THROWS(broker.getHostForReceiver(appIdB, idxB1)); - REQUIRE_THROWS(broker.getHostForReceiver(appIdB, idxB2)); - - broker.setHostForReceiver(appIdA, idxA1, hostA); - broker.setHostForReceiver(appIdB, idxB1, hostB); - - std::set expectedA = { idxA1 }; - std::set expectedB = { idxB1 }; - REQUIRE(broker.getIdxsRegisteredForApp(appIdA) == expectedA); - REQUIRE(broker.getIdxsRegisteredForApp(appIdB) == expectedB); - - REQUIRE(broker.getHostForReceiver(appIdA, idxA1) == hostA); - REQUIRE_THROWS(broker.getHostForReceiver(appIdA, idxA2)); - REQUIRE(broker.getHostForReceiver(appIdB, idxB1) == hostB); - REQUIRE_THROWS(broker.getHostForReceiver(appIdB, idxB2)); - - broker.setHostForReceiver(appIdA, idxA2, hostB); - broker.setHostForReceiver(appIdB, idxB2, hostC); - - expectedA = { idxA1, idxA2 }; - expectedB = { idxB1, idxB2 }; - - REQUIRE(broker.getIdxsRegisteredForApp(appIdA) == expectedA); - REQUIRE(broker.getIdxsRegisteredForApp(appIdB) == expectedB); - - REQUIRE(broker.getHostForReceiver(appIdA, idxA1) == hostA); - REQUIRE(broker.getHostForReceiver(appIdA, idxA2) == hostB); - REQUIRE(broker.getHostForReceiver(appIdB, idxB1) == hostB); - REQUIRE(broker.getHostForReceiver(appIdB, idxB2) == hostC); -} - -TEST_CASE_METHOD(PointToPointClientServerFixture, - "Test sending point-to-point mappings via broker", - "[transport][ptp]") -{ - faabric::util::setMockMode(true); - - int appIdA = 123; - int appIdB = 345; - - int idxA1 = 1; - int idxA2 = 2; - int idxB1 = 1; - - std::string hostA = "host-a"; - std::string hostB = "host-b"; - std::string hostC = "host-c"; - - faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); - sch.reset(); - - sch.addHostToGlobalSet(hostA); - sch.addHostToGlobalSet(hostB); - sch.addHostToGlobalSet(hostC); - - // Includes this host - REQUIRE(sch.getAvailableHosts().size() == 4); - - broker.setHostForReceiver(appIdA, idxA1, hostA); - broker.setHostForReceiver(appIdA, idxA2, hostB); - broker.setHostForReceiver(appIdB, idxB1, hostB); - - std::vector expectedHosts; - SECTION("Send single host") - { - broker.sendMappings(appIdA, hostC); - expectedHosts = { hostC }; - } - - SECTION("Broadcast all hosts") - { - broker.broadcastMappings(appIdA); - - // Don't expect to be broadcast to this host - expectedHosts = { hostA, hostB, hostC }; - } - - auto actualSent = getSentMappings(); - REQUIRE(actualSent.size() == expectedHosts.size()); - - // Sort the sent mappings based on host - std::sort(actualSent.begin(), - actualSent.end(), - [](const std::pair& a, - const std::pair& b) - -> bool { return a.first < b.first; }); - - // Check each of the sent mappings is as we would expect - for (int i = 0; i < expectedHosts.size(); i++) { - REQUIRE(actualSent.at(i).first == expectedHosts.at(i)); - - faabric::PointToPointMappings actualMappings = actualSent.at(i).second; - REQUIRE(actualMappings.mappings().size() == 2); - - faabric::PointToPointMappings::PointToPointMapping mappingA = - actualMappings.mappings().at(0); - faabric::PointToPointMappings::PointToPointMapping mappingB = - actualMappings.mappings().at(1); - - REQUIRE(mappingA.appid() == appIdA); - REQUIRE(mappingB.appid() == appIdA); - - // Note - we don't know the order of the mappings and can't easily sort - // the data in the protobuf object, so it's easiest just to check both - // possible orderings. - if (mappingA.recvidx() == idxA1) { - REQUIRE(mappingA.host() == hostA); - - REQUIRE(mappingB.recvidx() == idxA2); - REQUIRE(mappingB.host() == hostB); - } else if (mappingA.recvidx() == idxA2) { - REQUIRE(mappingA.host() == hostB); - - REQUIRE(mappingB.recvidx() == idxA1); - REQUIRE(mappingB.host() == hostA); - } else { - FAIL(); - } - } -} - TEST_CASE_METHOD(PointToPointClientServerFixture, "Test sending point-to-point mappings from client", "[transport][ptp]") @@ -189,24 +52,26 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, REQUIRE(broker.getIdxsRegisteredForApp(appIdA).empty()); REQUIRE(broker.getIdxsRegisteredForApp(appIdB).empty()); - faabric::PointToPointMappings mappings; + faabric::PointToPointMappings mappingsA; + mappingsA.set_appid(appIdA); - auto* mappingA1 = mappings.add_mappings(); - mappingA1->set_appid(appIdA); + faabric::PointToPointMappings mappingsB; + mappingsB.set_appid(appIdB); + + auto* mappingA1 = mappingsA.add_mappings(); mappingA1->set_recvidx(idxA1); mappingA1->set_host(hostA); - auto* mappingA2 = mappings.add_mappings(); - mappingA2->set_appid(appIdA); + auto* mappingA2 = mappingsA.add_mappings(); mappingA2->set_recvidx(idxA2); mappingA2->set_host(hostB); - auto* mappingB1 = mappings.add_mappings(); - mappingB1->set_appid(appIdB); + auto* mappingB1 = mappingsB.add_mappings(); mappingB1->set_recvidx(idxB1); mappingB1->set_host(hostA); - cli.sendMappings(mappings); + cli.sendMappings(mappingsA); + cli.sendMappings(mappingsB); REQUIRE(broker.getIdxsRegisteredForApp(appIdA).size() == 2); REQUIRE(broker.getIdxsRegisteredForApp(appIdB).size() == 1); @@ -229,8 +94,20 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, conf.endpointHost = LOCALHOST; // Register both indexes on this host - broker.setHostForReceiver(appId, idxA, LOCALHOST); - broker.setHostForReceiver(appId, idxB, LOCALHOST); + faabric::util::SchedulingDecision decision(appId); + + faabric::Message msgA = faabric::util::messageFactory("foo", "bar"); + msgA.set_appid(appId); + msgA.set_appindex(idxA); + + faabric::Message msgB = faabric::util::messageFactory("foo", "bar"); + msgB.set_appid(appId); + msgB.set_appindex(idxB); + + decision.addMessage(LOCALHOST, msgA); + decision.addMessage(LOCALHOST, msgB); + + broker.setAndSendMappingsFromSchedulingDecision(decision); std::vector sentDataA = { 0, 1, 2, 3 }; std::vector receivedDataA; @@ -241,15 +118,13 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, // async handling broker.sendMessage(appId, idxA, idxB, sentDataA.data(), sentDataA.size()); - SLEEP_MS(1000); - std::thread t([appId, idxA, idxB, &receivedDataA, &sentDataB] { PointToPointBroker& broker = getPointToPointBroker(); // Receive the first message receivedDataA = broker.recvMessage(appId, idxA, idxB); - // Send a message back (note reversing the indexes) + // Send a message back broker.sendMessage( appId, idxB, idxA, sentDataB.data(), sentDataB.size()); @@ -268,4 +143,126 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, conf.reset(); } + +TEST_CASE_METHOD( + PointToPointClientServerFixture, + "Test setting up point-to-point mappings with scheduling decision", + "[transport][ptp]") +{ + faabric::util::setMockMode(true); + + std::string hostA = "hostA"; + std::string hostB = "hostB"; + std::string hostC = "hostC"; + + int nMessages = 6; + auto req = batchExecFactory("foo", "bar", nMessages); + for (int i = 0; i < nMessages; i++) { + req->mutable_messages()->at(i).set_appindex(i); + } + + faabric::Message& msgA = req->mutable_messages()->at(0); + faabric::Message& msgB = req->mutable_messages()->at(1); + faabric::Message& msgC = req->mutable_messages()->at(2); + faabric::Message& msgD = req->mutable_messages()->at(3); + faabric::Message& msgE = req->mutable_messages()->at(4); + faabric::Message& msgF = req->mutable_messages()->at(5); + + int appId = msgA.appid(); + SchedulingDecision decision(appId); + decision.addMessage(hostB, msgA); + decision.addMessage(hostA, msgB); + decision.addMessage(hostC, msgC); + decision.addMessage(hostB, msgD); + decision.addMessage(hostB, msgE); + decision.addMessage(hostC, msgF); + + // Set up and send the mappings + broker.setAndSendMappingsFromSchedulingDecision(decision); + + // Check locally + REQUIRE(broker.getHostForReceiver(appId, msgA.appindex()) == hostB); + REQUIRE(broker.getHostForReceiver(appId, msgB.appindex()) == hostA); + REQUIRE(broker.getHostForReceiver(appId, msgC.appindex()) == hostC); + REQUIRE(broker.getHostForReceiver(appId, msgD.appindex()) == hostB); + REQUIRE(broker.getHostForReceiver(appId, msgE.appindex()) == hostB); + REQUIRE(broker.getHostForReceiver(appId, msgF.appindex()) == hostC); + + // Check the mappings have been sent out to the relevant hosts + auto actualSent = getSentMappings(); + REQUIRE(actualSent.size() == 3); + + // Sort the sent mappings based on host + std::sort(actualSent.begin(), + actualSent.end(), + [](const std::pair& a, + const std::pair& b) + -> bool { return a.first < b.first; }); + + std::vector expectedHosts = { hostA, hostB, hostC }; + std::set expectedHostAIdxs = { msgB.appindex() }; + std::set expectedHostBIdxs = { msgA.appindex(), + msgD.appindex(), + msgE.appindex() }; + std::set expectedHostCIdxs = { msgC.appindex(), msgF.appindex() }; + + // Check all mappings are the same + for (int i = 0; i < 3; i++) { + REQUIRE(actualSent.at(i).first == expectedHosts.at(i)); + faabric::PointToPointMappings actual = actualSent.at(i).second; + + std::set hostAIdxs; + std::set hostBIdxs; + std::set hostCIdxs; + + for (const auto& m : actual.mappings()) { + if (m.host() == hostA) { + hostAIdxs.insert(m.recvidx()); + } else if (m.host() == hostB) { + hostBIdxs.insert(m.recvidx()); + } else if (m.host() == hostC) { + hostCIdxs.insert(m.recvidx()); + } else { + FAIL(); + } + } + + REQUIRE(hostAIdxs == expectedHostAIdxs); + REQUIRE(hostBIdxs == expectedHostBIdxs); + REQUIRE(hostCIdxs == expectedHostCIdxs); + } +} + +TEST_CASE_METHOD(PointToPointClientServerFixture, + "Test waiting for point-to-point messaging to be enabled", + "[transport][ptp]") +{ + int appId = 123; + std::atomic sharedInt = 5; + + faabric::util::SchedulingDecision decision(appId); + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + decision.addMessage(faabric::util::getSystemConfig().endpointHost, msg); + + // Background thread that will eventually enable the app and change the + // shared integer + std::thread t([this, &decision, &sharedInt] { + SLEEP_MS(1000); + broker.setUpLocalMappingsFromSchedulingDecision(decision); + + sharedInt.fetch_add(100); + }); + + broker.waitForMappingsOnThisHost(appId); + + // The sum won't have happened yet if this thread hasn't been forced to wait + REQUIRE(sharedInt == 105); + + // Call again and check it doesn't block + broker.waitForMappingsOnThisHost(appId); + + if (t.joinable()) { + t.join(); + } +} } diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp new file mode 100644 index 000000000..7e5ee73fe --- /dev/null +++ b/tests/test/util/test_scheduling.cpp @@ -0,0 +1,86 @@ +#include + +#include "faabric_utils.h" +#include "fixtures.h" + +#include +#include + +using namespace faabric::util; + +namespace tests { + +TEST_CASE("Test building scheduling decisions", "[util]") +{ + int appId = 123; + + std::string hostA = "hostA"; + std::string hostB = "hostB"; + std::string hostC = "hostC"; + + auto req = batchExecFactory("foo", "bar", 3); + + SchedulingDecision decision(appId); + + faabric::Message msgA = req->mutable_messages()->at(0); + faabric::Message msgB = req->mutable_messages()->at(1); + faabric::Message msgC = req->mutable_messages()->at(2); + + decision.addMessage(hostB, msgA); + decision.addMessage(hostA, msgB); + decision.addMessage(hostC, msgC); + + std::vector expectedMsgIds = { msgA.id(), msgB.id(), msgC.id() }; + std::vector expectedHosts = { hostB, hostA, hostC }; + std::vector expectedAppIdxs = { msgA.appindex(), + msgB.appindex(), + msgC.appindex() }; + + REQUIRE(decision.appId == appId); + REQUIRE(decision.nFunctions == 3); + REQUIRE(decision.messageIds == expectedMsgIds); + REQUIRE(decision.hosts == expectedHosts); + REQUIRE(decision.appIdxs == expectedAppIdxs); +} + +TEST_CASE("Test converting point-to-point mappings to scheduling decisions", + "[util]") +{ + int appId = 123; + + int idxA = 22; + int msgIdA = 222; + std::string hostA = "foobar"; + + int idxB = 33; + int msgIdB = 333; + std::string hostB = "bazbaz"; + + std::vector expectedIdxs = { idxA, idxB }; + std::vector expectedMessageIds = { msgIdA, msgIdB }; + std::vector expectedHosts = { hostA, hostB }; + + faabric::PointToPointMappings mappings; + mappings.set_appid(appId); + + auto* mappingA = mappings.add_mappings(); + mappingA->set_host(hostA); + mappingA->set_messageid(msgIdA); + mappingA->set_recvidx(idxA); + + auto* mappingB = mappings.add_mappings(); + mappingB->set_host(hostB); + mappingB->set_messageid(msgIdB); + mappingB->set_recvidx(idxB); + + auto actual = + faabric::util::SchedulingDecision::fromPointToPointMappings(mappings); + + REQUIRE(actual.appId == appId); + REQUIRE(actual.nFunctions == 2); + + REQUIRE(actual.appIdxs == expectedIdxs); + REQUIRE(actual.messageIds == expectedMessageIds); + REQUIRE(actual.hosts == expectedHosts); +} +} diff --git a/tests/utils/CMakeLists.txt b/tests/utils/CMakeLists.txt index d1de51f19..294165f87 100644 --- a/tests/utils/CMakeLists.txt +++ b/tests/utils/CMakeLists.txt @@ -4,11 +4,12 @@ set(LIB_FILES DummyExecutorFactory.cpp DummyExecutorFactory.h exec_graph_utils.cpp + faabric_utils.h fixtures.h - message_utils.cpp http_utils.cpp + message_utils.cpp + scheduling_utils.cpp system_utils.cpp - faabric_utils.h ) add_library(faabric_test_utils "${LIB_FILES}") diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h index b2a3f042d..3a9ea939c 100644 --- a/tests/utils/faabric_utils.h +++ b/tests/utils/faabric_utils.h @@ -6,6 +6,7 @@ #include #include #include +#include #include using namespace faabric; @@ -65,6 +66,10 @@ void cleanFaabric(); void checkMessageEquality(const faabric::Message& msgA, const faabric::Message& msgB); +void checkSchedulingDecisionEquality( + const faabric::util::SchedulingDecision& decisionA, + const faabric::util::SchedulingDecision& decisionB); + void checkExecGraphNodeEquality(const scheduler::ExecGraphNode& nodeA, const scheduler::ExecGraphNode& nodeB); diff --git a/tests/utils/scheduling_utils.cpp b/tests/utils/scheduling_utils.cpp new file mode 100644 index 000000000..fb123bc80 --- /dev/null +++ b/tests/utils/scheduling_utils.cpp @@ -0,0 +1,18 @@ +#include + +#include "faabric_utils.h" + +namespace tests { + +void checkSchedulingDecisionEquality( + const faabric::util::SchedulingDecision& decisionA, + const faabric::util::SchedulingDecision& decisionB) +{ + REQUIRE(decisionA.appId == decisionB.appId); + REQUIRE(decisionA.nFunctions == decisionB.nFunctions); + REQUIRE(decisionA.messageIds == decisionB.messageIds); + REQUIRE(decisionA.hosts == decisionB.hosts); + REQUIRE(decisionA.appIdxs == decisionB.appIdxs); + REQUIRE(decisionA.returnHost == decisionB.returnHost); +} +}