Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Donald ug #1

Open
wants to merge 85 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
41ab5ba
patched shutting down thread on function result
Feb 2, 2024
c157bce
added debug
Feb 2, 2024
5986f5d
Adds logging to the weird non-STL Queue
Feb 2, 2024
546c3f0
adds exception handling to setFunctionResult
Feb 6, 2024
e403138
attempts to fix funky delta
Feb 6, 2024
b05e6a3
refactors systems metrics to faabric scheduler
DonaldJennings Feb 11, 2024
f4492c3
updates CMAKELists for faabric::util library
DonaldJennings Feb 11, 2024
e83ca1e
removes pragma once from cpp
DonaldJennings Feb 11, 2024
62c2e2f
removes pragma once from cpp
DonaldJennings Feb 11, 2024
f7b92bb
moves definition of system_metrics to header file
DonaldJennings Feb 11, 2024
d5770f9
fixes function signature for MemStats
DonaldJennings Feb 11, 2024
425d270
attempts to add metrics requests
DonaldJennings Feb 13, 2024
283038b
updated scheduler to shutdown on crashing
DonaldJennings Feb 13, 2024
609a47c
added exception handling to faabric endpoint
DonaldJennings Feb 13, 2024
ff79673
added entry-point debug statements
DonaldJennings Feb 14, 2024
540abf8
Fix debug log statement in Scheduler.cpp
DonaldJennings Feb 14, 2024
3ffd4d6
Fix host scheduling logic and add debug logging
DonaldJennings Feb 14, 2024
6dd73dd
Add array file association for cpp files and handle exceptions in Faa…
DonaldJennings Feb 14, 2024
f2f0184
Refactor FaabricEndpointHandler and Scheduler
DonaldJennings Feb 14, 2024
fefeaa8
Add load balancing policies and handle exceptions in FaabricEndpointH…
DonaldJennings Feb 14, 2024
3f014dc
Remove isMetricsRequest field from MessageRecord and faabric.proto
DonaldJennings Feb 14, 2024
02d7472
Refactored the faasm round-robin implementation to a policy class
DonaldJennings Feb 14, 2024
f6cef20
fixes removed comma
DonaldJennings Feb 14, 2024
eaeb790
added include directo
DonaldJennings Feb 14, 2024
21f87e1
Adds CMAKE to loadbalance
DonaldJennings Feb 14, 2024
0376aa4
Update LoadBalancePolicy dispatch method signature
DonaldJennings Feb 14, 2024
a268b3e
Update load balance policies to return vector of strings instead of a…
DonaldJennings Feb 14, 2024
1794bcf
Update LoadBalancePolicy dispatch method signatures
DonaldJennings Feb 14, 2024
5c914d2
fixed wrong method call
DonaldJennings Feb 14, 2024
faf639d
Refactor code for improved performance and readability
DonaldJennings Feb 15, 2024
1a82719
Refactor load balancing and scheduling code
DonaldJennings Feb 15, 2024
cdb0e70
Refactor load balance policy dispatch methods to use std::map
DonaldJennings Feb 15, 2024
efbec43
fix
DonaldJennings Feb 15, 2024
343c59f
Update LoadBalancePolicy dispatch function signatures
DonaldJennings Feb 15, 2024
e5a3645
Update scheduler policy to MostSlotsPolicy
DonaldJennings Feb 15, 2024
5ce849a
lol
DonaldJennings Feb 15, 2024
cae87bc
tings working
DonaldJennings Feb 15, 2024
17e1354
Refactor scheduling policy in Scheduler.cpp
DonaldJennings Feb 15, 2024
033b89e
changes to MostSlots policy
DonaldJennings Feb 15, 2024
6cab51e
changes to MostSlots
DonaldJennings Feb 15, 2024
c2f5c6a
removed print to SPDLOG_INFO2
DonaldJennings Feb 15, 2024
6cb947a
Reorder registered hosts based on LoadBalancePolicy in Scheduler.cpp
DonaldJennings Feb 15, 2024
5f44613
Refactor load balancing and scheduling
DonaldJennings Feb 15, 2024
3839984
Add print statement to display the size of the registered hosts map
DonaldJennings Feb 15, 2024
2ab51ba
Refactor load balancing policies to use vectors instead of maps
DonaldJennings Feb 15, 2024
cca1fe3
Refactor host resources handling in Scheduler.cpp
DonaldJennings Feb 15, 2024
967ab3b
Refactor host sorting in Scheduler.cpp
DonaldJennings Feb 15, 2024
2dbc189
Refactor host iteration in Scheduler.cpp
DonaldJennings Feb 15, 2024
ea16c5f
removed print to SPDLOG
DonaldJennings Feb 15, 2024
c7261a1
Refactor host registration in Scheduler.cpp
DonaldJennings Feb 15, 2024
880918c
Update scheduler policy to FaasmDefaultPolicy
DonaldJennings Feb 15, 2024
2404a74
Remove unnecessary shutdown call and add debug logging for registered…
DonaldJennings Feb 15, 2024
b7cd62b
Fix resource calculation in Scheduler.cpp
DonaldJennings Feb 15, 2024
0cfd96e
test
DonaldJennings Feb 15, 2024
eff1f10
Fix incorrect variable name in Scheduler.cpp
DonaldJennings Feb 15, 2024
3e4c246
Refactor host registration and scheduling
DonaldJennings Feb 15, 2024
7920f41
Fix bug in login functionality***
DonaldJennings Feb 15, 2024
96db3fa
added policy decisions to registered function decision making
DonaldJennings Feb 15, 2024
06e80c7
Apply load balancing policy to scheduler
DonaldJennings Feb 15, 2024
2c9a58f
fixed importation errors
DonaldJennings Feb 15, 2024
fd1b21a
removed redundant parameters from the applyLoadBalancedPolicy
DonaldJennings Feb 15, 2024
7a41234
Add FaasmDefaultPolicy initialization in Scheduler constructor
DonaldJennings Feb 15, 2024
3ea3a66
Remove unused header and initialize policy in Scheduler constructor
DonaldJennings Feb 15, 2024
1853151
Refactor load balancing policy in Scheduler class
DonaldJennings Feb 15, 2024
ab57391
Update Scheduler class to use vector instead of set in applyLoadBalan…
DonaldJennings Feb 15, 2024
8946708
Refactor dispatch method in Scheduler.cpp
DonaldJennings Feb 15, 2024
9fa968a
Update user authentication logic
DonaldJennings Feb 15, 2024
f041611
Refactor host balancing in Scheduler.cpp
DonaldJennings Feb 15, 2024
ed57c4f
Update load balancing policy to MostSlotsPolicy
DonaldJennings Feb 15, 2024
ebecda1
Refactor scheduling logic and apply FaasmDefaultPolicy***
DonaldJennings Feb 15, 2024
b63ebd1
Add load balancing policy and thresholds to SystemConfig
DonaldJennings Feb 16, 2024
cb2fcf3
Refactor load balancing policy in Scheduler.cpp
DonaldJennings Feb 16, 2024
54b0b99
Refactor load balancing policy in Scheduler.cpp
DonaldJennings Feb 16, 2024
a6fa856
added default policy to exception handling
DonaldJennings Feb 16, 2024
13d1b5b
Add debug logs for load balancing policies
DonaldJennings Feb 19, 2024
57170ad
Remove metrics collection module from SystemConfig print() function
DonaldJennings Feb 19, 2024
0cde3d6
Add load average to HostResources message and update Scheduler class
DonaldJennings Feb 19, 2024
14b241b
Add getLoadAverage() function to system metrics
DonaldJennings Feb 19, 2024
3ad6e8c
Add pragma once directive to system_metrics.h
DonaldJennings Feb 19, 2024
ef1b2d0
Refactor load average calculation in Scheduler.cpp
DonaldJennings Feb 19, 2024
59756dc
Implement least load average policy for load balancing
DonaldJennings Feb 19, 2024
5d70122
Remove unused policy and update function signature in Scheduler.cpp
DonaldJennings Feb 19, 2024
c14f513
Add debug log statements to get registered and available hosts
DonaldJennings Feb 19, 2024
f0ed961
Add debug log for acquiring lock in getFunctionRegisteredHosts() func…
DonaldJennings Feb 19, 2024
5e63c7d
Fix acquiring lock for registered hosts in Scheduler.cpp
DonaldJennings Feb 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
<<<<<<< HEAD
"editor.tokenColorCustomizations": {
"[*Light*]": {
"textMateRules": [
{
"scope": "ref.matchtext",
"settings": {
"foreground": "#000"
}
}
]
},
"[*Dark*]": {
"textMateRules": [
{
"scope": "ref.matchtext",
"settings": {
"foreground": "#fff"
}
}
]
},
"textMateRules": [
{
"scope": "googletest.failed",
"settings": {
"foreground": "#f00"
}
},
{
"scope": "googletest.passed",
"settings": {
"foreground": "#0f0"
}
},
{
"scope": "googletest.run",
"settings": {
"foreground": "#0f0"
}
}
]
=======
"files.associations": {
"array": "cpp"
>>>>>>> f2f01849eae98603aa3f1223f9af265743b91e5d
}
}
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ add_subdirectory(src/mpi)
add_subdirectory(src/proto)
add_subdirectory(src/redis)
add_subdirectory(src/runner)
add_subdirectory(src/loadbalance)
add_subdirectory(src/scheduler)
add_subdirectory(src/snapshot)
add_subdirectory(src/state)
Expand All @@ -130,6 +131,7 @@ add_library(faabric
$<TARGET_OBJECTS:scheduler_obj>
$<TARGET_OBJECTS:snapshot_obj>
$<TARGET_OBJECTS:state_obj>
$<TARGET_OBJECTS:loadbalance_obj>
$<TARGET_OBJECTS:transport_obj>
$<TARGET_OBJECTS:util_obj>
)
Expand Down
30 changes: 30 additions & 0 deletions include/faabric/loadbalance/LoadBalancePolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <set>
#include <string>
#include <vector>
#include <faabric/scheduler/Scheduler.h>

