Skip to content

Commit

Permalink
Wrap code that launches async commit/rollback operations in try-catch…
Browse files Browse the repository at this point in the history
… block & small fixes
  • Loading branch information
DemetrisChr authored and avsej committed Oct 24, 2023
1 parent f949c20 commit aee1504
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 66 deletions.
5 changes: 3 additions & 2 deletions core/transactions/internal/utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,12 @@ struct async_exp_delay {

void operator()(utils::movable_function<void(std::exception_ptr)> callback) const
{
if (retries++ >= max_retries) {
if (retries >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
auto delay = std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, retries - 1)));
auto delay =
std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, static_cast<double>(retries++))));
if (delay > max_delay) {
delay = max_delay;
}
Expand Down
153 changes: 89 additions & 64 deletions core/transactions/staged_mutation.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ unstaging_state::notify_unstage_error()
{
std::lock_guard lock(mutex_);
abort_ = true;
in_flight_count_--;
cv_.notify_all();
}

Expand Down Expand Up @@ -205,36 +206,48 @@ staged_mutation_queue::commit(attempt_context_impl* ctx)
}

auto barrier = std::make_shared<std::promise<void>>();
futures.push_back(barrier->get_future());

auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_constant_delay delay(timer);
auto future = barrier->get_future();

switch (item.type()) {
case staged_mutation_type::REMOVE:
remove_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::INSERT:
case staged_mutation_type::REPLACE:
commit_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
try {
auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_constant_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::REMOVE:
remove_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::INSERT:
case staged_mutation_type::REPLACE:
commit_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
}
} catch (...) {
// This should not happen, but catching it to ensure that we wait for in-flight operations
CB_ATTEMPT_CTX_LOG_ERROR(ctx,
"caught exception while trying to initiate commit for {}. Aborting rest of commit and waiting for "
"in-flight rollback operations to finish",
item.doc().id());
aborted = true;
break;
}

futures.push_back(std::move(future));
}

std::exception_ptr exc{};
Expand Down Expand Up @@ -276,36 +289,48 @@ staged_mutation_queue::rollback(attempt_context_impl* ctx)
}

auto barrier = std::make_shared<std::promise<void>>();
futures.push_back(barrier->get_future());
auto future = barrier->get_future();

auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_exp_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::INSERT:
rollback_insert(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::REMOVE:
case staged_mutation_type::REPLACE:
rollback_remove_or_replace(ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
try {
auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_exp_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::INSERT:
rollback_insert(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::REMOVE:
case staged_mutation_type::REPLACE:
rollback_remove_or_replace(ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
}
} catch (...) {
// This should not happen, but catching it to ensure that we wait for in-flight operations
CB_ATTEMPT_CTX_LOG_ERROR(ctx,
"caught exception while trying to initiate rollback for {}. Aborting rollback and waiting for "
"in-flight rollback operations to finish",
item.doc().id());
aborted = true;
break;
}

futures.push_back(std::move(future));
}

std::exception_ptr exc{};
Expand Down Expand Up @@ -610,7 +635,7 @@ staged_mutation_queue::handle_commit_doc_error(const client_error& e,
default:
throw transaction_operation_failed(ec, e.what()).no_rollback().failed_post_commit();
}
} catch (const retry_operation& e) {
} catch (const retry_operation&) {
delay([this, callback = std::move(callback), ctx, &item, delay, ambiguity_resolution_mode, cas_zero_mode](
const std::exception_ptr& exc) mutable {
if (exc) {
Expand All @@ -620,7 +645,7 @@ staged_mutation_queue::handle_commit_doc_error(const client_error& e,
CB_ATTEMPT_CTX_LOG_TRACE(ctx, "retrying commit_doc");
commit_doc(ctx, item, delay, std::move(callback), ambiguity_resolution_mode, cas_zero_mode);
});
} catch (const transaction_operation_failed& e) {
} catch (const transaction_operation_failed&) {
callback(std::current_exception());
}
}
Expand All @@ -645,7 +670,7 @@ staged_mutation_queue::handle_remove_doc_error(const client_error& e,
default:
throw transaction_operation_failed(ec, e.what()).no_rollback().failed_post_commit();
}
} catch (const retry_operation& e) {
} catch (const retry_operation&) {
delay([this, callback = std::move(callback), ctx, &item, delay](const std::exception_ptr& exc) mutable {
if (exc) {
callback(exc);
Expand All @@ -654,7 +679,7 @@ staged_mutation_queue::handle_remove_doc_error(const client_error& e,
CB_ATTEMPT_CTX_LOG_TRACE(ctx, "retrying remove_doc");
remove_doc(ctx, item, delay, std::move(callback));
});
} catch (const transaction_operation_failed& e) {
} catch (const transaction_operation_failed&) {
callback(std::current_exception());
}
}
Expand Down Expand Up @@ -691,7 +716,7 @@ staged_mutation_queue::handle_rollback_insert_error(const client_error& e,
default:
throw retry_operation("retry rollback insert");
}
} catch (const retry_operation& e) {
} catch (const retry_operation&) {
delay([this, callback = std::move(callback), ctx, &item, delay](const std::exception_ptr& exc) mutable {
if (exc) {
callback(exc);
Expand All @@ -700,7 +725,7 @@ staged_mutation_queue::handle_rollback_insert_error(const client_error& e,
CB_ATTEMPT_CTX_LOG_TRACE(ctx, "retrying rollback_insert");
rollback_insert(ctx, item, delay, std::move(callback));
});
} catch (const transaction_operation_failed& e) {
} catch (const transaction_operation_failed&) {
callback(std::current_exception());
}
}
Expand Down Expand Up @@ -736,7 +761,7 @@ staged_mutation_queue::handle_rollback_remove_or_replace_error(const client_erro
default:
throw retry_operation("retry rollback_remove_or_replace");
}
} catch (const retry_operation& e) {
} catch (const retry_operation&) {
delay([this, callback = std::move(callback), ctx, &item, delay](const std::exception_ptr& exc) mutable {
if (exc) {
callback(exc);
Expand All @@ -745,7 +770,7 @@ staged_mutation_queue::handle_rollback_remove_or_replace_error(const client_erro
CB_ATTEMPT_CTX_LOG_TRACE(ctx, "retrying rollback_remove_or_replace");
rollback_remove_or_replace(ctx, item, delay, std::move(callback));
});
} catch (const transaction_operation_failed& e) {
} catch (const transaction_operation_failed&) {
callback(std::current_exception());
}
}
Expand Down

0 comments on commit aee1504

Please sign in to comment.