Skip to content

Commit

Permalink
CXXCBC-546: get replica in transaction should not abort it (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej committed Jun 26, 2024
1 parent 351d266 commit 0a172fa
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 21 deletions.
5 changes: 3 additions & 2 deletions core/transactions/async_attempt_context.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ public:
* @see network_options::preferred_server_group
* @see https://docs.couchbase.com/server/current/manage/manage-groups/manage-groups.html
*/
virtual void get_replica_from_preferred_server_group(const core::document_id& id,
Callback&& cb) = 0;
virtual void get_replica_from_preferred_server_group(
const core::document_id& id,
std::function<void(std::exception_ptr, std::optional<transaction_get_result>)>&& cb) = 0;

/**
* Mutates the specified document with new content, using the document's last
Expand Down
2 changes: 1 addition & 1 deletion core/transactions/attempt_context.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public:
* @see https://docs.couchbase.com/server/current/manage/manage-groups/manage-groups.html
*/
virtual auto get_replica_from_preferred_server_group(const core::document_id& id)
-> transaction_get_result = 0;
-> std::optional<transaction_get_result> = 0;

/**
* Mutates the specified document with new content, using the document's last
Expand Down
46 changes: 30 additions & 16 deletions core/transactions/attempt_context_impl.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "core/cluster.hxx"
#include "core/impl/error.hxx"
#include "core/operations/document_mutate_in.hxx"
#include "core/transactions/error_class.hxx"
#include "durability_level.hxx"
#include "exceptions.hxx"
#include "forward_compat.hxx"
Expand Down Expand Up @@ -405,11 +406,8 @@ attempt_context_impl::get_replica_from_preferred_server_group(
std::move(cb),
transaction_operation_failed(*ec, "transaction expired during get").expired());
case FAIL_DOC_NOT_FOUND:
return self->op_completed_with_error(
std::move(cb),
transaction_operation_failed(
*ec, fmt::format("document not found {}", err_message.value_or("")))
.cause(external_exception::DOCUMENT_NOT_FOUND_EXCEPTION));
return self->op_completed_with_callback(std::move(cb),
std::optional<transaction_get_result>());
case FAIL_TRANSIENT:
return self->op_completed_with_error(
std::move(cb),
Expand All @@ -422,6 +420,12 @@ attempt_context_impl::get_replica_from_preferred_server_group(
transaction_operation_failed(
*ec, fmt::format("fail hard in get {}", err_message.value_or("")))
.no_rollback());
case FAIL_OTHER:
if (err_message.value_or("") == "document_irretrievable (102)") {
return self->op_completed_with_callback(std::move(cb),
std::optional<transaction_get_result>());
}
// fall through
default: {
auto msg = fmt::format("got error \"{}\" (ec={}) while getting replica for doc {}",
err_message.value_or(""),
Expand Down Expand Up @@ -455,17 +459,16 @@ attempt_context_impl::get_replica_from_preferred_server_group(

auto
attempt_context_impl::get_replica_from_preferred_server_group(const core::document_id& id)
-> transaction_get_result
-> std::optional<transaction_get_result>
{
auto barrier = std::make_shared<std::promise<transaction_get_result>>();
auto barrier = std::make_shared<std::promise<std::optional<transaction_get_result>>>();
auto f = barrier->get_future();
get_replica_from_preferred_server_group(
id, [barrier](std::exception_ptr err, std::optional<transaction_get_result> res) {
if (err) {
barrier->set_exception(std::move(err));
} else {
barrier->set_value(*res);
}
return barrier->set_value(std::move(res));
});
return f.get();
}
Expand All @@ -475,10 +478,19 @@ attempt_context_impl::get_replica_from_preferred_server_group(const couchbase::c
const std::string& id)
-> std::pair<couchbase::error, couchbase::transactions::transaction_get_result>
{
return wrap_call_for_public_api([this, coll, id]() mutable -> transaction_get_result {
return get_replica_from_preferred_server_group(
{ coll.bucket_name(), coll.scope_name(), coll.name(), id });
});
auto [ctx, res] = wrap_call_for_public_api(
[self = shared_from_this(), coll, id]() mutable -> transaction_get_result {
auto ret = self->get_replica_from_preferred_server_group(
{ coll.bucket_name(), coll.scope_name(), coll.name(), id });
if (ret) {
return ret.value();
}
return {};
});
if (!ctx.ec() && res.cas().empty()) {
return { { errc::transaction_op::document_not_found }, res };
}
return { ctx, res };
}

void
Expand Down Expand Up @@ -2917,6 +2929,7 @@ attempt_context_impl::do_get(const core::document_id& id,
allow_replica,
[self,
id,
allow_replica,
resolving_missing_atr_entry = std::move(resolving_missing_atr_entry),
cb = std::move(cb)](std::optional<error_class> ec,
std::optional<std::string> err_message,
Expand Down Expand Up @@ -2952,7 +2965,7 @@ attempt_context_impl::do_get(const core::document_id& id,
active_transaction_record::get_atr(
self->cluster_ref(),
doc_atr_id,
[self, id, doc, cb = std::move(cb)](
[self, id, allow_replica, doc, cb = std::move(cb)](
std::error_code ec2, std::optional<active_transaction_record> atr) mutable {
if (!ec2 && atr) {
active_transaction_record& atr_doc = atr.value();
Expand Down Expand Up @@ -3000,7 +3013,8 @@ attempt_context_impl::do_get(const core::document_id& id,
CB_ATTEMPT_CTX_LOG_DEBUG(self,
"could not get ATR entry, checking again with {}",
doc->links().staged_attempt_id().value_or("-"));
return self->do_get(id, false, doc->links().staged_attempt_id(), cb);
return self->do_get(
id, allow_replica, doc->links().staged_attempt_id(), cb);
}
if (ignore_doc) {
return cb(std::nullopt, std::nullopt, std::nullopt);
Expand All @@ -3013,7 +3027,7 @@ attempt_context_impl::do_get(const core::document_id& id,
CB_ATTEMPT_CTX_LOG_DEBUG(self,
"could not get ATR, checking again with {}",
doc->links().staged_attempt_id().value_or("-"));
return self->do_get(id, false, doc->links().staged_attempt_id(), cb);
return self->do_get(id, allow_replica, doc->links().staged_attempt_id(), cb);
});
} else {
if (doc->links().is_deleted()) {
Expand Down
6 changes: 4 additions & 2 deletions core/transactions/attempt_context_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,10 @@ public:
std::function<void(std::exception_ptr, std::optional<transaction_get_result>)>&& cb) override;

auto get_replica_from_preferred_server_group(const core::document_id& id)
-> transaction_get_result override;
void get_replica_from_preferred_server_group(const core::document_id& id, Callback&& cb) override;
-> std::optional<transaction_get_result> override;
void get_replica_from_preferred_server_group(
const core::document_id& id,
std::function<void(std::exception_ptr, std::optional<transaction_get_result>)>&& cb) override;
auto get_replica_from_preferred_server_group(const couchbase::collection& coll,
const std::string& id)
-> std::pair<couchbase::error, couchbase::transactions::transaction_get_result> override;
Expand Down

0 comments on commit 0a172fa

Please sign in to comment.