From f4dab52a241d6ce7b5b66da3e299fbbb15d6e9d3 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 22 Jan 2025 00:04:09 +0100 Subject: [PATCH 1/4] server : add more clean up when cancel_tasks is called --- examples/server/server.cpp | 42 ++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 5f08c4eccd54a..fb41775cab9e1 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1428,6 +1428,10 @@ struct server_queue { } else { queue_tasks.push_back(std::move(task)); } + // if this is cancel task make sure to clean up pending tasks + if (task.type == SERVER_TASK_TYPE_CANCEL) { + cleanup_pending_task(task.id_target); + } condition_tasks.notify_one(); return task.id; } @@ -1445,6 +1449,10 @@ struct server_queue { } else { queue_tasks.push_back(std::move(task)); } + // if this is cancel task make sure to clean up pending tasks + if (task.type == SERVER_TASK_TYPE_CANCEL) { + cleanup_pending_task(task.id_target); + } } condition_tasks.notify_one(); return 0; @@ -1539,6 +1547,23 @@ struct server_queue { } } } + +private: + void cleanup_pending_task(int id_task) { + // no need lock because this is called exclusively by post() + for (size_t i = 0; i < queue_tasks.size(); i++) { + if (queue_tasks[i].id == id_task) { + queue_tasks.erase(queue_tasks.begin() + i); + break; + } + } + for (size_t i = 0; i < queue_tasks_deferred.size(); i++) { + if (queue_tasks_deferred[i].id == id_task) { + queue_tasks_deferred.erase(queue_tasks_deferred.begin() + i); + break; + } + } + } }; struct server_response { @@ -1574,6 +1599,13 @@ struct server_response { std::unique_lock lock(mutex_results); waiting_task_ids.erase(id_task); + // make sure to clean up all pending results + for (size_t i = 0; i < queue_results.size(); i++) { + if (queue_results[i]->id == id_task) { + queue_results.erase(queue_results.begin() + i); + i--; + } + } } void remove_waiting_task_ids(const std::unordered_set & id_tasks) { @@ -1593,7 +1625,7 @@ struct server_response { return !queue_results.empty(); }); - for (int i = 0; i < (int) queue_results.size(); i++) { + for (size_t i = 0; i < queue_results.size(); i++) { if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) { server_task_result_ptr res = std::move(queue_results[i]); queue_results.erase(queue_results.begin() + i); @@ -1610,10 +1642,8 @@ struct server_response { server_task_result_ptr recv_with_timeout(const std::unordered_set & id_tasks, int timeout) { while (true) { std::unique_lock lock(mutex_results); - bool cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout), [&]{ - return !queue_results.empty(); - }); - if (!cr_res) { + std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout)); + if (cr_res == std::cv_status::timeout) { return nullptr; } @@ -2368,8 +2398,8 @@ struct server_context { server_task task(SERVER_TASK_TYPE_CANCEL); task.id_target = id_task; - cancel_tasks.push_back(task); queue_results.remove_waiting_task_id(id_task); + cancel_tasks.push_back(task); } // push to beginning of the queue, so it has highest priority queue_tasks.post(cancel_tasks, true); From 08296ec74ddf61dfa8ab99c1f03f4fd929255a34 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 22 Jan 2025 10:44:23 +0100 Subject: [PATCH 2/4] fix recv_with_timeout --- examples/server/server.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index fb41775cab9e1..37e1c94c5a055 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1642,10 +1642,6 @@ struct server_response { server_task_result_ptr recv_with_timeout(const std::unordered_set & id_tasks, int timeout) { while (true) { std::unique_lock lock(mutex_results); - std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout)); - if (cr_res == std::cv_status::timeout) { - return nullptr; - } for (int i = 0; i < (int) queue_results.size(); i++) { if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) { @@ -1654,6 +1650,11 @@ struct server_response { return res; } } + + std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout)); + if (cr_res == std::cv_status::timeout) { + return nullptr; + } } // should never reach here From b9e517106f1a3fc799b9b96ae789e6a6231cbe6d Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 22 Jan 2025 12:57:00 +0100 Subject: [PATCH 3/4] std::remove_if --- examples/server/server.cpp | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 37e1c94c5a055..1cafe90b8db6a 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1551,18 +1551,11 @@ struct server_queue { private: void cleanup_pending_task(int id_task) { // no need lock because this is called exclusively by post() - for (size_t i = 0; i < queue_tasks.size(); i++) { - if (queue_tasks[i].id == id_task) { - queue_tasks.erase(queue_tasks.begin() + i); - break; - } - } - for (size_t i = 0; i < queue_tasks_deferred.size(); i++) { - if (queue_tasks_deferred[i].id == id_task) { - queue_tasks_deferred.erase(queue_tasks_deferred.begin() + i); - break; - } - } + auto rm_func = [id_task](const server_task & task) { + return task.id_target == id_task; + }; + std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func); + std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func); } }; @@ -1600,12 +1593,13 @@ struct server_response { std::unique_lock lock(mutex_results); waiting_task_ids.erase(id_task); // make sure to clean up all pending results - for (size_t i = 0; i < queue_results.size(); i++) { - if (queue_results[i]->id == id_task) { - queue_results.erase(queue_results.begin() + i); - i--; + std::remove_if( + queue_results.begin(), + queue_results.end(), + [id_task](const server_task_result_ptr & res) { + return res->id == id_task; } - } + ); } void remove_waiting_task_ids(const std::unordered_set & id_tasks) { From 0244e79763e256f2b373b4b00200bf197903975f Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 22 Jan 2025 13:05:57 +0100 Subject: [PATCH 4/4] fix std::remove_if --- examples/server/server.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 1cafe90b8db6a..e23427a0b94d4 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1554,8 +1554,12 @@ struct server_queue { auto rm_func = [id_task](const server_task & task) { return task.id_target == id_task; }; - std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func); - std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func); + queue_tasks.erase( + std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func), + queue_tasks.end()); + queue_tasks_deferred.erase( + std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func), + queue_tasks_deferred.end()); } }; @@ -1593,13 +1597,11 @@ struct server_response { std::unique_lock lock(mutex_results); waiting_task_ids.erase(id_task); // make sure to clean up all pending results - std::remove_if( - queue_results.begin(), - queue_results.end(), - [id_task](const server_task_result_ptr & res) { + queue_results.erase( + std::remove_if(queue_results.begin(), queue_results.end(), [id_task](const server_task_result_ptr & res) { return res->id == id_task; - } - ); + }), + queue_results.end()); } void remove_waiting_task_ids(const std::unordered_set & id_tasks) {