Skip to content

Commit

Permalink
Merge pull request #12 from ReCodEx/single-queue
Browse files Browse the repository at this point in the history
Single queue
  • Loading branch information
krulis-martin committed Nov 11, 2022
2 parents 1faa3b1 + 31fe7fa commit f9710b5
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ build/
.idea/*.iml
*.user

/.vscode
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 2.8.11)
project(recodex-broker)
set(RECODEX_VERSION "1.3.1")
set(RECODEX_VERSION "1.4.0")

set(EXEC_NAME ${PROJECT_NAME})
enable_testing()
Expand All @@ -15,7 +15,7 @@ include_directories(AFTER, vendor/spdlog/include)
find_package(CURL REQUIRED)
include_directories(${CURL_INCLUDE_DIRS})

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall")

set(SOURCE_FILES
src/main.cpp
Expand Down Expand Up @@ -59,6 +59,7 @@ set(SOURCE_FILES
src/queuing/queue_manager_interface.h
src/queuing/multi_queue_manager.cpp
src/queuing/multi_queue_manager.h
src/queuing/single_queue_manager.h
)

add_executable(${EXEC_NAME} ${SOURCE_FILES})
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ The default location for broker configuration file is
`err`, `warn`, `notice`, `info` and `debug`
- _max-size_ -- maximal size of log file before rotating
- _rotations_ -- number of rotation kept
- _queue_manager_ -- selection of the queue manager implementation responsible for assigning jobs to workers. Currently only `single` (the default) and `multi` queue managers are in production version. Single-queue manager has one queue and dispatches jobs on demand as workers become available. Multi-queue manager has a queue for every worker, jobs are assigned immediately and cannot be re-assigned unless failure occurs. I.e., `single` provides better load balancing, `multi` has lower dispatching overhead.

### Example config file

Expand Down Expand Up @@ -172,6 +173,7 @@ logger:
level: "debug" # level of logging
max-size: 1048576 # 1 MB; max size of file before log rotation
rotations: 3 # number of rotations kept
queue_manager: "single" # name of the manager that handles job dispatching among queues (single is the default)
```

## Documentation
Expand Down
1 change: 1 addition & 0 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ logger:
level: "debug" # level of logging
max-size: 1048576 # 1 MB; max size of file before log rotation
rotations: 3 # number of rotations kept
queue_manager: "single" # name of the manager that handles job dispatching among queues (single is the default)
2 changes: 1 addition & 1 deletion recodex-broker.spec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%define short_name broker
%define version 1.3.2
%define unmangled_version c4d2c6135d0e94bf50140c5a68eb45b421dfbd64
%define release 1
%define release 2

%define spdlog_name spdlog
%define spdlog_version 0.13.0
Expand Down
13 changes: 12 additions & 1 deletion src/broker_core.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "broker_core.h"
#include "queuing/single_queue_manager.h"
#include "queuing/multi_queue_manager.h"

broker_core::broker_core(std::vector<std::string> args)
Expand Down Expand Up @@ -127,7 +128,17 @@ void broker_core::broker_init()
logger_->info("Initializing broker connection...");
workers_ = std::make_shared<worker_registry>();
context_ = std::make_shared<zmq::context_t>(1);
queue_ = std::make_shared<multi_queue_manager>();

// Yes, this may be done better, but it will do for now.
auto queue_manager_id = config_->get_queue_manager();
if (queue_manager_id == "multi") {
queue_ = std::make_shared<multi_queue_manager>();
} else if (queue_manager_id == "single") {
queue_ = std::make_shared<single_queue_manager<>>();
} else {
force_exit("Unknown queue manager '" + queue_manager_id + "'. Available managers are 'single' and 'multi'.");
}

broker_ = std::make_shared<broker_connect>(config_, context_, workers_, queue_, logger_);
logger_->info("Broker connection initialized.");
}
Expand Down
9 changes: 9 additions & 0 deletions src/config/broker_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ broker_config::broker_config(const YAML::Node &config)
throw config_error("The configuration is not a YAML map");
}

if (config["queue_manager"] && config["queue_manager"].IsScalar()) {
queue_manager_ = config["queue_manager"].as<std::string>();
}

// load client address and port
if (config["clients"] && config["clients"].IsMap()) {
if (config["clients"]["address"] && config["clients"]["address"].IsScalar()) {
Expand Down Expand Up @@ -85,6 +89,11 @@ broker_config::broker_config(const YAML::Node &config)
}
}

const std::string &broker_config::get_queue_manager() const
{
return queue_manager_;
}

const std::string &broker_config::get_client_address() const
{
return client_address_;
Expand Down
7 changes: 7 additions & 0 deletions src/config/broker_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class broker_config
* Destructor
*/
virtual ~broker_config() = default;
/**
* Get the identifier of the queue manager.
* @return String identifier which should be translated into known implementation of queue_manager_interface.
*/
virtual const std::string &get_queue_manager() const;
/**
* Get IP address for client connections (from frontend).
* @return Broker's IP address for client connections.
Expand Down Expand Up @@ -90,6 +95,8 @@ class broker_config
const notifier_config &get_notifier_config() const;

private:
/** Identifier of the queue manager being used for job dispatching */
std::string queue_manager_ = "single";
/** Client socket address (from frontend) */
std::string client_address_ = "*"; // '*' is any address
/** Server socket address (to workers) */
Expand Down
2 changes: 1 addition & 1 deletion src/queuing/queue_manager_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class queue_manager_interface
virtual request_ptr worker_finished(worker_ptr worker) = 0;

