From a2f6d87591c81c2e17576e176d8a4e8dbad1a6db Mon Sep 17 00:00:00 2001 From: Teyras Date: Tue, 9 May 2017 12:19:24 +0200 Subject: [PATCH 1/3] update spdlog --- src/broker_core.cpp | 1 - vendor/spdlog | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/broker_core.cpp b/src/broker_core.cpp index f857e5c..ec091d4 100644 --- a/src/broker_core.cpp +++ b/src/broker_core.cpp @@ -101,7 +101,6 @@ void broker_core::log_init() // Create multithreaded rotating file sink. Max filesize is 1024 * 1024 and we save 5 newest files. auto rotating_sink = std::make_shared((path / log_conf.log_basename).string(), - log_conf.log_suffix, log_conf.log_file_size, log_conf.log_files_count); // Set queue size for asynchronous logging. It must be a power of 2. Also, flush every second. diff --git a/vendor/spdlog b/vendor/spdlog index 1f1f6a5..f85a086 160000 --- a/vendor/spdlog +++ b/vendor/spdlog @@ -1 +1 @@ -Subproject commit 1f1f6a5f3b424203a429e9cb78e6548037adefa8 +Subproject commit f85a08622e20b74bff34381cafcb8ef8167b29d0 From 87d21f5338529a5c4074b5f2d7dbc6810938d6ef Mon Sep 17 00:00:00 2001 From: Teyras Date: Thu, 11 May 2017 15:51:26 +0200 Subject: [PATCH 2/3] add a new queue manager implementation --- src/queuing/multi_queue_manager.cpp | 109 ++++++++++++++++++++ src/queuing/multi_queue_manager.h | 28 ++++++ src/queuing/queue_manager_interface.h | 85 ++++++++++++++++ tests/CMakeLists.txt | 8 ++ tests/multi_queue_manager.cpp | 138 ++++++++++++++++++++++++++ 5 files changed, 368 insertions(+) create mode 100644 src/queuing/multi_queue_manager.cpp create mode 100644 src/queuing/multi_queue_manager.h create mode 100644 src/queuing/queue_manager_interface.h create mode 100644 tests/multi_queue_manager.cpp diff --git a/src/queuing/multi_queue_manager.cpp b/src/queuing/multi_queue_manager.cpp new file mode 100644 index 0000000..bfd9dcb --- /dev/null +++ b/src/queuing/multi_queue_manager.cpp @@ -0,0 +1,109 @@ +#include "multi_queue_manager.h" + +request_ptr multi_queue_manager::add_worker(worker_ptr worker, request_ptr current_request) +{ + queues_.emplace(worker, std::queue()); + current_requests_.emplace(worker, current_request); + worker_queue_.push_front(worker); + return nullptr; +} + +std::shared_ptr> multi_queue_manager::worker_terminated(worker_ptr worker) +{ + auto result = std::make_shared>(); + + if (current_requests_[worker] != nullptr) { + result->push_back(current_requests_[worker]); + } + + while (!queues_[worker].empty()) { + result->push_back(queues_[worker].front()); + queues_[worker].pop(); + } + + worker_queue_.remove(worker); + queues_.erase(worker); + current_requests_.erase(worker); + + return result; +} + +enqueue_result multi_queue_manager::enqueue_request (request_ptr request) +{ + enqueue_result result; + result.enqueued = false; + + // Look for a suitable worker + worker_ptr worker = nullptr; + + for (auto it = std::begin(worker_queue_); it != std::end(worker_queue_); it++) { + if ((*it)->check_headers(request->headers)) { + worker = *it; + break; + } + } + + // If a worker was found, enqueue the request + if (worker) { + result.enqueued = true; + + // Move the worker to the end of the queue + worker_queue_.remove(worker); + worker_queue_.push_back(worker); + + if (current_requests_[worker] == nullptr) { + // The worker is free -> assign the request right away + current_requests_[worker] = request; + result.assigned_to = worker; + } else { + // The worker is occupied -> put the request in its queue + queues_[worker].push(request); + } + } + + return result; +} + +request_ptr multi_queue_manager::worker_finished (worker_ptr worker) +{ + current_requests_[worker] = nullptr; + + if (queues_[worker].empty()) { + return nullptr; + } + + request_ptr new_request = queues_[worker].front(); + queues_[worker].pop(); + current_requests_[worker] = new_request; + + return new_request; +} + +request_ptr multi_queue_manager::get_current_request(worker_ptr worker) +{ + return current_requests_[worker]; +} + +request_ptr multi_queue_manager::assign_request(worker_ptr worker) +{ + if (queues_[worker].empty()) { + return nullptr; + } + + request_ptr new_request = queues_[worker].front(); + queues_[worker].pop(); + current_requests_[worker] = new_request; + + return new_request; +} + +request_ptr multi_queue_manager::worker_cancelled(worker_ptr worker) +{ + auto request = current_requests_[worker]; + current_requests_[worker] = nullptr; + return request; +} + +multi_queue_manager::~multi_queue_manager() +{ +} diff --git a/src/queuing/multi_queue_manager.h b/src/queuing/multi_queue_manager.h new file mode 100644 index 0000000..3032fb8 --- /dev/null +++ b/src/queuing/multi_queue_manager.h @@ -0,0 +1,28 @@ +#ifndef RECODEX_BROKER_MULTI_QUEUE_MANAGER_HPP +#define RECODEX_BROKER_MULTI_QUEUE_MANAGER_HPP + +#include +#include "queue_manager_interface.h" + +/** + * Manages a separate request queue for every worker + */ +class multi_queue_manager : public queue_manager_interface +{ +private: + std::map> queues_; + std::map current_requests_; + std::list worker_queue_; +public: + virtual ~multi_queue_manager(); + virtual request_ptr add_worker(worker_ptr worker, request_ptr current_request = nullptr); + virtual request_ptr assign_request(worker_ptr worker); + virtual std::shared_ptr> worker_terminated(worker_ptr); + virtual enqueue_result enqueue_request(request_ptr request); + virtual request_ptr get_current_request(worker_ptr worker); + virtual request_ptr worker_finished(worker_ptr worker); + virtual request_ptr worker_cancelled(worker_ptr worker); +}; + + +#endif //RECODEX_BROKER_MULTI_QUEUE_MANAGER_HPP diff --git a/src/queuing/queue_manager_interface.h b/src/queuing/queue_manager_interface.h new file mode 100644 index 0000000..970807e --- /dev/null +++ b/src/queuing/queue_manager_interface.h @@ -0,0 +1,85 @@ +#ifndef RECODEX_BROKER_QUEUE_MANAGER_INTERFACE_HPP +#define RECODEX_BROKER_QUEUE_MANAGER_INTERFACE_HPP + +#include "../worker.h" +#include "../worker_registry.h" + +typedef worker_registry::worker_ptr worker_ptr; +typedef worker::request_ptr request_ptr; + +/** + * Describes the result of an enqueue operation + */ +struct enqueue_result +{ + /** + * The worker to which the request was assigned (if any) + */ + worker_ptr assigned_to; + + /** + * True if the request was successfully enqueued, false otherwise + */ + bool enqueued; +}; + +class queue_manager_interface +{ +public: + /** + * Register a new worker. This can result into a job being assigned to it right away. + * Every worker used by the queue manager must be registered using this method. + * @param worker the new worker + * @param current_request the request that is currently being processed by the worker + * @return a newly assigned request (nullptr is returned when current_request is specified or when there is no + * request to assign to the worker) + */ + virtual request_ptr add_worker(worker_ptr worker, request_ptr current_request = nullptr) = 0; + + /** + * Assign a queued request to given worker. If this succeeds, the request must be sent to the actual worker + * machine by the caller. + * @param worker + * @return a new request or nullptr if nothing can be assigned to the worker + */ + virtual request_ptr assign_request(worker_ptr worker) = 0; + + /** + * Remove a worker and return the request it was processing along with the requests that cannot be processed + * after its departure. + * This method is called when the worker is considered dead. It should not attempt to reassign any requests. + * @return requests that cannot be completed + */ + virtual std::shared_ptr> worker_terminated(worker_ptr) = 0; + + /** + * Try to enqueue a request. If it is assigned, it must be sent to the actual worker by the caller. + * @param request the request to be enqueued + * @return a structure containing details about the result + */ + virtual enqueue_result enqueue_request(request_ptr request) = 0; + + /** + * Get the request currently being processed by given worker + */ + virtual request_ptr get_current_request(worker_ptr worker) = 0; + + /** + * Mark the current request of a worker as complete and possibly assign it another request. + * Called when the actual worker machine finishes processing the request. + * @param worker the worker that finished its job + * @return a new request assigned to the worker (if any) + */ + virtual request_ptr worker_finished(worker_ptr worker) = 0; + + /** + * Mark the current request of a worker as cancelled and do not assign it another request. + * Called when the worker machine fails to process the request. + * The caller can decide whether the request should be enqueued again or not. + * @param worker the worker whose job was cancelled + * @return the cancelled request + */ + virtual request_ptr worker_cancelled(worker_ptr worker) = 0; +}; + +#endif //RECODEX_BROKER_QUEUE_MANAGER_INTERFACE_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index db09403..8ae4ab3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,13 @@ add_test_suite(worker_registry ${HELPERS_DIR}/string_to_hex.cpp ) +add_test_suite(multi_queue_manager + multi_queue_manager.cpp + ${SRC_DIR}/queuing/multi_queue_manager.cpp + ${SRC_DIR}/worker.cpp + ${HELPERS_DIR}/string_to_hex.cpp +) + add_test_suite(broker mocks.h broker.cpp @@ -57,6 +64,7 @@ add_test_suite(broker ${SRC_DIR}/reactor/router_socket_wrapper.cpp ${SRC_DIR}/reactor/command_holder.cpp ${SRC_DIR}/notifier/reactor_status_notifier.cpp + ${SRC_DIR}/queuing/multi_queue_manager.cpp ) add_test_suite(worker diff --git a/tests/multi_queue_manager.cpp b/tests/multi_queue_manager.cpp new file mode 100644 index 0000000..7119537 --- /dev/null +++ b/tests/multi_queue_manager.cpp @@ -0,0 +1,138 @@ +#include +#include + +#include "../src/worker.h" +#include "../src/queuing/multi_queue_manager.h" + +using namespace testing; + +TEST(multi_queue_manager, add_worker) +{ + std::multimap headers = {}; + job_request_data data("", {}); + + multi_queue_manager manager; + + auto worker_1 = worker_ptr(new worker("identity1", "group_1", headers)); + auto request_1 = std::make_shared(headers, data); + + manager.add_worker(worker_1, request_1); + ASSERT_EQ(request_1, manager.get_current_request(worker_1)); +} + +TEST(multi_queue_manager, basic_queueing) +{ + std::multimap headers = {}; + job_request_data data("", {}); + + multi_queue_manager manager; + + auto worker_1 = worker_ptr(new worker("identity1", "group_1", headers)); + auto request_1 = std::make_shared(headers, data); + auto request_2 = std::make_shared(headers, data); + + manager.add_worker(worker_1); + + enqueue_result result = manager.enqueue_request(request_1); + ASSERT_EQ(worker_1, result.assigned_to); + ASSERT_TRUE(result.enqueued); + ASSERT_EQ(request_1, manager.get_current_request(worker_1)); + + result = manager.enqueue_request(request_2); + ASSERT_TRUE(result.enqueued); + ASSERT_EQ(nullptr, result.assigned_to); + ASSERT_EQ(request_1, manager.get_current_request(worker_1)); + + request_ptr next_request = manager.worker_finished(worker_1); + ASSERT_EQ(next_request, request_2); + ASSERT_EQ(request_2, manager.get_current_request(worker_1)); + + next_request = manager.worker_finished(worker_1); + ASSERT_EQ(nullptr, next_request); + ASSERT_EQ(nullptr, manager.get_current_request(worker_1)); +} + +TEST(multi_queue_manager, terminate_basic) +{ + multi_queue_manager manager; + + std::multimap headers = {}; + job_request_data data("", {}); + + auto worker_1 = worker_ptr(new worker("identity1", "group_1", headers)); + manager.add_worker(worker_1); + auto request_1 = std::make_shared(headers, data); + auto request_2 = std::make_shared(headers, data); + auto request_3 = std::make_shared(headers, data); + + manager.enqueue_request(request_1); + manager.enqueue_request(request_2); + + ASSERT_THAT(manager.worker_terminated(worker_1), Pointee(ElementsAre(request_1, request_2))); + + // The worker terminated -> do not assign it new requests + enqueue_result result = manager.enqueue_request(request_3); + ASSERT_FALSE(result.enqueued); +} + +TEST(multi_queue_manager, terminate_no_current) +{ + multi_queue_manager manager; + + std::multimap headers = {}; + job_request_data data("", {}); + + auto worker_1 = worker_ptr(new worker("identity1", "group_1", headers)); + manager.add_worker(worker_1); + auto request_1 = std::make_shared(headers, data); + auto request_2 = std::make_shared(headers, data); + + manager.enqueue_request(request_1); + manager.enqueue_request(request_2); + manager.worker_cancelled(worker_1); + + ASSERT_THAT(manager.worker_terminated(worker_1), Pointee(ElementsAre(request_2))); +} + +TEST(multi_queue_manager, terminate_empty) +{ + multi_queue_manager manager; + + std::multimap headers = {}; + job_request_data data("", {}); + + auto worker_1 = worker_ptr(new worker("identity1", "group_1", headers)); + manager.add_worker(worker_1); + auto request_1 = std::make_shared(headers, data); + + manager.enqueue_request(request_1); + manager.worker_finished(worker_1); + ASSERT_EQ(nullptr, manager.get_current_request(worker_1)); + + ASSERT_THAT(manager.worker_terminated(worker_1), Pointee(IsEmpty())); +} + +TEST(multi_queue_manager, load_balancing) +{ + multi_queue_manager manager; + + auto worker_1 = worker_ptr(new worker("id1234", "group_1", {{"env", "c"}})); + auto worker_2 = worker_ptr(new worker("id12345", "group_1", {{"env", "c"}})); + + manager.add_worker(worker_1); + manager.add_worker(worker_2); + + request::headers_t headers = {{"env", "c"}}; + job_request_data data("", {}); + auto request_1 = std::make_shared(headers, data); + auto request_2 = std::make_shared(headers, data); + + enqueue_result result_1 = manager.enqueue_request(request_1); + enqueue_result result_2 = manager.enqueue_request(request_2); + + ASSERT_TRUE(result_1.enqueued); + ASSERT_TRUE(result_2.enqueued); + ASSERT_NE(result_1.assigned_to, nullptr); + ASSERT_NE(result_2.assigned_to, nullptr); + ASSERT_NE(result_1.assigned_to, result_2.assigned_to); +} From fcfc40a0c8f50cc4e12a83b523f3473c9ef8692a Mon Sep 17 00:00:00 2001 From: Teyras Date: Thu, 11 May 2017 15:57:13 +0200 Subject: [PATCH 3/3] use the new queue manager implementation --- CMakeLists.txt | 3 + src/broker_connect.cpp | 5 +- src/broker_connect.h | 4 ++ src/broker_core.cpp | 4 +- src/broker_core.h | 3 + src/handlers/broker_handler.cpp | 102 ++++++++++++++++++-------------- src/handlers/broker_handler.h | 16 +++-- src/worker.cpp | 72 ++++------------------ src/worker.h | 47 +-------------- src/worker_registry.cpp | 21 +------ src/worker_registry.h | 7 --- tests/broker.cpp | 101 +++++++++++++++++-------------- tests/worker.cpp | 81 ------------------------- tests/worker_registry.cpp | 20 ------- 14 files changed, 155 insertions(+), 331 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b8300c7..7844ee1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,6 +55,9 @@ set(SOURCE_FILES src/notifier/reactor_status_notifier.h src/broker_connect.cpp src/reactor/command_holder.cpp + src/queuing/queue_manager_interface.h + src/queuing/multi_queue_manager.cpp + src/queuing/multi_queue_manager.h ) add_executable(${EXEC_NAME} ${SOURCE_FILES}) diff --git a/src/broker_connect.cpp b/src/broker_connect.cpp index 0d50dc5..26acd1b 100644 --- a/src/broker_connect.cpp +++ b/src/broker_connect.cpp @@ -13,8 +13,9 @@ const std::string broker_connect::MONITOR_IDENTITY = "recodex-monitor"; broker_connect::broker_connect(std::shared_ptr config, std::shared_ptr context, std::shared_ptr router, + std::shared_ptr queue, std::shared_ptr logger) - : config_(config), logger_(logger), workers_(router), reactor_(context) + : config_(config), logger_(logger), workers_(router), reactor_(context), queue_(queue) { if (logger_ == nullptr) { logger_ = helpers::create_null_logger(); @@ -35,7 +36,7 @@ broker_connect::broker_connect(std::shared_ptr config, reactor_.add_socket(KEY_MONITOR, std::make_shared(context, monitor_endpoint, false)); reactor_.add_handler( - {KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared(config_, workers_, logger_)); + {KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared(config_, workers_, queue_, logger_)); reactor_.add_async_handler( {KEY_STATUS_NOTIFIER}, std::make_shared(config_->get_notifier_config(), logger_)); } diff --git a/src/broker_connect.h b/src/broker_connect.h index 69e84ec..c2a1174 100644 --- a/src/broker_connect.h +++ b/src/broker_connect.h @@ -27,6 +27,8 @@ class broker_connect std::shared_ptr logger_; /** Registry of connected and alive workers. */ std::shared_ptr workers_; + /** Queue manager */ + std::shared_ptr queue_; /** A reactor that provides us with an event-based API to communicate with the clients and workers */ reactor reactor_; @@ -53,11 +55,13 @@ class broker_connect * @param config a configuration object used to set up the connections * @param context ZeroMQ context * @param router A registry used to track workers and their jobs + * @param queue A queue manager * @param logger */ broker_connect(std::shared_ptr config, std::shared_ptr context, std::shared_ptr router, + std::shared_ptr queue, std::shared_ptr logger = nullptr); /** diff --git a/src/broker_core.cpp b/src/broker_core.cpp index ec091d4..c9ef290 100644 --- a/src/broker_core.cpp +++ b/src/broker_core.cpp @@ -1,4 +1,5 @@ #include "broker_core.h" +#include "queuing/multi_queue_manager.h" broker_core::broker_core(std::vector args) : args_(args), config_filename_("config.yml"), logger_(nullptr), broker_(nullptr) @@ -128,7 +129,8 @@ void broker_core::broker_init() logger_->info("Initializing broker connection..."); workers_ = std::make_shared(); context_ = std::make_shared(1); - broker_ = std::make_shared(config_, context_, workers_, logger_); + queue_ = std::make_shared(); + broker_ = std::make_shared(config_, context_, workers_, queue_, logger_); logger_->info("Broker connection initialized."); } diff --git a/src/broker_core.h b/src/broker_core.h index 6034b06..02f05bf 100644 --- a/src/broker_core.h +++ b/src/broker_core.h @@ -120,6 +120,9 @@ class broker_core /** Pointer to task router (handles alive worker and routing tasks between them). */ std::shared_ptr workers_; + /** Pointer to queue manager */ + std::shared_ptr queue_; + /** Pointer to ZeroMQ context. */ std::shared_ptr context_; diff --git a/src/handlers/broker_handler.cpp b/src/handlers/broker_handler.cpp index bd71e6d..e63c3b9 100644 --- a/src/handlers/broker_handler.cpp +++ b/src/handlers/broker_handler.cpp @@ -2,10 +2,9 @@ #include "../broker_connect.h" #include "../notifier/reactor_status_notifier.h" -broker_handler::broker_handler(std::shared_ptr config, - std::shared_ptr workers, - std::shared_ptr logger) - : config_(config), workers_(workers), logger_(logger) +broker_handler::broker_handler(std::shared_ptr config, std::shared_ptr workers, + std::shared_ptr queue, std::shared_ptr logger) + : config_(config), workers_(workers), logger_(logger), queue_(queue) { if (logger_ == nullptr) { logger_ = helpers::create_null_logger(); @@ -93,27 +92,27 @@ void broker_handler::process_client_eval( ++it; } - worker_registry::worker_ptr worker = workers_->find_worker(headers); - - if (worker != nullptr) { - logger_->debug(" - incomming job {}", job_id); + // Create a job request object + // Forward remaining messages to the worker without actually understanding them + std::vector additional_data; + for (; it != std::end(message); ++it) { + additional_data.push_back(*it); + } - // Forward remaining messages to the worker without actually understanding them - std::vector additional_data; - for (; it != std::end(message); ++it) { - additional_data.push_back(*it); - } - job_request_data request_data(job_id, additional_data); + job_request_data request_data(job_id, additional_data); + logger_->debug(" - incoming job {}", job_id); - auto eval_request = std::make_shared(headers, request_data); - worker->enqueue_request(eval_request); + auto eval_request = std::make_shared(headers, request_data); + enqueue_result result = queue_->enqueue_request(eval_request); - if (!assign_queued_request(worker, respond)) { - logger_->debug(" - saved to queue for worker '{}'", worker->get_description()); + if (result.enqueued) { + if (result.assigned_to != nullptr) { + send_request(result.assigned_to, eval_request, respond); + } else { + logger_->debug(" - saved to queue"); } respond(message_container(broker_connect::KEY_CLIENTS, identity, {"accept"})); - workers_->deprioritize_worker(worker); } else { respond(message_container(broker_connect::KEY_CLIENTS, identity, {"reject"})); notify_monitor(job_id, "FAILED", respond); @@ -171,6 +170,7 @@ void broker_handler::process_worker_init( // Create a new worker with the basic information auto new_worker = worker_registry::worker_ptr(new worker(identity, hwgroup, headers)); + std::shared_ptr current_request = nullptr; // Load additional information for (; message_it != std::end(message); ++message_it) { @@ -184,14 +184,18 @@ void broker_handler::process_worker_init( if (key == "description") { new_worker->description = value; } else if (key == "current_job") { - auto current_request = worker::request_ptr(new request(job_request_data(value))); - new_worker->enqueue_request(current_request); - new_worker->next_request(); + current_request = worker::request_ptr(new request(job_request_data(value))); } } // Insert the worker into the registry workers_->add_worker(new_worker); + request_ptr request = queue_->add_worker(new_worker, current_request); + + // Give the worker a job if necessary + if (request != nullptr) { + send_request(new_worker, request, respond); + } // Start an idle timer for our new worker worker_timers_.emplace(new_worker, std::chrono::milliseconds(0)); @@ -223,7 +227,7 @@ void broker_handler::process_worker_done( return; } - std::shared_ptr current = worker->get_current_request(); + request_ptr current = queue_->get_current_request(worker); if (message.at(1) != current->data.get_job_id()) { logger_->error("Got 'done' message from worker {} with mismatched job id - {} (message) vs. {} (worker)", @@ -238,9 +242,11 @@ void broker_handler::process_worker_done( if (status == "OK") { // notify frontend that job ended successfully and complete it internally status_notifier.job_done(message.at(1)); - worker->complete_request(); + request_ptr next_request = queue_->worker_finished(worker); - if (!assign_queued_request(worker, respond)) { + if (next_request != nullptr) { + send_request(worker, next_request, respond); + } else { logger_->debug(" - worker {} is now free", worker->get_description()); } } else if (status == "INTERNAL_ERROR") { @@ -251,7 +257,8 @@ void broker_handler::process_worker_done( return; } - auto failed_request = worker->cancel_request(); + auto failed_request = queue_->worker_cancelled(worker); + failed_request->failure_count += 1; if (!failed_request->data.is_complete()) { status_notifier.rejected_job( @@ -259,7 +266,10 @@ void broker_handler::process_worker_done( } else if (check_failure_count(failed_request, status_notifier, respond)) { reassign_request(failed_request, respond); } else { - assign_queued_request(worker, respond); + auto new_request = queue_->assign_request(worker); + if (new_request) { + send_request(worker, new_request, respond); + } } } else if (status == "FAILED") { if (message.size() != 4) { @@ -270,8 +280,13 @@ void broker_handler::process_worker_done( status_notifier.job_failed(message.at(1), message.at(3)); - worker->cancel_request(); - assign_queued_request(worker, respond); + auto failed_request = queue_->worker_cancelled(worker); + failed_request->failure_count += 1; + auto new_request = queue_->assign_request(worker); + + if (new_request) { + send_request(worker, new_request, respond); + } } else { logger_->warn("Received unexpected status code {} from worker {}", status, worker->get_description()); } @@ -334,7 +349,8 @@ void broker_handler::process_timer(const message_container &message, handler_int logger_->info("Worker {} expired", worker->get_description()); workers_->remove_worker(worker); - auto requests = worker->terminate(); + queue_->get_current_request(worker)->failure_count += 1; + auto requests = queue_->worker_terminated(worker); std::vector unassigned_requests; for (auto request : *requests) { @@ -371,33 +387,29 @@ bool broker_handler::reassign_request(worker::request_ptr request, handler_inter { logger_->debug( " - reassigning job {} ({} attempts already failed)", request->data.get_job_id(), request->failure_count); - worker_registry::worker_ptr substitute_worker = workers_->find_worker(request->headers); - if (substitute_worker == nullptr) { + enqueue_result result = queue_->enqueue_request(request); + + if (!result.enqueued) { notify_monitor(request, "FAILED", respond); return false; } - substitute_worker->enqueue_request(request); - if (!assign_queued_request(substitute_worker, respond)) { + if (result.assigned_to != nullptr) { + send_request(result.assigned_to, request, respond); logger_->debug( - " - job {} queued for worker {}", request->data.get_job_id(), substitute_worker->get_description()); + " - job {} queued for worker {}", request->data.get_job_id(), result.assigned_to->get_description()); } return true; } -bool broker_handler::assign_queued_request(worker_registry::worker_ptr worker, handler_interface::response_cb respond) +void broker_handler::send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond) { - if (worker->next_request()) { - respond(message_container( - broker_connect::KEY_WORKERS, worker->identity, worker->get_current_request()->data.get())); - logger_->debug( - " - job {} sent to worker {}", worker->get_current_request()->data.get_job_id(), worker->get_description()); - return true; - } - - return false; + respond(message_container( + broker_connect::KEY_WORKERS, worker->identity, request->data.get())); + logger_->debug( + " - job {} sent to worker {}", request->data.get_job_id(), worker->get_description()); } bool broker_handler::check_failure_count(worker::request_ptr request, status_notifier_interface &status_notifier, diff --git a/src/handlers/broker_handler.h b/src/handlers/broker_handler.h index 4a280b6..f60a624 100644 --- a/src/handlers/broker_handler.h +++ b/src/handlers/broker_handler.h @@ -8,6 +8,7 @@ #include "../reactor/command_holder.h" #include "../reactor/handler_interface.h" #include "../worker_registry.h" +#include "../queuing/queue_manager_interface.h" /** * Processes requests from workers and clients and forwards them accordingly. @@ -20,9 +21,8 @@ class broker_handler : public handler_interface * @param workers worker registry (it's acceptable if it already contains some workers) * @param logger an optional logger */ - broker_handler(std::shared_ptr config, - std::shared_ptr workers, - std::shared_ptr logger); + broker_handler(std::shared_ptr config, std::shared_ptr workers, + std::shared_ptr queue, std::shared_ptr logger); void on_request(const message_container &message, response_cb respond); @@ -33,6 +33,9 @@ class broker_handler : public handler_interface /** Worker registry used for keeping track of workers and their jobs */ std::shared_ptr workers_; + /** The queue manager */ + std::shared_ptr queue_; + /** A system logger */ std::shared_ptr logger_; @@ -93,12 +96,13 @@ class broker_handler : public handler_interface bool reassign_request(worker::request_ptr request, response_cb respond); /** - * Give a worker a new job from the queue - * @param worker the worker in need of a new job + * Send a job to a worker + * @param worker the worker in need of a new job (it must be free) + * @param request the request * @param respond a callback to notify the worker about the reassigned job * @return true if a job was sent to the worker, false otherwise */ - bool assign_queued_request(worker_registry::worker_ptr worker, response_cb respond); + void send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond); /** * Check if a request can be reassigned one more time and notify the frontend if not. diff --git a/src/worker.cpp b/src/worker.cpp index 6616bf3..64ef1d0 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -88,7 +88,7 @@ class count_matcher : public header_matcher worker::worker( const std::string &id, const std::string &hwgroup, const std::multimap &headers) - : headers_copy_(headers), free_(true), current_request_(nullptr), identity(id), hwgroup(hwgroup) + : headers_copy_(headers), identity(id), hwgroup(hwgroup) { headers_.emplace("hwgroup", std::unique_ptr(new multiple_string_matcher(hwgroup))); @@ -107,66 +107,6 @@ worker::~worker() { } -void worker::enqueue_request(request_ptr request) -{ - request_queue_.push(request); -} - -void worker::complete_request() -{ - free_ = true; - current_request_ = nullptr; -} - -worker::request_ptr worker::cancel_request() -{ - auto request = current_request_; - - if (request != nullptr) { - request->failure_count += 1; - } - - complete_request(); - return request; -} - -bool worker::next_request() -{ - if (free_ && !request_queue_.empty()) { - current_request_ = request_queue_.front(); - request_queue_.pop(); - free_ = false; - - return true; - } - - return false; -} - -std::shared_ptr worker::get_current_request() const -{ - return current_request_; -} - -std::shared_ptr> worker::terminate() -{ - auto result = std::make_shared>(); - - if (current_request_ != nullptr) { - current_request_->failure_count += 1; - result->push_back(current_request_); - } - - current_request_ = nullptr; - - while (!request_queue_.empty()) { - result->push_back(request_queue_.front()); - request_queue_.pop(); - } - - return result; -} - bool worker::check_header(const std::string &header, const std::string &value) { // Find all worker headers with the right name @@ -196,3 +136,13 @@ std::string worker::get_description() const return helpers::string_to_hex(identity) + " (" + description + ")"; } } + +bool worker::check_headers(const std::multimap &headers) { + for (auto &header : headers) { + if (!check_header(header.first, header.second)) { + return false; + } + } + + return true; +} diff --git a/src/worker.h b/src/worker.h index 38e65e7..a93cf81 100644 --- a/src/worker.h +++ b/src/worker.h @@ -157,15 +157,6 @@ class worker /** A copy of the headers used to instantiate the worker (used by comparison) */ const std::multimap headers_copy_; - /** @a false if the worker is processing a request. */ - bool free_; - - /** A queue of requests to be processed by the worker. */ - std::queue request_queue_; - - /** The request that is now being processed by the worker. */ - request_ptr current_request_; - public: /** A unique identifier of the worker. */ const std::string identity; @@ -205,42 +196,10 @@ class worker virtual bool check_header(const std::string &header, const std::string &value); /** - * Insert a request into the workers queue. - * @param request A pointer to the request. - */ - virtual void enqueue_request(request_ptr request); - - /** - * Consider the current request complete. - * Called when the actual worker machine successfully processes the request. - */ - virtual void complete_request(); - - /** - * Consider the current request failed. - * Called when the worker machine fails to process the request. The machine is then considered free. - * @return a pointer to the cancelled request - */ - virtual request_ptr cancel_request(); - - /** - * If possible, take a request from the queue and start processing it. - * @return @a true if and only if the worker started processing a new request. - */ - virtual bool next_request(); - - /** - * Get the request that is now being processed. - * @return Pointer to currently processed request or @a nullptr. - */ - virtual std::shared_ptr get_current_request() const; - - /** - * Forget all requests (currently processed and queued). - * Called when the worker is considered dead. - * @return Current request and all requests from waiting queue. + * Check if the worker satisfies given header set. + * @param headers A key-value set of headers */ - virtual std::shared_ptr> terminate(); + virtual bool check_headers(const std::multimap &headers); /** * Get a textual description of the worker diff --git a/src/worker_registry.cpp b/src/worker_registry.cpp index 3297333..1c62399 100644 --- a/src/worker_registry.cpp +++ b/src/worker_registry.cpp @@ -24,16 +24,7 @@ void worker_registry::remove_worker(worker_ptr worker) worker_registry::worker_ptr worker_registry::find_worker(const request::headers_t &headers) { for (auto &worker : workers_) { - bool is_worker_suitable = true; - - for (auto &header : headers) { - if (!worker->check_header(header.first, header.second)) { - is_worker_suitable = false; - break; - } - } - - if (is_worker_suitable) { + if (worker->check_headers(headers)) { return worker; } } @@ -52,16 +43,6 @@ worker_registry::worker_ptr worker_registry::find_worker_by_identity(const std:: return nullptr; } -void worker_registry::deprioritize_worker(worker_registry::worker_ptr worker) -{ - auto it = std::find(std::begin(workers_), std::end(workers_), worker); - - if (it != std::end(workers_) && (it + 1) != std::end(workers_)) { - workers_.erase(it); - workers_.push_back(worker); - } -} - const std::vector &worker_registry::get_workers() const { return workers_; diff --git a/src/worker_registry.h b/src/worker_registry.h index ee3111f..ca4a1b6 100644 --- a/src/worker_registry.h +++ b/src/worker_registry.h @@ -46,13 +46,6 @@ class worker_registry * @return Instance of worker with given ID or @a nullptr. */ virtual worker_ptr find_worker_by_identity(const std::string &identity); - /** - * Reduce the priority of a worker so that it's less likely to be found by subsequent finds. - * Now it means change order in worker queue to the last position, so most of the finds will - * succeed earlier. - * @param worker Instance of worker whose priority will be lowered. - */ - virtual void deprioritize_worker(worker_ptr worker); /** * Get all workers known to this service. * @return Collection of all known workers. diff --git a/tests/broker.cpp b/tests/broker.cpp index 76759bd..5812fb5 100644 --- a/tests/broker.cpp +++ b/tests/broker.cpp @@ -3,6 +3,8 @@ #include #include "mocks.h" +#include "../src/queuing/queue_manager_interface.h" +#include "../src/queuing/multi_queue_manager.h" using namespace testing; @@ -30,13 +32,14 @@ TEST(broker, worker_init) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // Run the tested method - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); handler.on_request( message_container(broker_connect::KEY_WORKERS, "identity1", {"init", "group_1", "env=c", "threads=8"}), @@ -62,13 +65,14 @@ TEST(broker, worker_init_additional_info) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // Run the tested method - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); handler.on_request(message_container(broker_connect::KEY_WORKERS, "identity1", @@ -89,9 +93,9 @@ TEST(broker, worker_init_additional_info) // Check the additional information ASSERT_EQ("MyWorker", worker_1->description); - ASSERT_NE(nullptr, worker_1->get_current_request()); - ASSERT_EQ("job_42", worker_1->get_current_request()->data.get_job_id()); - ASSERT_FALSE(worker_1->get_current_request()->data.is_complete()); + ASSERT_NE(nullptr, queue->get_current_request(worker_1)); + ASSERT_EQ("job_42", queue->get_current_request(worker_1)->data.get_job_id()); + ASSERT_FALSE(queue->get_current_request(worker_1)->data.is_complete()); // No responses should be generated ASSERT_TRUE(messages.empty()); @@ -101,6 +105,7 @@ TEST(broker, worker_repeated_init_same_headers) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry workers->add_worker(std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}})); @@ -110,7 +115,7 @@ TEST(broker, worker_repeated_init_same_headers) handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // Run the tested method - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); handler.on_request( message_container(broker_connect::KEY_WORKERS, "identity_1", {"init", "group_1", "env=c"}), respond); @@ -134,17 +139,19 @@ TEST(broker, queuing) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); workers->add_worker(worker_1); + queue->add_worker(worker_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); std::string client_id = "client_foo"; @@ -191,13 +198,14 @@ TEST(broker, ping_unknown_worker) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // A worker pings us handler.on_request(message_container(broker_connect::KEY_WORKERS, "identity_1", {"ping"}), respond); @@ -214,6 +222,7 @@ TEST(broker, ping_known_worker) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); @@ -224,7 +233,7 @@ TEST(broker, ping_known_worker) handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // A worker pings us handler.on_request(message_container(broker_connect::KEY_WORKERS, worker_1->identity, {"ping"}), respond); @@ -239,21 +248,21 @@ TEST(broker, worker_expiration) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has a job auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); - auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); + auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // Looks like our worker timed out and there's nobody to take its work handler.on_request(message_container(broker_connect::KEY_TIMER, "", {"1100"}), respond); @@ -283,21 +292,21 @@ TEST(broker, worker_state_message) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has a job auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a progress message from our worker handler.on_request( @@ -312,21 +321,21 @@ TEST(broker, worker_job_failed) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has a job auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation failed handler.on_request( @@ -347,23 +356,24 @@ TEST(broker, worker_job_failed_queueing) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has two jobs auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id_1", {})); auto request_2 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id_2", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - worker_1->enqueue_request(request_2); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1); + queue->enqueue_request(request_1); + queue->enqueue_request(request_2); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation failed handler.on_request(message_container(broker_connect::KEY_WORKERS, @@ -395,21 +405,21 @@ TEST(broker, worker_job_done) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has a job auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation is done handler.on_request( @@ -427,21 +437,21 @@ TEST(broker, worker_orphan_job_done) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There is already a worker in the registry and it has a job whose headers don't know auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(job_request_data("job_id")); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation is done handler.on_request( @@ -459,6 +469,7 @@ TEST(broker, worker_job_internal_failure) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); EXPECT_CALL(*config, get_max_request_failures()).WillRepeatedly(Return(10)); @@ -466,16 +477,15 @@ TEST(broker, worker_job_internal_failure) auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {})); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation failed handler.on_request( @@ -498,6 +508,7 @@ TEST(broker, worker_orphan_job_internal_failure) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); EXPECT_CALL(*config, get_max_request_failures()).WillRepeatedly(Return(10)); @@ -505,16 +516,15 @@ TEST(broker, worker_orphan_job_internal_failure) auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(job_request_data("job_id")); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); + queue->add_worker(worker_1, request_1); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // We got a message from our worker that says evaluation failed handler.on_request( @@ -543,6 +553,7 @@ TEST(broker, worker_expiration_reassign_job) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There are two workers in the registry, one of them has a job and will die auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); @@ -550,18 +561,18 @@ TEST(broker, worker_expiration_reassign_job) std::make_shared(request::headers_t{{"env", "c"}}, job_request_data("job_id", {"whatever"})); auto worker_2 = std::make_shared("identity_2", "group_1", worker_headers_t{{"env", "c"}}); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); worker_2->liveness = 100; - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); workers->add_worker(worker_2); + queue->add_worker(worker_1, request_1); + queue->add_worker(worker_2); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); handler.on_request(message_container(broker_connect::KEY_TIMER, "", {"1100"}), respond); @@ -582,24 +593,25 @@ TEST(broker, worker_expiration_dont_reassign_orphan_job) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); // There are two workers in the registry, one of them has an orphan job and will die auto worker_1 = std::make_shared("identity_1", "group_1", worker_headers_t{{"env", "c"}}); auto request_1 = std::make_shared(job_request_data("job_id")); auto worker_2 = std::make_shared("identity_2", "group_1", worker_headers_t{{"env", "c"}}); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); worker_2->liveness = 100; - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); workers->add_worker(worker_2); + queue->add_worker(worker_1, request_1); + queue->add_worker(worker_2); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); // Looks like our worker timed out - the other one cannot get its job because we don't know the job's headers. // We'll report it as failed instead. We must also notify the monitor. @@ -627,6 +639,7 @@ TEST(broker, worker_expiration_cancel_job) { auto config = std::make_shared>(); auto workers = std::make_shared(); + auto queue = std::make_shared(); EXPECT_CALL(*config, get_max_request_failures()).WillRepeatedly(Return(1)); @@ -637,18 +650,18 @@ TEST(broker, worker_expiration_cancel_job) request_1->failure_count = 1; auto worker_2 = std::make_shared("identity_2", "group_1", worker_headers_t{{"env", "c"}}); worker_1->liveness = 1; - worker_1->enqueue_request(request_1); worker_2->liveness = 100; - ASSERT_TRUE(worker_1->next_request()); workers->add_worker(worker_1); workers->add_worker(worker_2); + queue->add_worker(worker_1, request_1); + queue->add_worker(worker_2); // Dummy response callback std::vector messages; handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); }; // The test code - broker_handler handler(config, workers, nullptr); + broker_handler handler(config, workers, queue, nullptr); handler.on_request(message_container(broker_connect::KEY_TIMER, "", {"1100"}), respond); diff --git a/tests/worker.cpp b/tests/worker.cpp index 021198d..2914871 100644 --- a/tests/worker.cpp +++ b/tests/worker.cpp @@ -5,87 +5,6 @@ using namespace testing; -TEST(worker, basic_queueing) -{ - std::multimap headers = {}; - job_request_data data("", {}); - - worker worker_1("identity1", "group_1", headers); - auto request_1 = std::make_shared(headers, data); - auto request_2 = std::make_shared(headers, data); - - worker_1.enqueue_request(request_1); - ASSERT_EQ(nullptr, worker_1.get_current_request()); - - ASSERT_TRUE(worker_1.next_request()); - ASSERT_EQ(request_1, worker_1.get_current_request()); - ASSERT_FALSE(worker_1.next_request()); - - worker_1.enqueue_request(request_2); - ASSERT_EQ(request_1, worker_1.get_current_request()); - ASSERT_FALSE(worker_1.next_request()); - - worker_1.complete_request(); - ASSERT_EQ(nullptr, worker_1.get_current_request()); - - ASSERT_TRUE(worker_1.next_request()); - ASSERT_EQ(request_2, worker_1.get_current_request()); - - ASSERT_FALSE(worker_1.next_request()); - - worker_1.complete_request(); - ASSERT_EQ(nullptr, worker_1.get_current_request()); - ASSERT_FALSE(worker_1.next_request()); -} - -TEST(worker, terminate_basic) -{ - std::multimap headers = {}; - job_request_data data("", {}); - - worker worker_1("identity1", "group_1", headers); - auto request_1 = std::make_shared(headers, data); - auto request_2 = std::make_shared(headers, data); - - worker_1.enqueue_request(request_1); - worker_1.enqueue_request(request_2); - ASSERT_TRUE(worker_1.next_request()); - - ASSERT_THAT(worker_1.terminate(), Pointee(ElementsAre(request_1, request_2))); -} - -TEST(worker, terminate_no_current) -{ - std::multimap headers = {}; - job_request_data data("", {}); - - worker worker_1("identity1", "group_1", headers); - auto request_1 = std::make_shared(headers, data); - auto request_2 = std::make_shared(headers, data); - - worker_1.enqueue_request(request_1); - worker_1.enqueue_request(request_2); - - ASSERT_THAT(worker_1.terminate(), Pointee(ElementsAre(request_1, request_2))); -} - -TEST(worker, terminate_empty) -{ - std::multimap headers = {}; - job_request_data data("", {}); - - worker worker_1("identity1", "group_1", headers); - auto request_1 = std::make_shared(headers, data); - - worker_1.enqueue_request(request_1); - ASSERT_TRUE(worker_1.next_request()); - worker_1.complete_request(); - ASSERT_EQ(nullptr, worker_1.get_current_request()); - ASSERT_FALSE(worker_1.next_request()); - - ASSERT_THAT(worker_1.terminate(), Pointee(IsEmpty())); -} - TEST(worker, headers_basic) { std::multimap headers = {{"env", "c"}, {"threads", "8"}}; diff --git a/tests/worker_registry.cpp b/tests/worker_registry.cpp index fd20b34..2a36d7c 100644 --- a/tests/worker_registry.cpp +++ b/tests/worker_registry.cpp @@ -27,23 +27,3 @@ TEST(worker_registry, basic_lookup) ASSERT_EQ(worker2, workers.find_worker(headers2)); ASSERT_EQ(nullptr, workers.find_worker(headers3)); } - -TEST(worker_registry, load_balancing) -{ - worker_registry workers; - - auto worker1 = worker_registry::worker_ptr(new worker("id1234", "group_1", {{"env", "c"}})); - - auto worker2 = worker_registry::worker_ptr(new worker("id12345", "group_1", {{"env", "c"}})); - - workers.add_worker(worker1); - workers.add_worker(worker2); - - request::headers_t headers = {{"env", "c"}}; - - auto first_found = workers.find_worker(headers); - workers.deprioritize_worker(first_found); - auto second_found = workers.find_worker(headers); - - ASSERT_NE(first_found, second_found); -} \ No newline at end of file