Skip to content

Commit

Permalink
Merge pull request #1 from ReCodEx/queue-refactoring
Browse files Browse the repository at this point in the history
Queue refactoring
  • Loading branch information
janbuchar committed May 12, 2017
2 parents 4fea591 + fcfc40a commit 9e10b9e
Show file tree
Hide file tree
Showing 20 changed files with 524 additions and 333 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ set(SOURCE_FILES
src/notifier/reactor_status_notifier.h
src/broker_connect.cpp
src/reactor/command_holder.cpp
src/queuing/queue_manager_interface.h
src/queuing/multi_queue_manager.cpp
src/queuing/multi_queue_manager.h
)

add_executable(${EXEC_NAME} ${SOURCE_FILES})
Expand Down
5 changes: 3 additions & 2 deletions src/broker_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const std::string broker_connect::MONITOR_IDENTITY = "recodex-monitor";
broker_connect::broker_connect(std::shared_ptr<const broker_config> config,
std::shared_ptr<zmq::context_t> context,
std::shared_ptr<worker_registry> router,
std::shared_ptr<queue_manager_interface> queue,
std::shared_ptr<spdlog::logger> logger)
: config_(config), logger_(logger), workers_(router), reactor_(context)
: config_(config), logger_(logger), workers_(router), reactor_(context), queue_(queue)
{
if (logger_ == nullptr) {
logger_ = helpers::create_null_logger();
Expand All @@ -35,7 +36,7 @@ broker_connect::broker_connect(std::shared_ptr<const broker_config> config,
reactor_.add_socket(KEY_MONITOR, std::make_shared<router_socket_wrapper>(context, monitor_endpoint, false));

reactor_.add_handler(
{KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared<broker_handler>(config_, workers_, logger_));
{KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared<broker_handler>(config_, workers_, queue_, logger_));
reactor_.add_async_handler(
{KEY_STATUS_NOTIFIER}, std::make_shared<status_notifier_handler>(config_->get_notifier_config(), logger_));
}
Expand Down
4 changes: 4 additions & 0 deletions src/broker_connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class broker_connect
std::shared_ptr<spdlog::logger> logger_;
/** Registry of connected and alive workers. */
std::shared_ptr<worker_registry> workers_;
/** Queue manager */
std::shared_ptr<queue_manager_interface> queue_;
/** A reactor that provides us with an event-based API to communicate with the clients and workers */
reactor reactor_;

Expand All @@ -53,11 +55,13 @@ class broker_connect
* @param config a configuration object used to set up the connections
* @param context ZeroMQ context
* @param router A registry used to track workers and their jobs
* @param queue A queue manager
* @param logger an optional logger (when nullptr, a null logger is created and used instead)
*/
broker_connect(std::shared_ptr<const broker_config> config,
std::shared_ptr<zmq::context_t> context,
std::shared_ptr<worker_registry> router,
std::shared_ptr<queue_manager_interface> queue,
std::shared_ptr<spdlog::logger> logger = nullptr);

/**
Expand Down
5 changes: 3 additions & 2 deletions src/broker_core.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "broker_core.h"
#include "queuing/multi_queue_manager.h"

broker_core::broker_core(std::vector<std::string> args)
: args_(args), config_filename_("config.yml"), logger_(nullptr), broker_(nullptr)
Expand Down Expand Up @@ -101,7 +102,6 @@ void broker_core::log_init()
// Create multithreaded rotating file sink. Max filesize is 1024 * 1024 and we save 5 newest files.
auto rotating_sink =
std::make_shared<spdlog::sinks::rotating_file_sink_mt>((path / log_conf.log_basename).string(),
log_conf.log_suffix,
log_conf.log_file_size,
log_conf.log_files_count);
// Set queue size for asynchronous logging. It must be a power of 2. Also, flush every second.
Expand Down Expand Up @@ -129,7 +129,8 @@ void broker_core::broker_init()
logger_->info("Initializing broker connection...");
workers_ = std::make_shared<worker_registry>();
context_ = std::make_shared<zmq::context_t>(1);
broker_ = std::make_shared<broker_connect>(config_, context_, workers_, logger_);
queue_ = std::make_shared<multi_queue_manager>();
broker_ = std::make_shared<broker_connect>(config_, context_, workers_, queue_, logger_);
logger_->info("Broker connection initialized.");
}

Expand Down
3 changes: 3 additions & 0 deletions src/broker_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class broker_core
/** Pointer to task router (handles alive worker and routing tasks between them). */
std::shared_ptr<worker_registry> workers_;

/** Pointer to queue manager */
std::shared_ptr<queue_manager_interface> queue_;

/** Pointer to ZeroMQ context. */
std::shared_ptr<zmq::context_t> context_;

Expand Down
102 changes: 57 additions & 45 deletions src/handlers/broker_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
#include "../broker_connect.h"
#include "../notifier/reactor_status_notifier.h"

broker_handler::broker_handler(std::shared_ptr<const broker_config> config,
std::shared_ptr<worker_registry> workers,
std::shared_ptr<spdlog::logger> logger)
: config_(config), workers_(workers), logger_(logger)
broker_handler::broker_handler(std::shared_ptr<const broker_config> config, std::shared_ptr<worker_registry> workers,
std::shared_ptr<queue_manager_interface> queue, std::shared_ptr<spdlog::logger> logger)
: config_(config), workers_(workers), logger_(logger), queue_(queue)
{
if (logger_ == nullptr) {
logger_ = helpers::create_null_logger();
Expand Down Expand Up @@ -93,27 +92,27 @@ void broker_handler::process_client_eval(
++it;
}

worker_registry::worker_ptr worker = workers_->find_worker(headers);

if (worker != nullptr) {
logger_->debug(" - incomming job {}", job_id);
// Create a job request object
// Forward remaining messages to the worker without actually understanding them
std::vector<std::string> additional_data;
for (; it != std::end(message); ++it) {
additional_data.push_back(*it);
}

// Forward remaining messages to the worker without actually understanding them
std::vector<std::string> additional_data;
for (; it != std::end(message); ++it) {
additional_data.push_back(*it);
}
job_request_data request_data(job_id, additional_data);
job_request_data request_data(job_id, additional_data);
logger_->debug(" - incoming job {}", job_id);

auto eval_request = std::make_shared<request>(headers, request_data);
worker->enqueue_request(eval_request);
auto eval_request = std::make_shared<request>(headers, request_data);
enqueue_result result = queue_->enqueue_request(eval_request);

if (!assign_queued_request(worker, respond)) {
logger_->debug(" - saved to queue for worker '{}'", worker->get_description());
if (result.enqueued) {
if (result.assigned_to != nullptr) {
send_request(result.assigned_to, eval_request, respond);
} else {
logger_->debug(" - saved to queue");
}

respond(message_container(broker_connect::KEY_CLIENTS, identity, {"accept"}));
workers_->deprioritize_worker(worker);
} else {
respond(message_container(broker_connect::KEY_CLIENTS, identity, {"reject"}));
notify_monitor(job_id, "FAILED", respond);
Expand Down Expand Up @@ -171,6 +170,7 @@ void broker_handler::process_worker_init(

// Create a new worker with the basic information
auto new_worker = worker_registry::worker_ptr(new worker(identity, hwgroup, headers));
std::shared_ptr<request> current_request = nullptr;

// Load additional information
for (; message_it != std::end(message); ++message_it) {
Expand All @@ -184,14 +184,18 @@ void broker_handler::process_worker_init(
if (key == "description") {
new_worker->description = value;
} else if (key == "current_job") {
auto current_request = worker::request_ptr(new request(job_request_data(value)));
new_worker->enqueue_request(current_request);
new_worker->next_request();
current_request = worker::request_ptr(new request(job_request_data(value)));
}
}

// Insert the worker into the registry
workers_->add_worker(new_worker);
request_ptr request = queue_->add_worker(new_worker, current_request);

// Give the worker a job if necessary
if (request != nullptr) {
send_request(new_worker, request, respond);
}

// Start an idle timer for our new worker
worker_timers_.emplace(new_worker, std::chrono::milliseconds(0));
Expand Down Expand Up @@ -223,7 +227,7 @@ void broker_handler::process_worker_done(
return;
}

std::shared_ptr<const request> current = worker->get_current_request();
request_ptr current = queue_->get_current_request(worker);

if (message.at(1) != current->data.get_job_id()) {
logger_->error("Got 'done' message from worker {} with mismatched job id - {} (message) vs. {} (worker)",
Expand All @@ -238,9 +242,11 @@ void broker_handler::process_worker_done(
if (status == "OK") {
// notify frontend that job ended successfully and complete it internally
status_notifier.job_done(message.at(1));
worker->complete_request();
request_ptr next_request = queue_->worker_finished(worker);

if (!assign_queued_request(worker, respond)) {
if (next_request != nullptr) {
send_request(worker, next_request, respond);
} else {
logger_->debug(" - worker {} is now free", worker->get_description());
}
} else if (status == "INTERNAL_ERROR") {
Expand All @@ -251,15 +257,19 @@ void broker_handler::process_worker_done(
return;
}

auto failed_request = worker->cancel_request();
auto failed_request = queue_->worker_cancelled(worker);
failed_request->failure_count += 1;

if (!failed_request->data.is_complete()) {
status_notifier.rejected_job(
failed_request->data.get_job_id(), "Job failed with '" + message.at(3) + "' and cannot be reassigned");
} else if (check_failure_count(failed_request, status_notifier, respond)) {
reassign_request(failed_request, respond);
} else {
assign_queued_request(worker, respond);
auto new_request = queue_->assign_request(worker);
if (new_request) {
send_request(worker, new_request, respond);
}
}
} else if (status == "FAILED") {
if (message.size() != 4) {
Expand All @@ -270,8 +280,13 @@ void broker_handler::process_worker_done(

status_notifier.job_failed(message.at(1), message.at(3));

worker->cancel_request();
assign_queued_request(worker, respond);
auto failed_request = queue_->worker_cancelled(worker);
failed_request->failure_count += 1;
auto new_request = queue_->assign_request(worker);

if (new_request) {
send_request(worker, new_request, respond);
}
} else {
logger_->warn("Received unexpected status code {} from worker {}", status, worker->get_description());
}
Expand Down Expand Up @@ -334,7 +349,8 @@ void broker_handler::process_timer(const message_container &message, handler_int
logger_->info("Worker {} expired", worker->get_description());

workers_->remove_worker(worker);
auto requests = worker->terminate();
queue_->get_current_request(worker)->failure_count += 1;
auto requests = queue_->worker_terminated(worker);
std::vector<worker::request_ptr> unassigned_requests;

for (auto request : *requests) {
Expand Down Expand Up @@ -371,33 +387,29 @@ bool broker_handler::reassign_request(worker::request_ptr request, handler_inter
{
logger_->debug(
" - reassigning job {} ({} attempts already failed)", request->data.get_job_id(), request->failure_count);
worker_registry::worker_ptr substitute_worker = workers_->find_worker(request->headers);

if (substitute_worker == nullptr) {
enqueue_result result = queue_->enqueue_request(request);

if (!result.enqueued) {
notify_monitor(request, "FAILED", respond);
return false;
}

substitute_worker->enqueue_request(request);
if (!assign_queued_request(substitute_worker, respond)) {
if (result.assigned_to != nullptr) {
send_request(result.assigned_to, request, respond);
logger_->debug(
" - job {} queued for worker {}", request->data.get_job_id(), substitute_worker->get_description());
" - job {} queued for worker {}", request->data.get_job_id(), result.assigned_to->get_description());
}

return true;
}

bool broker_handler::assign_queued_request(worker_registry::worker_ptr worker, handler_interface::response_cb respond)
void broker_handler::send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond)
{
if (worker->next_request()) {
respond(message_container(
broker_connect::KEY_WORKERS, worker->identity, worker->get_current_request()->data.get()));
logger_->debug(
" - job {} sent to worker {}", worker->get_current_request()->data.get_job_id(), worker->get_description());
return true;
}

return false;
respond(message_container(
broker_connect::KEY_WORKERS, worker->identity, request->data.get()));
logger_->debug(
" - job {} sent to worker {}", request->data.get_job_id(), worker->get_description());
}

bool broker_handler::check_failure_count(worker::request_ptr request, status_notifier_interface &status_notifier,
Expand Down
16 changes: 10 additions & 6 deletions src/handlers/broker_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "../reactor/command_holder.h"
#include "../reactor/handler_interface.h"
#include "../worker_registry.h"
#include "../queuing/queue_manager_interface.h"

/**
* Processes requests from workers and clients and forwards them accordingly.
Expand All @@ -20,9 +21,8 @@ class broker_handler : public handler_interface
* @param workers worker registry (it's acceptable if it already contains some workers)
* @param queue the queue manager that receives incoming requests
* @param logger an optional logger
*/
broker_handler(std::shared_ptr<const broker_config> config,
std::shared_ptr<worker_registry> workers,
std::shared_ptr<spdlog::logger> logger);
broker_handler(std::shared_ptr<const broker_config> config, std::shared_ptr<worker_registry> workers,
std::shared_ptr<queue_manager_interface> queue, std::shared_ptr<spdlog::logger> logger);

void on_request(const message_container &message, response_cb respond);

Expand All @@ -33,6 +33,9 @@ class broker_handler : public handler_interface
/** Worker registry used for keeping track of workers and their jobs */
std::shared_ptr<worker_registry> workers_;

/** The queue manager */
std::shared_ptr<queue_manager_interface> queue_;

/** A system logger */
std::shared_ptr<spdlog::logger> logger_;

Expand Down Expand Up @@ -93,12 +96,13 @@ class broker_handler : public handler_interface
bool reassign_request(worker::request_ptr request, response_cb respond);

/**
* Give a worker a new job from the queue
* @param worker the worker in need of a new job
* Send a job to a worker
* @param worker the worker in need of a new job (it must be free)
* @param request the request
* @param respond a callback to notify the worker about the reassigned job
* @return true if a job was sent to the worker, false otherwise
*/
bool assign_queued_request(worker_registry::worker_ptr worker, response_cb respond);
void send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond);

/**
* Check if a request can be reassigned one more time and notify the frontend if not.
Expand Down
Loading

0 comments on commit 9e10b9e

Please sign in to comment.