Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue refactoring #1

Merged
merged 3 commits into from
May 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ set(SOURCE_FILES
src/notifier/reactor_status_notifier.h
src/broker_connect.cpp
src/reactor/command_holder.cpp
src/queuing/queue_manager_interface.h
src/queuing/multi_queue_manager.cpp
src/queuing/multi_queue_manager.h
)

add_executable(${EXEC_NAME} ${SOURCE_FILES})
Expand Down
5 changes: 3 additions & 2 deletions src/broker_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const std::string broker_connect::MONITOR_IDENTITY = "recodex-monitor";
broker_connect::broker_connect(std::shared_ptr<const broker_config> config,
std::shared_ptr<zmq::context_t> context,
std::shared_ptr<worker_registry> router,
std::shared_ptr<queue_manager_interface> queue,
std::shared_ptr<spdlog::logger> logger)
: config_(config), logger_(logger), workers_(router), reactor_(context)
: config_(config), logger_(logger), workers_(router), reactor_(context), queue_(queue)
{
if (logger_ == nullptr) {
logger_ = helpers::create_null_logger();
Expand All @@ -35,7 +36,7 @@ broker_connect::broker_connect(std::shared_ptr<const broker_config> config,
reactor_.add_socket(KEY_MONITOR, std::make_shared<router_socket_wrapper>(context, monitor_endpoint, false));

reactor_.add_handler(
{KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared<broker_handler>(config_, workers_, logger_));
{KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared<broker_handler>(config_, workers_, queue_, logger_));
reactor_.add_async_handler(
{KEY_STATUS_NOTIFIER}, std::make_shared<status_notifier_handler>(config_->get_notifier_config(), logger_));
}
Expand Down
4 changes: 4 additions & 0 deletions src/broker_connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class broker_connect
std::shared_ptr<spdlog::logger> logger_;
/** Registry of connected and alive workers. */
std::shared_ptr<worker_registry> workers_;
/** Queue manager */
std::shared_ptr<queue_manager_interface> queue_;
/** A reactor that provides us with an event-based API to communicate with the clients and workers */
reactor reactor_;

Expand All @@ -53,11 +55,13 @@ class broker_connect
* @param config a configuration object used to set up the connections
* @param context ZeroMQ context
* @param router A registry used to track workers and their jobs
* @param queue A queue manager
* @param logger
*/
broker_connect(std::shared_ptr<const broker_config> config,
std::shared_ptr<zmq::context_t> context,
std::shared_ptr<worker_registry> router,
std::shared_ptr<queue_manager_interface> queue,
std::shared_ptr<spdlog::logger> logger = nullptr);

/**
Expand Down
5 changes: 3 additions & 2 deletions src/broker_core.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "broker_core.h"
#include "queuing/multi_queue_manager.h"

broker_core::broker_core(std::vector<std::string> args)
: args_(args), config_filename_("config.yml"), logger_(nullptr), broker_(nullptr)
Expand Down Expand Up @@ -101,7 +102,6 @@ void broker_core::log_init()
// Create multithreaded rotating file sink. Max filesize is 1024 * 1024 and we save 5 newest files.
auto rotating_sink =
std::make_shared<spdlog::sinks::rotating_file_sink_mt>((path / log_conf.log_basename).string(),
log_conf.log_suffix,
log_conf.log_file_size,
log_conf.log_files_count);
// Set queue size for asynchronous logging. It must be a power of 2. Also, flush every second.
Expand Down Expand Up @@ -129,7 +129,8 @@ void broker_core::broker_init()
logger_->info("Initializing broker connection...");
workers_ = std::make_shared<worker_registry>();
context_ = std::make_shared<zmq::context_t>(1);
broker_ = std::make_shared<broker_connect>(config_, context_, workers_, logger_);
queue_ = std::make_shared<multi_queue_manager>();
broker_ = std::make_shared<broker_connect>(config_, context_, workers_, queue_, logger_);
logger_->info("Broker connection initialized.");
}

Expand Down
3 changes: 3 additions & 0 deletions src/broker_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class broker_core
/** Pointer to task router (handles alive worker and routing tasks between them). */
std::shared_ptr<worker_registry> workers_;

/** Pointer to queue manager */
std::shared_ptr<queue_manager_interface> queue_;

/** Pointer to ZeroMQ context. */
std::shared_ptr<zmq::context_t> context_;

Expand Down
102 changes: 57 additions & 45 deletions src/handlers/broker_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
#include "../broker_connect.h"
#include "../notifier/reactor_status_notifier.h"

broker_handler::broker_handler(std::shared_ptr<const broker_config> config,
std::shared_ptr<worker_registry> workers,
std::shared_ptr<spdlog::logger> logger)
: config_(config), workers_(workers), logger_(logger)
broker_handler::broker_handler(std::shared_ptr<const broker_config> config, std::shared_ptr<worker_registry> workers,
std::shared_ptr<queue_manager_interface> queue, std::shared_ptr<spdlog::logger> logger)
: config_(config), workers_(workers), logger_(logger), queue_(queue)
{
if (logger_ == nullptr) {
logger_ = helpers::create_null_logger();
Expand Down Expand Up @@ -93,27 +92,27 @@ void broker_handler::process_client_eval(
++it;
}

worker_registry::worker_ptr worker = workers_->find_worker(headers);

if (worker != nullptr) {
logger_->debug(" - incomming job {}", job_id);
// Create a job request object
// Forward remaining messages to the worker without actually understanding them
std::vector<std::string> additional_data;
for (; it != std::end(message); ++it) {
additional_data.push_back(*it);
}

// Forward remaining messages to the worker without actually understanding them
std::vector<std::string> additional_data;
for (; it != std::end(message); ++it) {
additional_data.push_back(*it);
}
job_request_data request_data(job_id, additional_data);
job_request_data request_data(job_id, additional_data);
logger_->debug(" - incoming job {}", job_id);

auto eval_request = std::make_shared<request>(headers, request_data);
worker->enqueue_request(eval_request);
auto eval_request = std::make_shared<request>(headers, request_data);
enqueue_result result = queue_->enqueue_request(eval_request);

if (!assign_queued_request(worker, respond)) {
logger_->debug(" - saved to queue for worker '{}'", worker->get_description());
if (result.enqueued) {
if (result.assigned_to != nullptr) {
send_request(result.assigned_to, eval_request, respond);
} else {
logger_->debug(" - saved to queue");
}

respond(message_container(broker_connect::KEY_CLIENTS, identity, {"accept"}));
workers_->deprioritize_worker(worker);
} else {
respond(message_container(broker_connect::KEY_CLIENTS, identity, {"reject"}));
notify_monitor(job_id, "FAILED", respond);
Expand Down Expand Up @@ -171,6 +170,7 @@ void broker_handler::process_worker_init(

// Create a new worker with the basic information
auto new_worker = worker_registry::worker_ptr(new worker(identity, hwgroup, headers));
std::shared_ptr<request> current_request = nullptr;

// Load additional information
for (; message_it != std::end(message); ++message_it) {
Expand All @@ -184,14 +184,18 @@ void broker_handler::process_worker_init(
if (key == "description") {
new_worker->description = value;
} else if (key == "current_job") {
auto current_request = worker::request_ptr(new request(job_request_data(value)));
new_worker->enqueue_request(current_request);
new_worker->next_request();
current_request = worker::request_ptr(new request(job_request_data(value)));
}
}

// Insert the worker into the registry
workers_->add_worker(new_worker);
request_ptr request = queue_->add_worker(new_worker, current_request);

// Give the worker a job if necessary
if (request != nullptr) {
send_request(new_worker, request, respond);
}

// Start an idle timer for our new worker
worker_timers_.emplace(new_worker, std::chrono::milliseconds(0));
Expand Down Expand Up @@ -223,7 +227,7 @@ void broker_handler::process_worker_done(
return;
}

std::shared_ptr<const request> current = worker->get_current_request();
request_ptr current = queue_->get_current_request(worker);

if (message.at(1) != current->data.get_job_id()) {
logger_->error("Got 'done' message from worker {} with mismatched job id - {} (message) vs. {} (worker)",
Expand All @@ -238,9 +242,11 @@ void broker_handler::process_worker_done(
if (status == "OK") {
// notify frontend that job ended successfully and complete it internally
status_notifier.job_done(message.at(1));
worker->complete_request();
request_ptr next_request = queue_->worker_finished(worker);

if (!assign_queued_request(worker, respond)) {
if (next_request != nullptr) {
send_request(worker, next_request, respond);
} else {
logger_->debug(" - worker {} is now free", worker->get_description());
}
} else if (status == "INTERNAL_ERROR") {
Expand All @@ -251,15 +257,19 @@ void broker_handler::process_worker_done(
return;
}

auto failed_request = worker->cancel_request();
auto failed_request = queue_->worker_cancelled(worker);
failed_request->failure_count += 1;

if (!failed_request->data.is_complete()) {
status_notifier.rejected_job(
failed_request->data.get_job_id(), "Job failed with '" + message.at(3) + "' and cannot be reassigned");
} else if (check_failure_count(failed_request, status_notifier, respond)) {
reassign_request(failed_request, respond);
} else {
assign_queued_request(worker, respond);
auto new_request = queue_->assign_request(worker);
if (new_request) {
send_request(worker, new_request, respond);
}
}
} else if (status == "FAILED") {
if (message.size() != 4) {
Expand All @@ -270,8 +280,13 @@ void broker_handler::process_worker_done(

status_notifier.job_failed(message.at(1), message.at(3));

worker->cancel_request();
assign_queued_request(worker, respond);
auto failed_request = queue_->worker_cancelled(worker);
failed_request->failure_count += 1;
auto new_request = queue_->assign_request(worker);

if (new_request) {
send_request(worker, new_request, respond);
}
} else {
logger_->warn("Received unexpected status code {} from worker {}", status, worker->get_description());
}
Expand Down Expand Up @@ -334,7 +349,8 @@ void broker_handler::process_timer(const message_container &message, handler_int
logger_->info("Worker {} expired", worker->get_description());

workers_->remove_worker(worker);
auto requests = worker->terminate();
queue_->get_current_request(worker)->failure_count += 1;
auto requests = queue_->worker_terminated(worker);
std::vector<worker::request_ptr> unassigned_requests;

for (auto request : *requests) {
Expand Down Expand Up @@ -371,33 +387,29 @@ bool broker_handler::reassign_request(worker::request_ptr request, handler_inter
{
logger_->debug(
" - reassigning job {} ({} attempts already failed)", request->data.get_job_id(), request->failure_count);
worker_registry::worker_ptr substitute_worker = workers_->find_worker(request->headers);

if (substitute_worker == nullptr) {
enqueue_result result = queue_->enqueue_request(request);

if (!result.enqueued) {
notify_monitor(request, "FAILED", respond);
return false;
}

substitute_worker->enqueue_request(request);
if (!assign_queued_request(substitute_worker, respond)) {
if (result.assigned_to != nullptr) {
send_request(result.assigned_to, request, respond);
logger_->debug(
" - job {} queued for worker {}", request->data.get_job_id(), substitute_worker->get_description());
" - job {} queued for worker {}", request->data.get_job_id(), result.assigned_to->get_description());
}

return true;
}

bool broker_handler::assign_queued_request(worker_registry::worker_ptr worker, handler_interface::response_cb respond)
void broker_handler::send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond)
{
if (worker->next_request()) {
respond(message_container(
broker_connect::KEY_WORKERS, worker->identity, worker->get_current_request()->data.get()));
logger_->debug(
" - job {} sent to worker {}", worker->get_current_request()->data.get_job_id(), worker->get_description());
return true;
}

return false;
respond(message_container(
broker_connect::KEY_WORKERS, worker->identity, request->data.get()));
logger_->debug(
" - job {} sent to worker {}", request->data.get_job_id(), worker->get_description());
}

bool broker_handler::check_failure_count(worker::request_ptr request, status_notifier_interface &status_notifier,
Expand Down
16 changes: 10 additions & 6 deletions src/handlers/broker_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "../reactor/command_holder.h"
#include "../reactor/handler_interface.h"
#include "../worker_registry.h"
#include "../queuing/queue_manager_interface.h"

/**
* Processes requests from workers and clients and forwards them accordingly.
Expand All @@ -20,9 +21,8 @@ class broker_handler : public handler_interface
* @param workers worker registry (it's acceptable if it already contains some workers)
* @param logger an optional logger
*/
broker_handler(std::shared_ptr<const broker_config> config,
std::shared_ptr<worker_registry> workers,
std::shared_ptr<spdlog::logger> logger);
broker_handler(std::shared_ptr<const broker_config> config, std::shared_ptr<worker_registry> workers,
std::shared_ptr<queue_manager_interface> queue, std::shared_ptr<spdlog::logger> logger);

void on_request(const message_container &message, response_cb respond);

Expand All @@ -33,6 +33,9 @@ class broker_handler : public handler_interface
/** Worker registry used for keeping track of workers and their jobs */
std::shared_ptr<worker_registry> workers_;

/** The queue manager */
std::shared_ptr<queue_manager_interface> queue_;

/** A system logger */
std::shared_ptr<spdlog::logger> logger_;

Expand Down Expand Up @@ -93,12 +96,13 @@ class broker_handler : public handler_interface
bool reassign_request(worker::request_ptr request, response_cb respond);

/**
* Give a worker a new job from the queue
* @param worker the worker in need of a new job
* Send a job to a worker
* @param worker the worker in need of a new job (it must be free)
* @param request the request
* @param respond a callback to notify the worker about the reassigned job
* @return true if a job was sent to the worker, false otherwise
*/
bool assign_queued_request(worker_registry::worker_ptr worker, response_cb respond);
void send_request(worker_registry::worker_ptr worker, request_ptr request, response_cb respond);

/**
* Check if a request can be reassigned one more time and notify the frontend if not.
Expand Down
Loading