Skip to content

Commit

Permalink
Latency fixes (#135)
Browse files Browse the repository at this point in the history
* Endpoint constructor fix

* Redis script for setFunctionResult

* Avoid lock on Scheduler::vacateSlot

* Unlock scheduler while spinning up new executors

* Bypass redis in local, sync function calls

* Significantly faster (10x) file to bytes reading

* Replace timeout exceptions with std::optional

I found that the exceptions made debugging a bit harder when the endpoint failed; returning std::optional instead also simplifies the main loop of the ZeroMQ server.
  • Loading branch information
eigenraven committed Aug 27, 2021
1 parent 8936c63 commit 94aae8c
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 146 deletions.
2 changes: 1 addition & 1 deletion include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace faabric::endpoint {
class Endpoint
{
public:
Endpoint() = default;
Endpoint();

Endpoint(int port, int threadCount);

Expand Down
37 changes: 30 additions & 7 deletions include/faabric/redis/Redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_set>
#include <vector>
Expand All @@ -24,6 +25,7 @@ class RedisInstance
explicit RedisInstance(RedisRole role);

std::string delifeqSha;
std::string schedPublishSha;

std::string ip;
std::string hostname;
Expand All @@ -35,15 +37,31 @@ class RedisInstance
std::mutex scriptsLock;

std::string loadScript(redisContext* context,
const std::string& scriptBody);
const std::string_view scriptBody);

// Script to delete a key if it equals a given value
const std::string delifeqCmd =
"if redis.call(\"GET\", KEYS[1]) == ARGV[1] then \n"
" return redis.call(\"DEL\", KEYS[1]) \n"
"else \n"
" return 0 \n"
"end";
const std::string_view delifeqCmd = R"---(
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
)---";

// Script to push and expire function execution results avoiding extra
// copies and round-trips.
//
// Script contract (as read from the Lua body below):
//   KEYS[1] - key of the list the result is RPUSHed onto
//   KEYS[2] - status key, SET to the same result value
//   ARGV[1] - the result payload
//   ARGV[2] - expiry passed to EXPIRE for KEYS[1] (converted with tonumber)
//   ARGV[3] - expiry passed to EXPIRE for KEYS[2] (converted with tonumber)
// The script always returns 0.
const std::string_view schedPublishCmd = R"---(
local key = KEYS[1]
local status_key = KEYS[2]
local result = ARGV[1]
local result_expiry = tonumber(ARGV[2])
local status_expiry = tonumber(ARGV[3])
redis.call('RPUSH', key, result)
redis.call('EXPIRE', key, result_expiry)
redis.call('SET', status_key, result)
redis.call('EXPIRE', status_key, status_expiry)
return 0
)---";
};

class Redis
Expand Down Expand Up @@ -181,6 +199,11 @@ class Redis
long buffLen,
long nElems);

// Scheduler result publish
void publishSchedulerResult(const std::string& key,
const std::string& status_key,
const std::vector<uint8_t>& result);

private:
explicit Redis(const RedisInstance& instance);

Expand Down
10 changes: 9 additions & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,15 @@ class Scheduler
const std::string& otherHost);

faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;
std::mutex localResultsMutex;

std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
Expand All @@ -214,7 +220,9 @@ class Scheduler
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

std::shared_ptr<Executor> claimExecutor(faabric::Message& msg);
std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

faabric::HostResources getHostResources(const std::string& host);

Expand Down
13 changes: 7 additions & 6 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/transport/Message.h>
#include <faabric/util/exception.h>

#include <optional>
#include <thread>
#include <zmq.hpp>

Expand Down Expand Up @@ -56,11 +57,11 @@ class MessageEndpoint
size_t dataSize,
bool more);

Message doRecv(zmq::socket_t& socket, int size = 0);
std::optional<Message> doRecv(zmq::socket_t& socket, int size = 0);

Message recvBuffer(zmq::socket_t& socket, int size);
std::optional<Message> recvBuffer(zmq::socket_t& socket, int size);

Message recvNoBuffer(zmq::socket_t& socket);
std::optional<Message> recvNoBuffer(zmq::socket_t& socket);
};

class AsyncSendMessageEndpoint final : public MessageEndpoint
Expand Down Expand Up @@ -104,7 +105,7 @@ class RecvMessageEndpoint : public MessageEndpoint

virtual ~RecvMessageEndpoint(){};

virtual Message recv(int size = 0);
virtual std::optional<Message> recv(int size = 0);

protected:
zmq::socket_t socket;
Expand All @@ -116,7 +117,7 @@ class AsyncRecvMessageEndpoint final : public RecvMessageEndpoint
AsyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Message recv(int size = 0) override;
std::optional<Message> recv(int size = 0) override;
};

class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
Expand All @@ -125,7 +126,7 @@ class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
SyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Message recv(int size = 0) override;
std::optional<Message> recv(int size = 0) override;

void sendResponse(const uint8_t* data, int size);
};
Expand Down
11 changes: 9 additions & 2 deletions src/endpoint/Endpoint.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
#include <faabric/endpoint/Endpoint.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>

