Skip to content

Commit

Permalink
Fixing job-removal issue raised when worker dies.
Browse files Browse the repository at this point in the history
  • Loading branch information
krulis-martin committed Nov 11, 2022
1 parent 978c7cc commit d89a4e5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/queuing/queue_manager_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class queue_manager_interface
virtual request_ptr worker_finished(worker_ptr worker) = 0;

/**
* Mark the current request of a worker as cancelled and do not assign it another request.
* Mark the current request of a worker as cancelled and do not (re)assign it to another worker.
* Called when the worker machine fails to process the request.
* The caller can decide whether the request should be enqueued again or not.
* @param worker the worker whose job was cancelled
Expand Down
62 changes: 40 additions & 22 deletions src/queuing/single_queue_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ class single_queue_manager : public queue_manager_interface
std::map<worker_ptr, request_ptr> worker_jobs_;
std::vector<worker_ptr> workers_;

/**
* Check whether a worker exists capable of processing given request (according to headers)
* @param request_ptr request to be tested
* @return true if some worker exists, false if the request cannot be accomodated with current workers
*/
bool is_request_assignable(request_ptr request) const
{
for (auto &worker : workers_) {
if (worker->check_headers(request->headers)) {
return true;
}
}
return false;
}

public:
explicit single_queue_manager():
comparator_(std::make_unique<JobComparator>()), selector_(std::make_unique<IdleWorkerSelector>())
Expand Down Expand Up @@ -103,7 +118,21 @@ class single_queue_manager : public queue_manager_interface
result->push_back(worker_jobs_[worker]);
worker_jobs_.erase(worker);
workers_.erase(std::remove(workers_.begin(), workers_.end(), worker), workers_.end());
return result;

// filter jobs and remove those wich are no longer process-able (after worker removal)
for (auto &&job : jobs_) {
if (!is_request_assignable(job.request)) {
// the job cannot be accomodated anymore...
result->push_back(job.request);
job.request = nullptr; // mark the job for removal
}
}
jobs_.erase( // remove marked jobs
std::remove_if(jobs_.begin(), jobs_.end(), [](auto &job) { return job.request == nullptr; }),
jobs_.end()
);

return result; // return removed requests
}

enqueue_result enqueue_request(request_ptr request) override
Expand All @@ -119,31 +148,20 @@ class single_queue_manager : public queue_manager_interface
};
}

// If no worker able to process the job exists, reject it
for (auto it = std::begin(worker_jobs_); it != std::end(worker_jobs_); ++it) {
if (it->first->check_headers(request->headers)) {
break;
}

if (std::next(it) == std::end(worker_jobs_)) {
return enqueue_result{
.assigned_to = nullptr,
.enqueued = false,
};
}
bool assignable = is_request_assignable(request);
if (assignable) {
// Enqueue the job
jobs_.push_back(request_entry{
.request = request,
.arrived_at = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
),
});
}

// Enqueue the job
jobs_.push_back(request_entry{
.request = request,
.arrived_at = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
),
});

return enqueue_result{
.assigned_to = nullptr,
.enqueued = true,
.enqueued = assignable,
};
}

Expand Down

0 comments on commit d89a4e5

Please sign in to comment.