Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latency fixes #135

Merged
merged 7 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace faabric::endpoint {
class Endpoint
{
public:
Endpoint() = default;
Endpoint();

Endpoint(int port, int threadCount);

Expand Down
37 changes: 30 additions & 7 deletions include/faabric/redis/Redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_set>
#include <vector>
Expand All @@ -24,6 +25,7 @@ class RedisInstance
explicit RedisInstance(RedisRole role);

std::string delifeqSha;
std::string schedPublishSha;

std::string ip;
std::string hostname;
Expand All @@ -35,15 +37,31 @@ class RedisInstance
std::mutex scriptsLock;

std::string loadScript(redisContext* context,
const std::string& scriptBody);
const std::string_view scriptBody);

// Script to delete a key if it equals a given value
const std::string delifeqCmd =
"if redis.call(\"GET\", KEYS[1]) == ARGV[1] then \n"
" return redis.call(\"DEL\", KEYS[1]) \n"
"else \n"
" return 0 \n"
"end";
const std::string_view delifeqCmd = R"---(
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
)---";

// Script to push and expire function execution results avoiding extra
// copies and round-trips
const std::string_view schedPublishCmd = R"---(
local key = KEYS[1]
local status_key = KEYS[2]
local result = ARGV[1]
local result_expiry = tonumber(ARGV[2])
local status_expiry = tonumber(ARGV[3])
redis.call('RPUSH', key, result)
redis.call('EXPIRE', key, result_expiry)
redis.call('SET', status_key, result)
redis.call('EXPIRE', status_key, status_expiry)
return 0
)---";
};

class Redis
Expand Down Expand Up @@ -181,6 +199,11 @@ class Redis
long buffLen,
long nElems);

// Scheduler result publish
void publishSchedulerResult(const std::string& key,
const std::string& status_key,
const std::vector<uint8_t>& result);

private:
explicit Redis(const RedisInstance& instance);

Expand Down
10 changes: 9 additions & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,15 @@ class Scheduler
const std::string& otherHost);

faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;
std::mutex localResultsMutex;

std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
Expand All @@ -214,7 +220,9 @@ class Scheduler
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

std::shared_ptr<Executor> claimExecutor(faabric::Message& msg);
std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

faabric::HostResources getHostResources(const std::string& host);

Expand Down
13 changes: 7 additions & 6 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/transport/Message.h>
#include <faabric/util/exception.h>

#include <optional>
#include <thread>
#include <zmq.hpp>

Expand Down Expand Up @@ -56,11 +57,11 @@ class MessageEndpoint
size_t dataSize,
bool more);

Message doRecv(zmq::socket_t& socket, int size = 0);
std::optional<Message> doRecv(zmq::socket_t& socket, int size = 0);

Message recvBuffer(zmq::socket_t& socket, int size);
std::optional<Message> recvBuffer(zmq::socket_t& socket, int size);

Message recvNoBuffer(zmq::socket_t& socket);
std::optional<Message> recvNoBuffer(zmq::socket_t& socket);
};

class AsyncSendMessageEndpoint final : public MessageEndpoint
Expand Down Expand Up @@ -104,7 +105,7 @@ class RecvMessageEndpoint : public MessageEndpoint

virtual ~RecvMessageEndpoint(){};

virtual Message recv(int size = 0);
virtual std::optional<Message> recv(int size = 0);

protected:
zmq::socket_t socket;
Expand All @@ -116,7 +117,7 @@ class AsyncRecvMessageEndpoint final : public RecvMessageEndpoint
AsyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Message recv(int size = 0) override;
std::optional<Message> recv(int size = 0) override;
};

class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
Expand All @@ -125,7 +126,7 @@ class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
SyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Message recv(int size = 0) override;
std::optional<Message> recv(int size = 0) override;

void sendResponse(const uint8_t* data, int size);
};
Expand Down
11 changes: 9 additions & 2 deletions src/endpoint/Endpoint.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
#include <faabric/endpoint/Endpoint.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>

#include <pistache/endpoint.h>
#include <pistache/listener.h>
#include <signal.h>

namespace faabric::endpoint {
Endpoint::Endpoint()
: Endpoint(faabric::util::getSystemConfig().endpointPort,
faabric::util::getSystemConfig().endpointNumThreads)
{}

Endpoint::Endpoint(int portIn, int threadCountIn)
: port(portIn)
, threadCount(threadCountIn)
, httpEndpoint(Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(port)))
, httpEndpoint(
Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(portIn)))
{}

