Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXXCBC-391: Fix transactions API bugs #482

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/meta/features.hxx
Expand Up @@ -79,6 +79,12 @@
*/
#define COUCHBASE_CXX_CLIENT_HAS_CORE_CLUSTER_HIDDEN 1

/**
* expiration_time has been renamed to timeout in transactions_options and transactions_config
* kv_timeout removed from transactions_options and transactions_config
*/
#define COUCHBASE_CXX_CLIENT_TRANSACTIONS_OPTIONS_HAVE_TIMEOUT 1

/**
* Search index management is accessible from the public API
* couchbase::cluster::search_indexes() support
Expand Down
5 changes: 1 addition & 4 deletions core/origin.cxx
Expand Up @@ -168,7 +168,7 @@ struct traits<couchbase::transactions::transactions_config::built> {
static void assign(tao::json::basic_value<Traits>& v, const couchbase::transactions::transactions_config::built& o)
{
v = {
{ "expiration_time", o.expiration_time },
{ "timeout", o.timeout },
{ "durability_level", o.level },
{
"query_config",
Expand All @@ -186,9 +186,6 @@ struct traits<couchbase::transactions::transactions_config::built> {
},
},
};
if (const auto& p = o.kv_timeout; p.has_value()) {
v["key_value_timeout"] = p.value();
}
if (const auto& p = o.metadata_collection; p.has_value()) {
v["metadata_collection"] = {
{ "bucket", p.value().bucket },
Expand Down
1 change: 0 additions & 1 deletion core/transactions/active_transaction_record.hxx
Expand Up @@ -35,7 +35,6 @@ namespace couchbase::core::transactions
class active_transaction_record
{
public:
// TODO: we should get the kv_timeout and put it in the request (pass in the transactions_config)
template<typename Callback>
static void get_atr(const core::cluster& cluster, const core::document_id& atr_id, Callback&& cb)
{
Expand Down
20 changes: 9 additions & 11 deletions core/transactions/atr_cleanup_entry.cxx
Expand Up @@ -191,7 +191,6 @@ atr_cleanup_entry::do_per_doc(std::vector<doc_record> docs,
}
.specs();
req.access_deleted = true;
wrap_request(req, cleanup_->config());
// now a blocking lookup_in...
auto barrier = std::make_shared<std::promise<result>>();
cleanup_->cluster_ref().execute(
Expand Down Expand Up @@ -255,10 +254,9 @@ atr_cleanup_entry::commit_docs(std::optional<std::vector<doc_record>> docs, dura
core::operations::insert_request req{ doc.id(), content };
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(wrap_durable_request(req, cleanup_->config(), dl),
[barrier](core::operations::insert_response resp) {
barrier->set_value(result::create_from_mutation_response(resp));
});
cleanup_->cluster_ref().execute(wrap_durable_request(req, dl), [barrier](core::operations::insert_response resp) {
barrier->set_value(result::create_from_mutation_response(resp));
});
wrap_operation_future(f);
} else {
core::operations::mutate_in_request req{ doc.id() };
Expand All @@ -270,7 +268,7 @@ atr_cleanup_entry::commit_docs(std::optional<std::vector<doc_record>> docs, dura
.specs();
req.cas = doc.cas();
req.store_semantics = couchbase::store_semantics::replace;
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(req, [barrier](core::operations::mutate_in_response resp) {
Expand Down Expand Up @@ -303,7 +301,7 @@ atr_cleanup_entry::remove_docs(std::optional<std::vector<doc_record>> docs, dura
.specs();
req.cas = doc.cas();
req.access_deleted = true;
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(req, [barrier](core::operations::mutate_in_response resp) {
Expand All @@ -313,7 +311,7 @@ atr_cleanup_entry::remove_docs(std::optional<std::vector<doc_record>> docs, dura
} else {
core::operations::remove_request req{ doc.id() };
req.cas = doc.cas();
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(req, [barrier](core::operations::remove_response resp) {
Expand All @@ -338,7 +336,7 @@ atr_cleanup_entry::remove_docs_staged_for_removal(std::optional<std::vector<doc_
}
core::operations::remove_request req{ doc.id() };
req.cas = doc.cas();
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(req, [barrier](core::operations::remove_response resp) {
Expand Down Expand Up @@ -372,7 +370,7 @@ atr_cleanup_entry::remove_txn_links(std::optional<std::vector<doc_record>> docs,
.specs();
req.access_deleted = true;
req.cas = doc.cas();
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(
Expand All @@ -399,7 +397,7 @@ atr_cleanup_entry::cleanup_entry(durability_level dl)
}
mut_specs.push_back(couchbase::mutate_in_specs::remove("attempts." + atr_entry_->attempt_id()).xattr());
req.specs = mut_specs.specs();
wrap_durable_request(req, cleanup_->config(), dl);
wrap_durable_request(req, dl);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
cleanup_->cluster_ref().execute(
Expand Down
16 changes: 7 additions & 9 deletions core/transactions/attempt_context_impl.cxx
Expand Up @@ -340,8 +340,7 @@ attempt_context_impl::replace_raw(const transaction_get_result& document, const
}
if (existing_sm != nullptr && existing_sm->type() == staged_mutation_type::INSERT) {
CB_ATTEMPT_CTX_LOG_DEBUG(this, "found existing INSERT of {} while replacing", document);
exp_delay delay(
std::chrono::milliseconds(5), std::chrono::milliseconds(300), overall_.config().expiration_time);
exp_delay delay(std::chrono::milliseconds(5), std::chrono::milliseconds(300), overall_.config().timeout);
create_staged_insert(document.id(), content, existing_sm->doc().cas().value(), delay, op_id, std::move(cb));
return;
}
Expand Down Expand Up @@ -478,7 +477,7 @@ attempt_context_impl::insert_raw(const core::document_id& id, const std::vector<
return create_staged_replace(existing_sm->doc(), content, op_id, std::move(cb));
}
uint64_t cas = 0;
exp_delay delay(std::chrono::milliseconds(5), std::chrono::milliseconds(300), overall_.config().expiration_time);
exp_delay delay(std::chrono::milliseconds(5), std::chrono::milliseconds(300), overall_.config().timeout);
create_staged_insert(id, content, cas, delay, op_id, std::move(cb));
});
} catch (const std::exception& e) {
Expand Down Expand Up @@ -764,8 +763,9 @@ attempt_context_impl::query_begin_work(std::optional<std::string> query_context,
txdata["state"] = tao::json::empty_object;
txdata["state"]["timeLeftMs"] = overall_.remaining().count() / 1000000;
txdata["config"] = tao::json::empty_object;
auto [ec, origin] = overall_.cluster_ref().origin();
txdata["config"]["kvTimeoutMs"] =
overall_.config().kv_timeout ? overall_.config().kv_timeout->count() : core::timeout_defaults::key_value_timeout.count();
(ec) ? core::timeout_defaults::key_value_durable_timeout.count() : origin.options().key_value_durable_timeout.count();
txdata["config"]["numAtrs"] = 1024;
opts.raw("numatrs", jsonify(1024));
txdata["config"]["durabilityLevel"] = durability_level_to_string(overall_.config().level);
Expand Down Expand Up @@ -1496,7 +1496,6 @@ attempt_context_impl::atr_commit_ambiguity_resolution()
std::string prefix(ATR_FIELD_ATTEMPTS + "." + id() + ".");
core::operations::lookup_in_request req{ atr_id_.value() };
req.specs = lookup_in_specs{ lookup_in_specs::get(prefix + ATR_FIELD_STATUS).xattr() }.specs();
wrap_request(req, overall_.config());
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
overall_.cluster_ref().execute(
Expand Down Expand Up @@ -1935,10 +1934,10 @@ attempt_context_impl::set_atr_pending_locked(const core::document_id& id, std::u
CB_ATTEMPT_CTX_LOG_DEBUG(this, "updating atr {}", atr_id_.value());

std::chrono::nanoseconds remaining = overall_.remaining();
// This bounds the value to [0-expirationTime]. It should always be in this range, this is just to protect
// This bounds the value to [0-timeout]. It should always be in this range, this is just to protect
// against the application clock changing.
long remaining_bounded_nanos = std::max(std::min(remaining.count(), overall_.config().expiration_time.count()),
static_cast<std::chrono::nanoseconds::rep>(0));
long remaining_bounded_nanos =
std::max(std::min(remaining.count(), overall_.config().timeout.count()), static_cast<std::chrono::nanoseconds::rep>(0));
long remaining_bounded_msecs = remaining_bounded_nanos / 1'000'000;

core::operations::mutate_in_request req{ atr_id_.value() };
Expand Down Expand Up @@ -2176,7 +2175,6 @@ attempt_context_impl::get_doc(
}
.specs();
req.access_deleted = true;
wrap_request(req, overall_.config());
try {
overall_.cluster_ref().execute(req, [this, id, cb = std::move(cb)](core::operations::lookup_in_response resp) {
auto ec = error_class_from_response(resp);
Expand Down
14 changes: 1 addition & 13 deletions core/transactions/internal/utils.hxx
Expand Up @@ -57,30 +57,18 @@ operator<<(OStream& os, const core::document_id& id)
return os;
}

template<typename T>
T&
wrap_request(T&& req, const couchbase::transactions::transactions_config::built& config)
{
if (config.kv_timeout) {
req.timeout = config.kv_timeout.value();
}
return req;
}

template<typename T>
T&
wrap_durable_request(T&& req, const couchbase::transactions::transactions_config::built& config)
{
wrap_request(req, config);
req.durability_level = config.level;
return req;
}

template<typename T>
T&
wrap_durable_request(T&& req, const couchbase::transactions::transactions_config::built& config, durability_level level)
wrap_durable_request(T&& req, durability_level level)
{
wrap_request(req, config);
req.durability_level = level;
return req;
}
Expand Down
8 changes: 4 additions & 4 deletions core/transactions/transaction_context.cxx
Expand Up @@ -32,7 +32,7 @@ transaction_context::transaction_context(transactions& txns, const couchbase::tr
, config_(config.apply(txns.config()))
, deferred_elapsed_(0)
, cleanup_(txns.cleanup())
, delay_(new exp_delay(std::chrono::milliseconds(1), std::chrono::milliseconds(100), 2 * config_.expiration_time))
, delay_(new exp_delay(std::chrono::milliseconds(1), std::chrono::milliseconds(100), 2 * config_.timeout))
{
// add metadata_collection to cleanup, if present
if (config_.metadata_collection) {
Expand All @@ -54,7 +54,7 @@ transaction_context::remaining() const
{
const auto& now = std::chrono::steady_clock::now();
auto expired_nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(now - start_time_client_) + deferred_elapsed_;
return config_.expiration_time - expired_nanos;
return config_.timeout - expired_nanos;
}

[[nodiscard]] bool
Expand All @@ -64,7 +64,7 @@ transaction_context::has_expired_client_side()
const auto& now = std::chrono::steady_clock::now();
auto expired_nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(now - start_time_client_) + deferred_elapsed_;
auto expired_millis = std::chrono::duration_cast<std::chrono::milliseconds>(expired_nanos);
bool is_expired = expired_nanos > config_.expiration_time;
bool is_expired = expired_nanos > config_.timeout;
if (is_expired) {
CB_ATTEMPT_CTX_LOG_INFO(current_attempt_context_,
"has expired client side (now={}ns, start={}ns, deferred_elapsed={}ns, expired={}ns ({}ms), config={}ms)",
Expand All @@ -73,7 +73,7 @@ transaction_context::has_expired_client_side()
deferred_elapsed_.count(),
expired_nanos.count(),
expired_millis.count(),
std::chrono::duration_cast<std::chrono::milliseconds>(config_.expiration_time).count());
std::chrono::duration_cast<std::chrono::milliseconds>(config_.timeout).count());
}
return is_expired;
}
Expand Down
20 changes: 3 additions & 17 deletions core/transactions/transaction_options.cxx
Expand Up @@ -30,8 +30,7 @@ transaction_options::apply(const transactions_config::built& conf) const
query_config.scan_consistency = *scan_consistency_;
}
return { durability_.value_or(conf.level),
expiration_time_.value_or(conf.expiration_time),
kv_timeout_ ? kv_timeout_ : conf.kv_timeout,
timeout_.value_or(conf.timeout),
attempt_context_hooks_ ? attempt_context_hooks_ : conf.attempt_context_hooks,
cleanup_hooks_ ? cleanup_hooks_ : conf.cleanup_hooks,
metadata_collection_ ? metadata_collection_ : conf.metadata_collection,
Expand Down Expand Up @@ -80,23 +79,10 @@ transaction_options::scan_consistency() const
return scan_consistency_;
}

transaction_options&
transaction_options::kv_timeout(std::chrono::milliseconds kv_timeout)
{
kv_timeout_ = kv_timeout;
return *this;
}

std::optional<std::chrono::milliseconds>
transaction_options::kv_timeout()
{
return kv_timeout_;
}

std::optional<std::chrono::nanoseconds>
transaction_options::expiration_time()
transaction_options::timeout()
{
return expiration_time_;
return timeout_;
}

transaction_options&
Expand Down
2 changes: 1 addition & 1 deletion core/transactions/transactions.cxx
Expand Up @@ -77,7 +77,7 @@ wrap_run(transactions& txns, const couchbase::transactions::transaction_options&
while (attempts++ < max_attempts) {
// NOTE: new_attempt_context has the exponential backoff built in. So, after
// the first time it is called, it has a 1ms delay, then 2ms, etc... capped at 100ms
// until (for now) a timeout is reached (2x the expiration_time). Soon, will build in
// until (for now) a timeout is reached (2x the timeout). Soon, will build in
// a max attempts instead. In any case, the timeout occurs in the logic - adding
// a max attempts or timeout is just in case a bug prevents timeout, etc...
overall.new_attempt_context();
Expand Down
1 change: 0 additions & 1 deletion core/transactions/transactions_cleanup.cxx
Expand Up @@ -283,7 +283,6 @@ transactions_cleanup::get_active_clients(const couchbase::transactions::transact
lookup_in_specs::get(subdoc::lookup_in_macro::vbucket).xattr(),
}
.specs();
wrap_request(req, config_);
auto barrier = std::make_shared<std::promise<result>>();
auto f = barrier->get_future();
auto ec = config_.cleanup_hooks->client_record_before_get(keyspace.bucket);
Expand Down
13 changes: 7 additions & 6 deletions core/transactions/transactions_config.cxx
Expand Up @@ -26,7 +26,7 @@ namespace couchbase::transactions

transactions_config::transactions_config()
: level_(couchbase::durability_level::majority)
, expiration_time_(std::chrono::seconds(15))
, timeout_(std::chrono::seconds(15))
, attempt_context_hooks_(new core::transactions::attempt_context_testing_hooks())
, cleanup_hooks_(new core::transactions::cleanup_testing_hooks())
{
Expand All @@ -36,7 +36,7 @@ transactions_config::~transactions_config() = default;

transactions_config::transactions_config(transactions_config&& c) noexcept
: level_(c.level_)
, expiration_time_(c.expiration_time_)
, timeout_(c.timeout_)
, attempt_context_hooks_(c.attempt_context_hooks_)
, cleanup_hooks_(c.cleanup_hooks_)
, metadata_collection_(std::move(c.metadata_collection_))
Expand All @@ -47,7 +47,7 @@ transactions_config::transactions_config(transactions_config&& c) noexcept

transactions_config::transactions_config(const transactions_config& config)
: level_(config.durability_level())
, expiration_time_(config.expiration_time())
, timeout_(config.timeout())
, attempt_context_hooks_(std::make_shared<core::transactions::attempt_context_testing_hooks>(config.attempt_context_hooks()))
, cleanup_hooks_(std::make_shared<core::transactions::cleanup_testing_hooks>(config.cleanup_hooks()))
, metadata_collection_(config.metadata_collection())
Expand All @@ -61,7 +61,7 @@ transactions_config::operator=(const transactions_config& c)
{
if (this != &c) {
level_ = c.level_;
expiration_time_ = c.expiration_time_;
timeout_ = c.timeout_;
attempt_context_hooks_ = c.attempt_context_hooks_;
cleanup_hooks_ = c.cleanup_hooks_;
query_config_ = c.query_config_;
Expand All @@ -74,8 +74,9 @@ transactions_config::operator=(const transactions_config& c)
transactions_config::built
transactions_config::build() const
{
return { level_, expiration_time_, kv_timeout_, attempt_context_hooks_,
cleanup_hooks_, metadata_collection_, query_config_.build(), cleanup_config_.build() };
return {
level_, timeout_, attempt_context_hooks_, cleanup_hooks_, metadata_collection_, query_config_.build(), cleanup_config_.build()
};
}

} // namespace couchbase::transactions