class LoadBalancePolicy
{
public:
virtual std::vector<std::pair<std::string, faabric::HostResources>> dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) = 0;
};

class FaasmDefaultPolicy : public LoadBalancePolicy
{
public:
std::vector<std::pair<std::string, faabric::HostResources>> dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
};

class LeastLoadAveragePolicy : public LoadBalancePolicy
{
public:
std::vector<std::pair<std::string, faabric::HostResources>> dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
};

class MostSlotsPolicy : public LoadBalancePolicy
{
public:
std::vector<std::pair<std::string, faabric::HostResources>> dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
};
9 changes: 8 additions & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/transport/PointToPointBroker.h>
#include <faabric/util/PeriodicBackgroundThread.h>
#include <faabric/util/system_metrics.h>
#include <faabric/util/asio.h>
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
Expand Down Expand Up @@ -277,7 +278,11 @@ class Scheduler

inline void setFunctionResult(const faabric::Message& msg)
{
setFunctionResult(std::make_unique<faabric::Message>(msg));
try {
setFunctionResult(std::make_unique<faabric::Message>(msg));
} catch (const std::exception& e) {
SPDLOG_ERROR("[Scheduler.h] Failed to set function result: {}", e.what());
}
}

faabric::Message getFunctionResult(unsigned int messageId,
Expand Down Expand Up @@ -462,6 +467,8 @@ class Scheduler
faabric::util::SchedulingTopologyHint topologyHint,
std::shared_ptr<void> extraData);

