Skip to content

Commit

Permalink
Update timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DemetrisChr committed Oct 17, 2023
1 parent 5ba2899 commit de48409
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
40 changes: 19 additions & 21 deletions core/transactions/internal/utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ error_class_from_response(const Resp& resp)

static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_DELAY{ 3 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_EXP_DELAY{ 1 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_MAX_EXP_DELAY{ 100 };
static constexpr std::size_t DEFAULT_RETRY_OP_MAX_RETRIES{ 100 };
static constexpr double RETRY_OP_JITTER{ 0.1 }; // means +/- 10% for jitter.
static constexpr std::size_t DEFAULT_RETRY_OP_EXPONENT_CAP{ 8 };
Expand Down Expand Up @@ -310,45 +311,41 @@ struct async_exp_delay {
std::shared_ptr<asio::steady_timer> timer;
std::chrono::microseconds initial_delay;
std::chrono::microseconds max_delay;
std::chrono::microseconds timeout;
mutable std::uint32_t retries;
mutable std::optional<std::chrono::time_point<std::chrono::steady_clock>> end_time;
std::size_t max_retries;
mutable std::size_t retries;

template<typename R1, typename P1, typename R2, typename P2, typename R3, typename P3>
template<typename R1, typename P1, typename R2, typename P2>
async_exp_delay(std::shared_ptr<asio::steady_timer> timer,
std::chrono::duration<R1, P1> initial,
std::chrono::duration<R2, P2> max,
std::chrono::duration<R3, P3> limit)
std::size_t max_retries)
: timer(std::move(timer))
, initial_delay(std::chrono::duration_cast<std::chrono::microseconds>(initial))
, max_delay(std::chrono::duration_cast<std::chrono::microseconds>(max))
, timeout(std::chrono::duration_cast<std::chrono::microseconds>(limit))
, max_retries(max_retries)
, retries(0)
, end_time()
{
}

async_exp_delay(std::shared_ptr<asio::steady_timer> timer)
: async_exp_delay(std::move(timer), DEFAULT_RETRY_OP_EXP_DELAY, DEFAULT_RETRY_OP_MAX_EXP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
{
}

void operator()(utils::movable_function<void(std::exception_ptr)> callback) const
{
auto now = std::chrono::steady_clock::now();
if (!end_time) {
end_time = std::chrono::steady_clock::now() + timeout;
}
if (now > *end_time) {
callback(std::make_exception_ptr(retry_operation_timeout("timed out")));
if (retries++ >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
auto delay = std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, retries++)));
auto delay = std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, retries - 1)));
if (delay > max_delay) {
delay = max_delay;
}
if (now + delay > *end_time) {
timer->expires_after(*end_time - now);
} else {
timer->expires_after(delay);
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
if (ec == asio::error::operation_aborted) {
callback(std::make_exception_ptr(retry_operation_timeout("retry aborted")));
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retry aborted")));
return;
}
callback({});
Expand Down Expand Up @@ -379,7 +376,8 @@ struct async_constant_delay {
void operator()(utils::movable_function<void(std::exception_ptr)> callback)
{
if (retries++ >= max_retries) {
throw retry_operation_retries_exhausted("retries exhausted");
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
Expand Down
8 changes: 2 additions & 6 deletions core/transactions/staged_mutation.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ bool
unstaging_state::wait_until_unstage_possible()
{
std::unique_lock lock(mutex_);
auto timeout = std::max(std::chrono::nanoseconds(0), ctx_->overall().remaining()) + timeout_defaults::key_value_durable_timeout +
std::chrono::seconds(1);
auto success = cv_.wait_for(lock, timeout, [this] { return (in_flight_count_ < MAX_PARALLELISM) || abort_; });
auto success = cv_.wait_for(lock, ctx_->overall().remaining(), [this] { return (in_flight_count_ < MAX_PARALLELISM) || abort_; });
if (!abort_) {
if (success) {
in_flight_count_++;
Expand Down Expand Up @@ -281,9 +279,7 @@ staged_mutation_queue::rollback(attempt_context_impl* ctx)
futures.push_back(barrier->get_future());

auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
auto timeout = std::max(std::chrono::nanoseconds(0), ctx->overall().remaining()) + timeout_defaults::key_value_durable_timeout +
std::chrono::seconds(1);
async_exp_delay delay(timer, std::chrono::milliseconds(1), std::chrono::milliseconds(100), timeout);
async_exp_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::INSERT:
Expand Down

0 comments on commit de48409

Please sign in to comment.