/**
* Mark the current request of a worker as cancelled and do not assign it another request.
* Mark the current request of a worker as cancelled and do not (re)assign it to another worker.
* Called when the worker machine fails to process the request.
* The caller can decide whether the request should be enqueued again or not.
* @param worker the worker whose job was cancelled
Expand Down
189 changes: 189 additions & 0 deletions src/queuing/single_queue_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#ifndef RECODEX_BROKER_SINGLE_QUEUE_MANAGER_HPP
#define RECODEX_BROKER_SINGLE_QUEUE_MANAGER_HPP

#include "queue_manager_interface.h"

#include <chrono>
#include <memory>
#include <vector>
#include <map>
#include <algorithm>

struct request_entry {
request_ptr request;
std::chrono::milliseconds arrived_at;
};

struct fcfs_job_comparator {
bool compare(const request_entry &a, const request_entry &b, worker_ptr worker) const
{
return a.arrived_at < b.arrived_at;
}
};


struct first_idle_worker_selector {
worker_ptr select(const std::map<worker_ptr, request_ptr> &worker_jobs, const std::vector<request_entry> &queued_jobs, request_ptr request) const
{
for (auto &pair: worker_jobs) {
if (pair.second == nullptr && pair.first->check_headers(request->headers)) {
return pair.first;
}
}

return nullptr;
}
};

template <typename JobComparator = fcfs_job_comparator, typename IdleWorkerSelector = first_idle_worker_selector>
class single_queue_manager : public queue_manager_interface
{
private:
std::unique_ptr<JobComparator> comparator_;
std::unique_ptr<IdleWorkerSelector> selector_;
std::vector<request_entry> jobs_;
std::map<worker_ptr, request_ptr> worker_jobs_;
std::vector<worker_ptr> workers_;

/**
* Check whether a worker exists capable of processing given request (according to headers)
* @param request_ptr request to be tested
* @return true if some worker exists, false if the request cannot be accomodated with current workers
*/
bool is_request_assignable(request_ptr request) const
{
for (auto &worker : workers_) {
if (worker->check_headers(request->headers)) {
return true;
}
}
return false;
}

public:
explicit single_queue_manager():
comparator_(std::make_unique<JobComparator>()), selector_(std::make_unique<IdleWorkerSelector>())
{}

explicit single_queue_manager(std::unique_ptr<JobComparator> comparator):
comparator_(std::move(comparator)), selector_(std::make_unique<IdleWorkerSelector>())
{}

single_queue_manager(std::unique_ptr<JobComparator> comparator, std::unique_ptr<IdleWorkerSelector> selector):
comparator_(std::move(comparator)), selector_(std::move(selector))
{}

~single_queue_manager() override = default;

request_ptr add_worker(worker_ptr worker, request_ptr current_request = nullptr) override
{
worker_jobs_[worker] = current_request;
workers_.push_back(worker);

if (current_request != nullptr) {
return current_request;
}

return assign_request(worker);
}

request_ptr assign_request(worker_ptr worker) override
{
worker_jobs_[worker] = nullptr;
std::sort(jobs_.begin(), jobs_.end(), [this, worker] (const request_entry &a, const request_entry &b) {
return comparator_->compare(a, b, worker);
});

for (auto it = jobs_.cbegin(); it != jobs_.cend(); ++it) {
if (!worker->check_headers(it->request->headers)) {
continue;
}

worker_jobs_[worker] = it->request;
jobs_.erase(it);

return worker_jobs_[worker];
}

return nullptr;
}

std::shared_ptr<std::vector<request_ptr>> worker_terminated(worker_ptr worker) override
{
auto result = std::make_shared<std::vector<request_ptr>>();
if (worker_jobs_[worker] != nullptr) {
result->push_back(worker_jobs_[worker]); // currently running job (returned for possible reasignment)
}
worker_jobs_.erase(worker);
workers_.erase(std::remove(workers_.begin(), workers_.end(), worker), workers_.end());

// filter jobs and remove those wich are no longer process-able (after worker removal)
for (auto &&job : jobs_) {
if (!is_request_assignable(job.request)) {
// the job cannot be accomodated anymore...
result->push_back(job.request);
job.request = nullptr; // mark the job for removal
}
}
jobs_.erase( // remove marked jobs
std::remove_if(jobs_.begin(), jobs_.end(), [](auto &job) { return job.request == nullptr; }),
jobs_.end()
);

return result; // return removed requests
}

enqueue_result enqueue_request(request_ptr request) override
{
// Try to find an idle worker and assign the job
auto idle_worker = selector_->select(worker_jobs_, jobs_, request);
if (idle_worker) {
worker_jobs_[idle_worker] = request;

return enqueue_result{
.assigned_to = idle_worker,
.enqueued = true,
};
}

bool assignable = is_request_assignable(request);
if (assignable) {
// Enqueue the job
jobs_.push_back(request_entry{
.request = request,
.arrived_at = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
),
});
}

return enqueue_result{
.assigned_to = nullptr,
.enqueued = assignable,
};
}

std::size_t get_queued_request_count() override
{
return jobs_.size();
}

request_ptr get_current_request(worker_ptr worker) override
{
return worker_jobs_[worker];
}

request_ptr worker_finished(worker_ptr worker) override
{
return assign_request(worker);
}

request_ptr worker_cancelled(worker_ptr worker) override
{
auto current_request = worker_jobs_[worker];
worker_jobs_[worker] = nullptr;
return current_request;
}
};

#endif
6 changes: 6 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ add_test_suite(multi_queue_manager
${HELPERS_DIR}/string_to_hex.cpp
)

add_test_suite(single_queue_manager
single_queue_manager.cpp
${SRC_DIR}/worker.cpp
${HELPERS_DIR}/string_to_hex.cpp
)

add_test_suite(broker
mocks.h
broker.cpp
Expand Down
Loading

0 comments on commit f9710b5

Please sign in to comment.