std::set<std::string> applyLoadBalancedPolicy(std::vector<std::string> hosts);

std::shared_ptr<Executor> claimExecutor(const faabric::MessageInBatch& msg);

std::vector<std::string> getUnregisteredHosts(const std::string& user,
Expand Down
5 changes: 5 additions & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class SystemConfig
bool isStorageNode;
int noSingleHostOptimisations;

std::string load_balance_policy;
double offload_cpu_threshold;
double offload_ram_threshold;
double offload_load_avg_threshold;

// Worker-related timeouts
int globalMessageTimeout;
int boundTimeout;
Expand Down
21 changes: 14 additions & 7 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,29 @@ class Queue
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

while (mq.empty()) {

while (mq.size() == 0) {
SPDLOG_DEBUG("Queue is empty... waiting for dequeue");
std::cv_status returnVal = enqueueNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

SPDLOG_DEBUG("Queue has been notified");

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
}

T value = std::move(mq.front());
mq.pop();
emptyNotifier.notify_one();

return value;
try {
T value = std::move(mq.front());
mq.pop();
emptyNotifier.notify_one();
return value;
} catch (std::exception& e) {
SPDLOG_ERROR("Caught exception when dequeueing: {}", e.what());
throw;
}
}

T* peek(long timeoutMs = 0)
Expand Down
37 changes: 37 additions & 0 deletions include/faabric/util/system_metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <cstdint>
#include <string>
#include <vector>
#include <fstream>
#include <sstream>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <chrono>

namespace faabric::util {
struct UtilisationStats
{
double cpu_utilisation;
double ram_utilisation;
double load_average;
};

struct CPUStats
{
long totalCpuTime;
long idleCpuTime;
};

struct MemStats
{
uint64_t total;
uint64_t available;
};

UtilisationStats getSystemUtilisation();
CPUStats getCPUUtilisation();
double getMemoryUtilisation();
double getLoadAverage();
}
22 changes: 21 additions & 1 deletion src/endpoint/FaabricEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,27 @@ class HttpConnection : public std::enable_shared_from_this<HttpConnection>
stream.get_executor(),
std::bind_front(&HttpConnection::sendResponse,
this->shared_from_this()) };
handler->onRequest(std::move(hrc), std::move(msg));
try {
handler->onRequest(std::move(hrc), std::move(msg));
} catch (std::exception& e) {
SPDLOG_ERROR("Error handling HTTP request: {}", e.what());
faabric::util::BeastHttpResponse response;
response.result(beast::http::status::internal_server_error);
response.body() = e.what();
sendResponse(std::move(response));
} catch (boost::system::system_error& e) {
SPDLOG_ERROR("Error handling HTTP request: {}", e.what());
faabric::util::BeastHttpResponse response;
response.result(beast::http::status::internal_server_error);
response.body() = e.what();
sendResponse(std::move(response));
} catch(...) {
SPDLOG_ERROR("Error handling HTTP request: unknown exception");
faabric::util::BeastHttpResponse response;
response.result(beast::http::status::internal_server_error);
response.body() = "Unknown error";
sendResponse(std::move(response));
}
}

