Skip to content

Commit

Permalink
Retry GET from Redis for messages in the Execution Graph (#181)
Browse files Browse the repository at this point in the history
* add retry mechanism for redis get

* add regression test

* using defined constants, incrementing variable in loop, and checking for it in the tests
  • Loading branch information
csegarragonz committed Nov 30, 2021
1 parent 657a506 commit dba98f1
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <unordered_set>

#define FLUSH_TIMEOUT_MS 10000
#define GET_EXEC_GRAPH_SLEEP_MS 500
#define MAX_GET_EXEC_GRAPH_RETRIES 3

using namespace faabric::util;
using namespace faabric::snapshot;
Expand Down Expand Up @@ -1012,7 +1014,29 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)

// Get the result for this message
std::string statusKey = faabric::util::statusKeyFromMessageId(messageId);

// We want to make sure the message bytes have been populated by the time
// we get them from Redis. For the time being, we retry a number of times
// and fail if we don't succeed.
std::vector<uint8_t> messageBytes = redis.get(statusKey);
int numRetries = 0;
while (messageBytes.empty() && numRetries < MAX_GET_EXEC_GRAPH_RETRIES) {
SPDLOG_WARN(
"Retry GET message for ExecGraph node with id {} (Retry {}/{})",
messageId,
numRetries + 1,
MAX_GET_EXEC_GRAPH_RETRIES);
SLEEP_MS(GET_EXEC_GRAPH_SLEEP_MS);
messageBytes = redis.get(statusKey);
++numRetries;
}
if (messageBytes.empty()) {
SPDLOG_ERROR("Can't GET message from redis (id: {}, key: {})",
messageId,
statusKey);
throw std::runtime_error("Message for exec graph not in Redis");
}

faabric::Message result;
result.ParseFromArray(messageBytes.data(), (int)messageBytes.size());

Expand Down
52 changes: 52 additions & 0 deletions tests/dist/scheduler/test_exec_graph.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <catch2/catch.hpp>

#include "fixtures.h"

#include <faabric/scheduler/Scheduler.h>

namespace tests {

TEST_CASE_METHOD(DistTestsFixture,
"Test generating the execution graph",
"[funcs]")
{
// Set up this host's resources
int nLocalSlots = 2;
int nFuncs = 4;
faabric::HostResources res;
res.set_slots(nLocalSlots);
sch.setThisHostResources(res);

// Retry the test a number of times to catch the race-condition where
// we get the execution graph before all results have been published
int numRetries = 10;
for (int r = 0; r < numRetries; r++) {
// Set up the messages
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory("funcs", "simple", nFuncs);

// Add a fictional chaining dependency between functions
for (int i = 1; i < nFuncs; i++) {
sch.logChainedFunction(req->mutable_messages()->at(0).id(),
req->mutable_messages()->at(i).id());
}

// Call the functions
sch.callFunctions(req);

faabric::Message& m = req->mutable_messages()->at(0);

// Wait for the result, and immediately after query for the execution
// graph
faabric::Message result = sch.getFunctionResult(m.id(), 1000);
auto execGraph = sch.getFunctionExecGraph(m.id());
REQUIRE(countExecGraphNodes(execGraph) == nFuncs);

REQUIRE(execGraph.rootNode.msg.id() == m.id());
for (int i = 1; i < nFuncs; i++) {
auto node = execGraph.rootNode.children.at(i - 1);
REQUIRE(node.msg.id() == req->mutable_messages()->at(i).id());
}
}
}
}
9 changes: 9 additions & 0 deletions tests/test/scheduler/test_exec_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ TEST_CASE("Test execution graph", "[scheduler][exec-graph]")
checkExecGraphEquality(expected, actual);
}

TEST_CASE("Test can't get exec graph if results are not published",
"[scheduler][exec-graph]")
{
faabric::Message msg = faabric::util::messageFactory("demo", "echo");

REQUIRE_THROWS(
faabric::scheduler::getScheduler().getFunctionExecGraph(msg.id()));
}

TEST_CASE("Test get unique hosts from exec graph", "[scheduler][exec-graph]")
{
faabric::Message msgA = faabric::util::messageFactory("demo", "echo");
Expand Down

0 comments on commit dba98f1

Please sign in to comment.