Skip to content

Commit

Permalink
Adjusting tests, fixing bugs, updating readme, and tiding up loose ends.
Browse files Browse the repository at this point in the history
  • Loading branch information
krulis-martin committed Nov 11, 2022
1 parent d89a4e5 commit 31fe7fa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 2.8.11)
project(recodex-broker)
set(RECODEX_VERSION "1.3.1")
set(RECODEX_VERSION "1.4.0")

set(EXEC_NAME ${PROJECT_NAME})
enable_testing()
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ The default location for broker configuration file is
`err`, `warn`, `notice`, `info` and `debug`
- _max-size_ -- maximal size of log file before rotating
- _rotations_ -- number of rotation kept
- _queue_manager_ -- selection of the queue manager implementation responsible for assigning jobs to workers. Currently only `single` (the default) and `multi` queue managers are in production version. Single-queue manager has one queue and dispatches jobs on demand as workers become available. Multi-queue manager has a queue for every worker, jobs are assigned immediately and cannot be re-assigned unless failure occurs. I.e., `single` provides better load balancing, `multi` has lower dispatching overhead.

### Example config file

Expand Down Expand Up @@ -172,6 +173,7 @@ logger:
level: "debug" # level of logging
max-size: 1048576 # 1 MB; max size of file before log rotation
rotations: 3 # number of rotations kept
queue_manager: "single" # name of the manager that handles job dispatching among queues (single is the default)
```

## Documentation
Expand Down
8 changes: 3 additions & 5 deletions src/queuing/single_queue_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ struct fcfs_job_comparator {
{
return a.arrived_at < b.arrived_at;
}

void drop_caches() {}
};


Expand Down Expand Up @@ -92,8 +90,6 @@ class single_queue_manager : public queue_manager_interface
request_ptr assign_request(worker_ptr worker) override
{
worker_jobs_[worker] = nullptr;
comparator_->drop_caches();

std::sort(jobs_.begin(), jobs_.end(), [this, worker] (const request_entry &a, const request_entry &b) {
return comparator_->compare(a, b, worker);
});
Expand All @@ -115,7 +111,9 @@ class single_queue_manager : public queue_manager_interface
std::shared_ptr<std::vector<request_ptr>> worker_terminated(worker_ptr worker) override
{
auto result = std::make_shared<std::vector<request_ptr>>();
result->push_back(worker_jobs_[worker]);
if (worker_jobs_[worker] != nullptr) {
result->push_back(worker_jobs_[worker]); // currently running job (returned for possible reasignment)
}
worker_jobs_.erase(worker);
workers_.erase(std::remove(workers_.begin(), workers_.end(), worker), workers_.end());

Expand Down
58 changes: 54 additions & 4 deletions tests/single_queue_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,22 @@ TEST(single_queue_manager, terminate_no_current)
job_request_data data("", {});

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
manager.add_worker(worker_1);
auto add_res = manager.add_worker(worker_1);
ASSERT_EQ(nullptr, add_res);

auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_2 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.enqueue_request(request_1);
manager.enqueue_request(request_2);
manager.worker_cancelled(worker_1);
auto enq_res1 = manager.enqueue_request(request_1);
ASSERT_EQ(worker_1, enq_res1.assigned_to);
ASSERT_EQ(true, enq_res1.enqueued);

auto enq_res2 = manager.enqueue_request(request_2);
ASSERT_EQ(nullptr, enq_res2.assigned_to);
ASSERT_EQ(true, enq_res2.enqueued);

auto cancel_res = manager.worker_cancelled(worker_1);
ASSERT_EQ(request_1, cancel_res);

ASSERT_THAT(manager.worker_terminated(worker_1), Pointee(ElementsAre(request_2)));
}
Expand Down Expand Up @@ -138,3 +147,44 @@ TEST(single_queue_manager, load_balancing)
ASSERT_NE(result_2.assigned_to, nullptr);
ASSERT_NE(result_1.assigned_to, result_2.assigned_to);
}

TEST(single_queue_manager, rejecting_unprocessable_jobs)
{
single_queue_manager manager;

auto worker_1 = worker_ptr(new worker("id1234", "group_1", {{"env", "c"}}));
auto worker_2 = worker_ptr(new worker("id12345", "group_2", {{"env", "c"}}));

manager.add_worker(worker_1);
manager.add_worker(worker_2);

request::headers_t headers1 = {{"hwgroup", "group_1"}, {"env", "c"}};
request::headers_t headers2 = {{"hwgroup", "group_2"}, {"env", "c"}};
job_request_data data("", {});
auto request_11 = std::make_shared<request>(headers1, request::metadata_t{{}}, data);
auto request_12 = std::make_shared<request>(headers1, request::metadata_t{{}}, data);
auto request_21 = std::make_shared<request>(headers2, request::metadata_t{{}}, data);
auto request_22 = std::make_shared<request>(headers2, request::metadata_t{{}}, data);

auto result_21 = manager.enqueue_request(request_21);
auto result_11 = manager.enqueue_request(request_11);
auto result_12 = manager.enqueue_request(request_12);
auto result_22 = manager.enqueue_request(request_22);

ASSERT_TRUE(result_11.enqueued);
ASSERT_TRUE(result_12.enqueued);
ASSERT_TRUE(result_21.enqueued);
ASSERT_TRUE(result_22.enqueued);
ASSERT_EQ(result_11.assigned_to, worker_1);
ASSERT_EQ(result_12.assigned_to, nullptr);
ASSERT_EQ(result_21.assigned_to, worker_2);
ASSERT_EQ(result_22.assigned_to, nullptr);

// termination of w1 returns all reqs for hw group_1
ASSERT_THAT(manager.worker_terminated(worker_1), Pointee(ElementsAre(request_11, request_12)));

// requests for hw group1 cannot be reassigned
auto re_result_11 = manager.enqueue_request(request_11);;
ASSERT_FALSE(re_result_11.enqueued);
ASSERT_EQ(re_result_11.assigned_to, nullptr);
}

0 comments on commit 31fe7fa

Please sign in to comment.