void onRead(beast::error_code ec, size_t bytesTransferred)
Expand Down
21 changes: 19 additions & 2 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,22 @@ void FaabricEndpointHandler::onRequest(
response.result(beast::http::status::ok);
response.body() = std::string("Flush sent");
} else {
executeFunction(
std::move(ctx), std::move(response), std::move(msg));
try
{
executeFunction(std::move(ctx), std::move(response), std::move(msg));
}
catch (const std::exception& e)
{
SPDLOG_ERROR("Caught exception in FaabricEndpointHandler::onRequest: {}", e.what());
response.result(beast::http::status::internal_server_error);
response.body() = std::string("Caught exception: ") + e.what();
ctx.sendFunction(std::move(response));
} catch (faabric::util::FaabricException& e) {
SPDLOG_ERROR("Caught FaabricException in FaabricEndpointHandler::onRequest: {}", e.what());
response.result(beast::http::status::internal_server_error);
response.body() = std::string("Caught exception: ") + e.what();
ctx.sendFunction(std::move(response));
}
return;
}
}
Expand Down Expand Up @@ -174,7 +188,10 @@ void FaabricEndpointHandler::onFunctionResult(
faabric::util::funcToString(result, true));

response.body() = result.outputdata();
SPDLOG_DEBUG("Worker thread {} sending response", gettid());
return ctx.sendFunction(std::move(response));
SPDLOG_DEBUG("Worker thread {} response sent", gettid());
// We're done with this request
}

}
7 changes: 7 additions & 0 deletions src/loadbalance/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
faabric_lib(loadbalance
FaasmDefaultPolicy.cpp
LeastLoadAveragePolicy.cpp
MostSlotsPolicy.cpp)


target_link_libraries(loadbalance PRIVATE faabric::scheduler)
7 changes: 7 additions & 0 deletions src/loadbalance/FaasmDefaultPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

std::vector<std::pair<std::string, faabric::HostResources>> FaasmDefaultPolicy::dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
{
return host_resources;
}
12 changes: 12 additions & 0 deletions src/loadbalance/LeastLoadAveragePolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

std::vector<std::pair<std::string, faabric::HostResources>> LeastLoadAveragePolicy::dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
{
// Sort the vector by the load average in ascending order
std::sort(host_resources.begin(), host_resources.end(), [](const auto &a, const auto &b) {
return a.second.loadaverage() < b.second.loadaverage();
});

return host_resources;
}
14 changes: 14 additions & 0 deletions src/loadbalance/MostSlotsPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

std::vector<std::pair<std::string, faabric::HostResources>> MostSlotsPolicy::dispatch(std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
{
// Sort the vector by the number of available slots in descending order
std::sort(host_resources.begin(), host_resources.end(), [](const auto &a, const auto &b) {
int available_a = a.second.slots() - a.second.usedslots();
int available_b = b.second.slots() - b.second.usedslots();
return available_a > available_b;
});

return host_resources;
}
7 changes: 7 additions & 0 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message BatchExecuteRequest {
message HostResources {
int32 slots = 1;
int32 usedSlots = 2;
double loadAverage = 3;
}

message UnregisterRequest {
Expand All @@ -72,6 +73,12 @@ message FunctionStatusResponse {
FunctionStatus status = 1;
}

message NodeUtilisationResponse {
double cpu_utilisation = 1;
double mem_utilisation = 2;
double load_avg = 3;
}

// ---------------------------------------------
// MPI
// ---------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/redis/Redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ UniqueRedisReply Redis::dequeueBase(const std::string& queueName, int timeoutMs)
// Check if we got anything
if (reply == nullptr || reply->type == REDIS_REPLY_NIL) {
std::string msg =
fmt::format("No response from Redis dequeue in {}ms for queue {}",
fmt::format("d from Redis dequeue in {}ms for queue {}",
timeoutMs,
queueName);
throw RedisNoResponseException(msg);
Expand Down
Loading