From 10c85837e52c4a4905df00332a15c0d50e61a8f3 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Wed, 16 Aug 2023 23:32:59 +0300 Subject: [PATCH] Add timeout for context queue pop. (#382) Co-authored-by: Earle Lowe --- src/cli.cpp | 2 +- src/prover_disk.hpp | 28 ++++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/cli.cpp b/src/cli.cpp index 526a72140..8f96d7364 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -70,7 +70,7 @@ inline void InitDecompressorQueueDefault(bool no_cuda = false) if (initialized) { return; } - decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false); + decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false, 30); initialized = true; } diff --git a/src/prover_disk.hpp b/src/prover_disk.hpp index 93a52b697..abc3adae6 100644 --- a/src/prover_disk.hpp +++ b/src/prover_disk.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "../lib/include/picosha2.hpp" #include "calculate_bucket.hpp" @@ -67,7 +68,8 @@ class ContextQueue { const uint32_t max_compression_level, bool use_gpu_harvesting, uint32_t gpu_index, - bool enforce_gpu_index + bool enforce_gpu_index, + uint16_t context_queue_timeout ) { assert(!_dcompressor_queue_initialized); _dcompressor_queue_initialized = true; @@ -167,6 +169,7 @@ class ContextQueue { } } } + this->context_queue_timeout = context_queue_timeout; return false; } @@ -179,9 +182,24 @@ class ContextQueue { GreenReaperContext* pop() { std::unique_lock lock(mutex); - while (queue.empty()) { - condition.wait(lock); + + std::chrono::duration wait_time = std::chrono::seconds(context_queue_timeout); + + while (queue.empty() && wait_time.count() > 0) { + auto before_wait = std::chrono::steady_clock::now(); + + if (condition.wait_for(lock, wait_time) == std::cv_status::timeout) { + break; + } + + auto elapsed = std::chrono::duration_cast>(std::chrono::steady_clock::now() - before_wait); + wait_time -= elapsed; + } + + if (queue.empty()) { + throw std::runtime_error("Timeout waiting for context queue."); } + GreenReaperContext* gr = queue.front(); queue.pop(); return gr; @@ -191,6 +209,7 @@ class ContextQueue { std::queue queue; std::mutex mutex; std::condition_variable condition; + uint16_t context_queue_timeout; }; class ProofCache { @@ -266,7 +285,8 @@ class ContextQueue { const uint32_t max_compression_level, bool use_gpu_harvesting, uint32_t gpu_index, - bool enforce_gpu_index + bool enforce_gpu_index, + uint16_t context_queue_timeout ) { return false;