Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for context queue pop. #382

Merged
merged 11 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ inline void InitDecompressorQueueDefault(bool no_cuda = false)
if (initialized) {
return;
}
decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false);
decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false, 30);
initialized = true;
}

Expand Down
28 changes: 24 additions & 4 deletions src/prover_disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>

#include "../lib/include/picosha2.hpp"
#include "calculate_bucket.hpp"
Expand Down Expand Up @@ -67,7 +68,8 @@ class ContextQueue {
const uint32_t max_compression_level,
bool use_gpu_harvesting,
uint32_t gpu_index,
bool enforce_gpu_index
bool enforce_gpu_index,
uint16_t context_queue_timeout
) {
assert(!_dcompressor_queue_initialized);
_dcompressor_queue_initialized = true;
Expand Down Expand Up @@ -167,6 +169,7 @@ class ContextQueue {
}
}
}
this->context_queue_timeout = context_queue_timeout;
return false;
}

Expand All @@ -179,9 +182,24 @@ class ContextQueue {

GreenReaperContext* pop() {
std::unique_lock<std::mutex> lock(mutex);
while (queue.empty()) {
condition.wait(lock);

std::chrono::duration<double> wait_time = std::chrono::seconds(context_queue_timeout);

while (queue.empty() && wait_time.count() > 0) {
auto before_wait = std::chrono::steady_clock::now();

if (condition.wait_for(lock, wait_time) == std::cv_status::timeout) {
break;
}

auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - before_wait);
wait_time -= elapsed;
}

if (queue.empty()) {
throw std::runtime_error("Timeout waiting for context queue.");
}

GreenReaperContext* gr = queue.front();
queue.pop();
return gr;
Expand All @@ -191,6 +209,7 @@ class ContextQueue {
std::queue<GreenReaperContext*> queue;
std::mutex mutex;
std::condition_variable condition;
uint16_t context_queue_timeout;
};

class ProofCache {
Expand Down Expand Up @@ -266,7 +285,8 @@ class ContextQueue {
const uint32_t max_compression_level,
bool use_gpu_harvesting,
uint32_t gpu_index,
bool enforce_gpu_index
bool enforce_gpu_index,
uint16_t context_queue_timeout
)
{
return false;
Expand Down