Skip to content

Commit

Permalink
Wrap code that launches async commit/rollback operations in try-catch…
Browse files Browse the repository at this point in the history
… block & small fixes
  • Loading branch information
DemetrisChr committed Oct 18, 2023
1 parent de48409 commit bdbe1a3
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 58 deletions.
5 changes: 3 additions & 2 deletions core/transactions/internal/utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,12 @@ struct async_exp_delay {

void operator()(utils::movable_function<void(std::exception_ptr)> callback) const
{
if (retries++ >= max_retries) {
if (retries >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
auto delay = std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, retries - 1)));
auto delay =
std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, static_cast<double>(retries++))));
if (delay > max_delay) {
delay = max_delay;
}
Expand Down
137 changes: 81 additions & 56 deletions core/transactions/staged_mutation.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ unstaging_state::notify_unstage_error()
{
std::lock_guard lock(mutex_);
abort_ = true;
in_flight_count_--;
cv_.notify_all();
}

Expand Down Expand Up @@ -205,36 +206,48 @@ staged_mutation_queue::commit(attempt_context_impl* ctx)
}

auto barrier = std::make_shared<std::promise<void>>();
futures.push_back(barrier->get_future());

auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_constant_delay delay(timer);
auto future = barrier->get_future();

switch (item.type()) {
case staged_mutation_type::REMOVE:
remove_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::INSERT:
case staged_mutation_type::REPLACE:
commit_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
try {
auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_constant_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::REMOVE:
remove_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::INSERT:
case staged_mutation_type::REPLACE:
commit_doc(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
}
} catch (...) {
// This should not happen, but catching it to ensure that we wait for in-flight operations
CB_ATTEMPT_CTX_LOG_ERROR(ctx,
"caught exception while trying to initiate commit for {}. Aborting rest of commit and waiting for "
"in-flight rollback operations to finish",
item.doc().id());
aborted = true;
break;
}

futures.push_back(std::move(future));
}

std::exception_ptr exc{};
Expand Down Expand Up @@ -276,36 +289,48 @@ staged_mutation_queue::rollback(attempt_context_impl* ctx)
}

auto barrier = std::make_shared<std::promise<void>>();
futures.push_back(barrier->get_future());
auto future = barrier->get_future();

auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_exp_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::INSERT:
rollback_insert(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::REMOVE:
case staged_mutation_type::REPLACE:
rollback_remove_or_replace(ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
try {
auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref()->io_context());
async_exp_delay delay(timer);

switch (item.type()) {
case staged_mutation_type::INSERT:
rollback_insert(ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
case staged_mutation_type::REMOVE:
case staged_mutation_type::REPLACE:
rollback_remove_or_replace(ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
if (exc) {
state.notify_unstage_error();
barrier->set_exception(exc);
} else {
state.notify_unstage_complete();
barrier->set_value();
}
});
break;
}
} catch (...) {
// This should not happen, but catching it to ensure that we wait for in-flight operations
CB_ATTEMPT_CTX_LOG_ERROR(ctx,
"caught exception while trying to initiate rollback for {}. Aborting rollback and waiting for "
"in-flight rollback operations to finish",
item.doc().id());
aborted = true;
break;
}

futures.push_back(std::move(future));
}

std::exception_ptr exc{};
Expand Down

0 comments on commit bdbe1a3

Please sign in to comment.