Skip to content

Commit

Permalink
Return error on failed endpoint request (#132)
Browse files Browse the repository at this point in the history
* Return an error on failed endpoint request

* Hook up endpoint test

* Endpoint test

* Finish endpoint test

* Fix endpoint API test
  • Loading branch information
Shillaker committed Aug 12, 2021
1 parent 6a25257 commit 3392368
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 78 deletions.
6 changes: 5 additions & 1 deletion include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ class Endpoint

Endpoint(int port, int threadCount);

void start();
void start(bool awaitSignal = true);

void stop();

virtual std::shared_ptr<Pistache::Http::Handler> getHandler() = 0;

private:
int port = faabric::util::getSystemConfig().endpointPort;
int threadCount = faabric::util::getSystemConfig().endpointNumThreads;

Pistache::Http::Endpoint httpEndpoint;
};
}
4 changes: 2 additions & 2 deletions include/faabric/endpoint/FaabricEndpointHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class FaabricEndpointHandler : public Pistache::Http::Handler
void onRequest(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter response) override;

std::string handleFunction(const std::string& requestStr);
std::pair<int, std::string> handleFunction(const std::string& requestStr);

private:
std::string executeFunction(faabric::Message& msg);
std::pair<int, std::string> executeFunction(faabric::Message& msg);
};
}
2 changes: 0 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class Scheduler

void flushLocally();

std::string getMessageStatus(unsigned int messageId);

void setFunctionResult(faabric::Message& msg);

faabric::Message getFunctionResult(unsigned int messageId, int timeout);
Expand Down
46 changes: 28 additions & 18 deletions src/endpoint/Endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,57 @@ namespace faabric::endpoint {
Endpoint::Endpoint(int portIn, int threadCountIn)
: port(portIn)
, threadCount(threadCountIn)
, httpEndpoint(Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(port)))
{}

void Endpoint::start()
void Endpoint::start(bool awaitSignal)
{
SPDLOG_INFO("Starting HTTP endpoint");
SPDLOG_INFO("Starting HTTP endpoint on {}", port);

// Set up signal handler
sigset_t signals;
if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 ||
sigaddset(&signals, SIGKILL) != 0 || sigaddset(&signals, SIGINT) != 0 ||
sigaddset(&signals, SIGHUP) != 0 || sigaddset(&signals, SIGQUIT) != 0 ||
pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) {
if (awaitSignal) {
if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 ||
sigaddset(&signals, SIGKILL) != 0 ||
sigaddset(&signals, SIGINT) != 0 ||
sigaddset(&signals, SIGHUP) != 0 ||
sigaddset(&signals, SIGQUIT) != 0 ||
pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) {

throw std::runtime_error("Install signal handler failed");
throw std::runtime_error("Install signal handler failed");
}
}

Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(this->port));

// Configure endpoint
auto opts = Pistache::Http::Endpoint::options()
.threads(threadCount)
.backlog(256)
.flags(Pistache::Tcp::Options::ReuseAddr);

Pistache::Http::Endpoint httpEndpoint(addr);
httpEndpoint.init(opts);

// Configure and start endpoint
httpEndpoint.setHandler(this->getHandler());
httpEndpoint.serveThreaded();

// Wait for a signal
SPDLOG_INFO("Awaiting signal");
int signal = 0;
int status = sigwait(&signals, &signal);
if (status == 0) {
SPDLOG_INFO("Received signal: {}", signal);
} else {
SPDLOG_INFO("Sigwait return value: {}", signal);
if (awaitSignal) {
// Wait for a signal
SPDLOG_INFO("Awaiting signal");
int signal = 0;
int status = sigwait(&signals, &signal);
if (status == 0) {
SPDLOG_INFO("Received signal: {}", signal);
} else {
SPDLOG_INFO("Sigwait return value: {}", signal);
}

httpEndpoint.shutdown();
}
}

void Endpoint::stop()
{
SPDLOG_INFO("Shutting down endpoint on {}", port);
httpEndpoint.shutdown();
}
}
76 changes: 49 additions & 27 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,50 +40,70 @@ void FaabricEndpointHandler::onRequest(const Pistache::Http::Request& request,

// Parse message from JSON in request
const std::string requestStr = request.body();
std::string responseStr = handleFunction(requestStr);
std::pair<int, std::string> result = handleFunction(requestStr);

PROF_END(endpointRoundTrip)
response.send(Pistache::Http::Code::Ok, responseStr);
Pistache::Http::Code responseCode = Pistache::Http::Code::Ok;
if (result.first > 0) {
responseCode = Pistache::Http::Code::Internal_Server_Error;
}
response.send(responseCode, result.second);
}