void Endpoint::start(bool awaitSignal)
{
SPDLOG_INFO("Starting HTTP endpoint on {}", port);
SPDLOG_INFO("Starting HTTP endpoint on {}, {} threads", port, threadCount);

// Set up signal handler
sigset_t signals;
Expand Down
5 changes: 5 additions & 0 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ std::pair<int, std::string> FaabricEndpointHandler::executeFunction(
faabric::util::setMessageId(msg);
std::string thisHost = faabric::util::getSystemConfig().endpointHost;
msg.set_masterhost(thisHost);
// This is set to false by the scheduler if the function ends up being sent
// elsewhere
if (!msg.isasync()) {
msg.set_executeslocally(true);
}

auto tid = (pid_t)syscall(SYS_gettid);
const std::string funcStr = faabric::util::funcToString(msg, true);
Expand Down
11 changes: 6 additions & 5 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ message Message {

int64 timestamp = 14;
string resultKey = 15;
string statusKey = 16;
bool executesLocally = 16;
string statusKey = 17;

string executedHost = 17;
int64 finishTimestamp = 18;
string executedHost = 18;
int64 finishTimestamp = 19;

bool isAsync = 19;
bool isPython = 20;
bool isAsync = 20;
bool isPython = 21;
bool isStatusRequest = 22;
bool isExecGraphRequest = 23;

Expand Down
37 changes: 29 additions & 8 deletions src/redis/Redis.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <faabric/redis/Redis.h>

#include <faabric/util/bytes.h>
#include <faabric/util/config.h>
#include <faabric/util/gids.h>
#include <faabric/util/logging.h>
#include <faabric/util/network.h>
#include <faabric/util/random.h>

#include <faabric/util/bytes.h>
#include <faabric/util/gids.h>
#include <faabric/util/timing.h>
#include <thread>

namespace faabric::redis {
Expand All @@ -27,26 +28,27 @@ RedisInstance::RedisInstance(RedisRole roleIn)
port = std::stoi(portStr);

// Load scripts
if (delifeqSha.empty()) {
if (delifeqSha.empty() || schedPublishSha.empty()) {
std::unique_lock<std::mutex> lock(scriptsLock);

if (delifeqSha.empty()) {
if (delifeqSha.empty() || schedPublishSha.empty()) {
printf("Loading scripts for Redis instance at %s\n",
hostname.c_str());
redisContext* context = redisConnect(ip.c_str(), port);

delifeqSha = this->loadScript(context, delifeqCmd);
schedPublishSha = this->loadScript(context, schedPublishCmd);

redisFree(context);
}
}
}

std::string RedisInstance::loadScript(redisContext* context,
const std::string& scriptBody)
const std::string_view scriptBody)
{
auto reply =
(redisReply*)redisCommand(context, "SCRIPT LOAD %s", scriptBody.c_str());
auto reply = (redisReply*)redisCommand(
context, "SCRIPT LOAD %b", scriptBody.data(), scriptBody.size());

if (reply == nullptr) {
throw std::runtime_error("Error loading script from Redis");
Expand Down Expand Up @@ -774,4 +776,23 @@ void Redis::dequeueBytes(const std::string& queueName,

freeReplyObject(reply);
}

void Redis::publishSchedulerResult(const std::string& key,
const std::string& status_key,
const std::vector<uint8_t>& result)
{
auto reply = (redisReply*)redisCommand(context,
"EVALSHA %s 2 %s %s %b %d %d",
instance.schedPublishSha.c_str(),
// keys
key.c_str(),
status_key.c_str(),
// argv
result.data(),
result.size(),
RESULT_KEY_EXPIRY,
STATUS_KEY_EXPIRY);
extractScriptResult(reply);
}

}
2 changes: 1 addition & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ faabric::MpiHostsToRanksMessage MpiWorld::recvMpiHostRankMsg()
}

SPDLOG_TRACE("Receiving MPI host ranks on {}", basePort);
faabric::transport::Message m = ranksRecvEndpoint->recv();
faabric::transport::Message m = ranksRecvEndpoint->recv().value();
PARSE_MSG(faabric::MpiHostsToRanksMessage, m.data(), m.size());

return msg;
Expand Down
Loading