diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 97f50d556..e271b9b3d 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -21,6 +21,8 @@ #include <unordered_set> #define FLUSH_TIMEOUT_MS 10000 +#define GET_EXEC_GRAPH_SLEEP_MS 500 +#define MAX_GET_EXEC_GRAPH_RETRIES 3 using namespace faabric::util; using namespace faabric::snapshot; @@ -1012,7 +1014,29 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId) // Get the result for this message std::string statusKey = faabric::util::statusKeyFromMessageId(messageId); + + // We want to make sure the message bytes have been populated by the time + // we get them from Redis. For the time being, we retry a number of times + // and fail if we don't succeed. std::vector<uint8_t> messageBytes = redis.get(statusKey); + int numRetries = 0; + while (messageBytes.empty() && numRetries < MAX_GET_EXEC_GRAPH_RETRIES) { + SPDLOG_WARN( + "Retry GET message for ExecGraph node with id {} (Retry {}/{})", + messageId, + numRetries + 1, + MAX_GET_EXEC_GRAPH_RETRIES); + SLEEP_MS(GET_EXEC_GRAPH_SLEEP_MS); + messageBytes = redis.get(statusKey); + ++numRetries; + } + if (messageBytes.empty()) { + SPDLOG_ERROR("Can't GET message from redis (id: {}, key: {})", + messageId, + statusKey); + throw std::runtime_error("Message for exec graph not in Redis"); + } + faabric::Message result; result.ParseFromArray(messageBytes.data(), (int)messageBytes.size()); diff --git a/tests/dist/scheduler/test_exec_graph.cpp b/tests/dist/scheduler/test_exec_graph.cpp new file mode 100644 index 000000000..26e756630 --- /dev/null +++ b/tests/dist/scheduler/test_exec_graph.cpp @@ -0,0 +1,52 @@ +#include <catch2/catch.hpp> + +#include "fixtures.h" + +#include <faabric/scheduler/ExecGraph.h> + +namespace tests { + +TEST_CASE_METHOD(DistTestsFixture, + "Test generating the execution graph", + "[funcs]") +{ + // Set up this host's resources + int nLocalSlots = 2; + int nFuncs = 4; + faabric::HostResources res; + res.set_slots(nLocalSlots); + sch.setThisHostResources(res); +
+ // Retry the test a number of times to catch the race-condition where + // we get the execution graph before all results have been published + int numRetries = 10; + for (int r = 0; r < numRetries; r++) { + // Set up the messages + std::shared_ptr<faabric::BatchExecuteRequest> req = + faabric::util::batchExecFactory("funcs", "simple", nFuncs); + + // Add a fictional chaining dependency between functions + for (int i = 1; i < nFuncs; i++) { + sch.logChainedFunction(req->mutable_messages()->at(0).id(), + req->mutable_messages()->at(i).id()); + } + + // Call the functions + sch.callFunctions(req); + + faabric::Message& m = req->mutable_messages()->at(0); + + // Wait for the result, and immediately after query for the execution + // graph + faabric::Message result = sch.getFunctionResult(m.id(), 1000); + auto execGraph = sch.getFunctionExecGraph(m.id()); + REQUIRE(countExecGraphNodes(execGraph) == nFuncs); + + REQUIRE(execGraph.rootNode.msg.id() == m.id()); + for (int i = 1; i < nFuncs; i++) { + auto node = execGraph.rootNode.children.at(i - 1); + REQUIRE(node.msg.id() == req->mutable_messages()->at(i).id()); + } + } +} +} diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index 44b17ae10..1975240b6 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -72,6 +72,15 @@ TEST_CASE("Test execution graph", "[scheduler][exec-graph]") checkExecGraphEquality(expected, actual); } +TEST_CASE("Test can't get exec graph if results are not published", + "[scheduler][exec-graph]") +{ + faabric::Message msg = faabric::util::messageFactory("demo", "echo"); + + REQUIRE_THROWS( + faabric::scheduler::getScheduler().getFunctionExecGraph(msg.id())); +} + TEST_CASE("Test get unique hosts from exec graph", "[scheduler][exec-graph]") { faabric::Message msgA = faabric::util::messageFactory("demo", "echo");