From fa4685677c9f5eab3e34c5bafb02aeee25801dbe Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Thu, 25 Nov 2021 09:43:32 +0000
Subject: [PATCH 1/3] add retry mechanism for redis get

---
 src/scheduler/Scheduler.cpp | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 97f50d556..03ae6fb91 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -1012,7 +1012,29 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
 
     // Get the result for this message
     std::string statusKey = faabric::util::statusKeyFromMessageId(messageId);
+
+    // We want to make sure the message bytes have been populated by the time
+    // we get them from Redis. For the time being, we retry a number of times
+    // and fail if we don't succeed.
     std::vector<uint8_t> messageBytes = redis.get(statusKey);
+    int maxNumRetries = 3;
+    int numRetries = 0;
+    while (messageBytes.empty() && numRetries < maxNumRetries) {
+        SPDLOG_WARN(
+          "Retry GET message for ExecGraph node with id {} (Retry {}/{})",
+          messageId,
+          numRetries + 1,
+          maxNumRetries);
+        SLEEP_MS(500);
+        messageBytes = redis.get(statusKey);
+    }
+    if (messageBytes.empty()) {
+        SPDLOG_ERROR("Can't GET message from redis (id: {}, key: {})",
+                     messageId,
+                     statusKey);
+        throw std::runtime_error("Message for exec graph not in Redis");
+    }
+
     faabric::Message result;
     result.ParseFromArray(messageBytes.data(), (int)messageBytes.size());
 

From 6a0754a9f03c5853d952e96eacc16534f3b1b7d7 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Thu, 25 Nov 2021 17:41:51 +0000
Subject: [PATCH 2/3] add regression test

---
 tests/dist/scheduler/test_exec_graph.cpp | 52 ++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
 create mode 100644 tests/dist/scheduler/test_exec_graph.cpp

diff --git a/tests/dist/scheduler/test_exec_graph.cpp b/tests/dist/scheduler/test_exec_graph.cpp
new file mode 100644
index 000000000..26e756630
--- /dev/null
+++ b/tests/dist/scheduler/test_exec_graph.cpp
@@ -0,0 +1,52 @@
+#include
+
+#include "fixtures.h"
+
+#include
+
+namespace tests {
+
+TEST_CASE_METHOD(DistTestsFixture,
+                 "Test generating the execution graph",
+                 "[funcs]")
+{
+    // Set up this host's resources
+    int nLocalSlots = 2;
+    int nFuncs = 4;
+    faabric::HostResources res;
+    res.set_slots(nLocalSlots);
+    sch.setThisHostResources(res);
+
+    // Retry the test a number of times to catch the race-condition where
+    // we get the execution graph before all results have been published
+    int numRetries = 10;
+    for (int r = 0; r < numRetries; r++) {
+        // Set up the messages
+        std::shared_ptr<faabric::BatchExecuteRequest> req =
+          faabric::util::batchExecFactory("funcs", "simple", nFuncs);
+
+        // Add a fictional chaining dependency between functions
+        for (int i = 1; i < nFuncs; i++) {
+            sch.logChainedFunction(req->mutable_messages()->at(0).id(),
+                                   req->mutable_messages()->at(i).id());
+        }
+
+        // Call the functions
+        sch.callFunctions(req);
+
+        faabric::Message& m = req->mutable_messages()->at(0);
+
+        // Wait for the result, and immediately after query for the execution
+        // graph
+        faabric::Message result = sch.getFunctionResult(m.id(), 1000);
+        auto execGraph = sch.getFunctionExecGraph(m.id());
+        REQUIRE(countExecGraphNodes(execGraph) == nFuncs);
+
+        REQUIRE(execGraph.rootNode.msg.id() == m.id());
+        for (int i = 1; i < nFuncs; i++) {
+            auto node = execGraph.rootNode.children.at(i - 1);
+            REQUIRE(node.msg.id() == req->mutable_messages()->at(i).id());
+        }
+    }
+}
+}

From 11194b273fbf29cd4801746ece0fd04ed4c56aa8 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 26 Nov 2021 15:09:38 +0000
Subject: [PATCH 3/3] using defined constants, incrementing variable in loop,
 and checking for it in the tests

---
 src/scheduler/Scheduler.cpp              | 10 ++++++----
 tests/test/scheduler/test_exec_graph.cpp |  9 +++++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 03ae6fb91..e271b9b3d 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -21,6 +21,8 @@
 #include
 
 #define FLUSH_TIMEOUT_MS 10000
+#define GET_EXEC_GRAPH_SLEEP_MS 500
+#define MAX_GET_EXEC_GRAPH_RETRIES 3
 
 using namespace faabric::util;
 using namespace faabric::snapshot;
@@ -1017,16 +1019,16 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
     // we get them from Redis. For the time being, we retry a number of times
     // and fail if we don't succeed.
     std::vector<uint8_t> messageBytes = redis.get(statusKey);
-    int maxNumRetries = 3;
     int numRetries = 0;
-    while (messageBytes.empty() && numRetries < maxNumRetries) {
+    while (messageBytes.empty() && numRetries < MAX_GET_EXEC_GRAPH_RETRIES) {
         SPDLOG_WARN(
           "Retry GET message for ExecGraph node with id {} (Retry {}/{})",
           messageId,
           numRetries + 1,
-          maxNumRetries);
-        SLEEP_MS(500);
+          MAX_GET_EXEC_GRAPH_RETRIES);
+        SLEEP_MS(GET_EXEC_GRAPH_SLEEP_MS);
         messageBytes = redis.get(statusKey);
+        ++numRetries;
     }
     if (messageBytes.empty()) {
         SPDLOG_ERROR("Can't GET message from redis (id: {}, key: {})",
diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp
index 44b17ae10..1975240b6 100644
--- a/tests/test/scheduler/test_exec_graph.cpp
+++ b/tests/test/scheduler/test_exec_graph.cpp
@@ -72,6 +72,15 @@ TEST_CASE("Test execution graph", "[scheduler][exec-graph]")
     checkExecGraphEquality(expected, actual);
 }
 
+TEST_CASE("Test can't get exec graph if results are not published",
+          "[scheduler][exec-graph]")
+{
+    faabric::Message msg = faabric::util::messageFactory("demo", "echo");
+
+    REQUIRE_THROWS(
+      faabric::scheduler::getScheduler().getFunctionExecGraph(msg.id()));
+}
+
 TEST_CASE("Test get unique hosts from exec graph", "[scheduler][exec-graph]")
 {
     faabric::Message msgA = faabric::util::messageFactory("demo", "echo");
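
Note (not part of the patches): a minimal, standalone sketch of the bounded retry-then-fail pattern these commits apply to the Redis GET, assuming only the C++ standard library. The names FetchFn and getWithRetries are illustrative and do not exist in faabric; in the patches themselves the fetch is redis.get(statusKey), the sleep is the SLEEP_MS macro, and the bounds come from the GET_EXEC_GRAPH_* constants.

    #include <chrono>
    #include <cstdint>
    #include <functional>
    #include <stdexcept>
    #include <string>
    #include <thread>
    #include <vector>

    // Illustrative stand-in for the Redis lookup (redis.get(statusKey) in faabric)
    using FetchFn =
      std::function<std::vector<std::uint8_t>(const std::string&)>;

    // Fetch a value that may not have been populated yet: retry a bounded
    // number of times with a fixed sleep, then throw if it is still empty.
    // Mirrors the loop added in PATCH 1/3 and tidied up in PATCH 3/3.
    std::vector<std::uint8_t> getWithRetries(const FetchFn& fetch,
                                             const std::string& key,
                                             int maxRetries = 3,
                                             int sleepMs = 500)
    {
        std::vector<std::uint8_t> bytes = fetch(key);
        int numRetries = 0;
        while (bytes.empty() && numRetries < maxRetries) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
            bytes = fetch(key);
            ++numRetries;
        }

        if (bytes.empty()) {
            throw std::runtime_error("Value for key not populated: " + key);
        }

        return bytes;
    }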