#include <pistache/endpoint.h>
#include <pistache/listener.h>
#include <signal.h>

namespace faabric::endpoint {
// Default constructor: delegates to Endpoint(int, int), taking the port and
// thread count from the endpointPort and endpointNumThreads fields of the
// system config.
Endpoint::Endpoint()
: Endpoint(faabric::util::getSystemConfig().endpointPort,
faabric::util::getSystemConfig().endpointNumThreads)
{}

Endpoint::Endpoint(int portIn, int threadCountIn)
: port(portIn)
, threadCount(threadCountIn)
, httpEndpoint(Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(port)))
, httpEndpoint(
Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(portIn)))
{}

void Endpoint::start(bool awaitSignal)
{
SPDLOG_INFO("Starting HTTP endpoint on {}", port);
SPDLOG_INFO("Starting HTTP endpoint on {}, {} threads", port, threadCount);

// Set up signal handler
sigset_t signals;
Expand Down
5 changes: 5 additions & 0 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ std::pair<int, std::string> FaabricEndpointHandler::executeFunction(
faabric::util::setMessageId(msg);
std::string thisHost = faabric::util::getSystemConfig().endpointHost;
msg.set_masterhost(thisHost);
// This is set to false by the scheduler if the function ends up being sent
// elsewhere
if (!msg.isasync()) {
msg.set_executeslocally(true);
}

auto tid = (pid_t)syscall(SYS_gettid);
const std::string funcStr = faabric::util::funcToString(msg, true);
Expand Down
11 changes: 6 additions & 5 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ message Message {

int64 timestamp = 14;
string resultKey = 15;
string statusKey = 16;
bool executesLocally = 16;
string statusKey = 17;

string executedHost = 17;
int64 finishTimestamp = 18;
string executedHost = 18;
int64 finishTimestamp = 19;

bool isAsync = 19;
bool isPython = 20;
bool isAsync = 20;
bool isPython = 21;
bool isStatusRequest = 22;
bool isExecGraphRequest = 23;

Expand Down
37 changes: 29 additions & 8 deletions src/redis/Redis.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <faabric/redis/Redis.h>

#include <faabric/util/bytes.h>
#include <faabric/util/config.h>
#include <faabric/util/gids.h>
#include <faabric/util/logging.h>
#include <faabric/util/network.h>
#include <faabric/util/random.h>

#include <faabric/util/bytes.h>
#include <faabric/util/gids.h>
#include <faabric/util/timing.h>
#include <thread>

namespace faabric::redis {
Expand All @@ -27,26 +28,27 @@ RedisInstance::RedisInstance(RedisRole roleIn)
port = std::stoi(portStr);

// Load scripts
if (delifeqSha.empty()) {
if (delifeqSha.empty() || schedPublishSha.empty()) {
std::unique_lock<std::mutex> lock(scriptsLock);

if (delifeqSha.empty()) {
if (delifeqSha.empty() || schedPublishSha.empty()) {
printf("Loading scripts for Redis instance at %s\n",
hostname.c_str());
redisContext* context = redisConnect(ip.c_str(), port);

delifeqSha = this->loadScript(context, delifeqCmd);
schedPublishSha = this->loadScript(context, schedPublishCmd);

redisFree(context);
}
}
}

std::string RedisInstance::loadScript(redisContext* context,
const std::string& scriptBody)
const std::string_view scriptBody)
{
auto reply =
(redisReply*)redisCommand(context, "SCRIPT LOAD %s", scriptBody.c_str());
auto reply = (redisReply*)redisCommand(
context, "SCRIPT LOAD %b", scriptBody.data(), scriptBody.size());

if (reply == nullptr) {
throw std::runtime_error("Error loading script from Redis");
Expand Down Expand Up @@ -774,4 +776,23 @@ void Redis::dequeueBytes(const std::string& queueName,

freeReplyObject(reply);
}

void Redis::publishSchedulerResult(const std::string& key,
const std::string& status_key,
const std::vector<uint8_t>& result)
{
auto reply = (redisReply*)redisCommand(context,
"EVALSHA %s 2 %s %s %b %d %d",
instance.schedPublishSha.c_str(),
// keys
key.c_str(),
status_key.c_str(),
// argv
result.data(),
result.size(),
RESULT_KEY_EXPIRY,
STATUS_KEY_EXPIRY);
extractScriptResult(reply);
}

}
2 changes: 1 addition & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ faabric::MpiHostsToRanksMessage MpiWorld::recvMpiHostRankMsg()
}

SPDLOG_TRACE("Receiving MPI host ranks on {}", basePort);
faabric::transport::Message m = ranksRecvEndpoint->recv();
faabric::transport::Message m = ranksRecvEndpoint->recv().value();
PARSE_MSG(faabric::MpiHostsToRanksMessage, m.data(), m.size());

return msg;
Expand Down
Loading

0 comments on commit 94aae8c

Please sign in to comment.