std::string FaabricEndpointHandler::handleFunction(
std::pair<int, std::string> FaabricEndpointHandler::handleFunction(
const std::string& requestStr)
{
std::string responseStr;
std::pair<int, std::string> response;
if (requestStr.empty()) {
responseStr = "Empty request";
SPDLOG_ERROR("Faabric handler received empty request");
response = std::make_pair(1, "Empty request");
} else {
faabric::Message msg = faabric::util::jsonToMessage(requestStr);
faabric::scheduler::Scheduler& sched =
faabric::scheduler::getScheduler();

if (msg.isstatusrequest()) {
responseStr = sched.getMessageStatus(msg.id());
SPDLOG_DEBUG("Processing status request");
const faabric::Message result =
sched.getFunctionResult(msg.id(), 0);

if (result.type() == faabric::Message_MessageType_EMPTY) {
response = std::make_pair(0, "RUNNING");
} else if (result.returnvalue() == 0) {
response = std::make_pair(0, "SUCCESS: " + result.outputdata());
} else {
response = std::make_pair(1, "FAILED: " + result.outputdata());
}
} else if (msg.isexecgraphrequest()) {
SPDLOG_DEBUG("Processing execution graph request");
faabric::scheduler::ExecGraph execGraph =
sched.getFunctionExecGraph(msg.id());
responseStr = faabric::scheduler::execGraphToJson(execGraph);
response =
std::make_pair(0, faabric::scheduler::execGraphToJson(execGraph));

} else if (msg.type() == faabric::Message_MessageType_FLUSH) {
SPDLOG_DEBUG("Broadcasting flush request");
sched.broadcastFlush();
response = std::make_pair(0, "Flush sent");
} else {
responseStr = executeFunction(msg);
response = executeFunction(msg);
}
}

return responseStr;
return response;
}

std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg)
std::pair<int, std::string> FaabricEndpointHandler::executeFunction(
faabric::Message& msg)
{
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();

if (msg.user().empty()) {
return "Empty user";
} else if (msg.function().empty()) {
return "Empty function";
return std::make_pair(1, "Empty user");
}

if (msg.function().empty()) {
return std::make_pair(1, "Empty function");
}

// Set message ID and master host
Expand All @@ -101,23 +121,25 @@ std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg)

// Await result on global bus (may have been executed on a different worker)
if (msg.isasync()) {
return faabric::util::buildAsyncResponse(msg);
} else {
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);
return std::make_pair(0, faabric::util::buildAsyncResponse(msg));
}

try {
const faabric::Message result =
sch.getFunctionResult(msg.id(), conf.globalMessageTimeout);
SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr);
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);

if (result.sgxresult().empty()) {
return result.outputdata() + "\n";
} else {
return faabric::util::getJsonOutput(result);
}
} catch (faabric::redis::RedisNoResponseException& ex) {
return "No response from function\n";
try {
const faabric::Message result =
sch.getFunctionResult(msg.id(), conf.globalMessageTimeout);
SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr);

if (result.sgxresult().empty()) {
return std::make_pair(result.returnvalue(),
result.outputdata() + "\n");
}

return std::make_pair(result.returnvalue(),
faabric::util::getJsonOutput(result));
} catch (faabric::redis::RedisNoResponseException& ex) {
return std::make_pair(1, "No response from function\n");
}
}
}
13 changes: 0 additions & 13 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,19 +784,6 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId,
return msgResult;
}

std::string Scheduler::getMessageStatus(unsigned int messageId)
{
const faabric::Message result = getFunctionResult(messageId, 0);

if (result.type() == faabric::Message_MessageType_EMPTY) {
return "RUNNING";
} else if (result.returnvalue() == 0) {
return "SUCCESS: " + result.outputdata();
} else {
return "FAILED: " + result.outputdata();
}
}

faabric::HostResources Scheduler::getThisHostResources()
{
return thisHostResources;
Expand Down
Loading

0 comments on commit 3392368

Please sign in to comment.