Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXXCBC-377: Implement ExtParallelUnstaging in transactions #457

Merged
merged 6 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/meta/features.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@
* couchbase::collection::query_indexes() support
*/
#define COUCHBASE_CXX_CLIENT_HAS_COLLECTION_QUERY_INDEX_MANAGEMENT 1

#define COUCHBASE_CXX_CLIENT_TRANSACTIONS_EXT_PARALLEL_UNSTAGING
5 changes: 5 additions & 0 deletions core/transactions/attempt_context_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ class attempt_context_impl
return is_done_;
}

[[nodiscard]] transaction_context& overall()
{
return overall_;
}

[[nodiscard]] const std::string& transaction_id()
{
return overall_.transaction_id();
Expand Down
2 changes: 1 addition & 1 deletion core/transactions/forward_compat.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct forward_compat_supported {
uint32_t protocol_major = 2;
uint32_t protocol_minor = 0;
std::list<std::string> extensions{ "TI", "MO", "BM", "QU", "SD", "BF3787", "BF3705", "BF3838", "RC",
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS" };
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS", "PU" };
};

struct forward_compat_requirement {
Expand Down
91 changes: 91 additions & 0 deletions core/transactions/internal/utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <string>
#include <thread>

#include <asio/steady_timer.hpp>
#include <utility>

namespace couchbase::core::transactions
{
// returns the parsed server time from the result of a lookup_in_spec::get(subdoc::lookup_in_macro::vbucket).xattr() call
Expand Down Expand Up @@ -82,6 +85,9 @@ wrap_durable_request(T&& req, const couchbase::transactions::transactions_config
return req;
}

void
validate_operation_result(result& res, bool ignore_subdoc_errors = true);

result
wrap_operation_future(std::future<result>& fut, bool ignore_subdoc_errors = true);

Expand Down Expand Up @@ -151,6 +157,7 @@ error_class_from_response(const Resp& resp)

static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_DELAY{ 3 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_EXP_DELAY{ 1 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_MAX_EXP_DELAY{ 100 };
static constexpr std::size_t DEFAULT_RETRY_OP_MAX_RETRIES{ 100 };
static constexpr double RETRY_OP_JITTER{ 0.1 }; // means +/- 10% for jitter.
static constexpr std::size_t DEFAULT_RETRY_OP_EXPONENT_CAP{ 8 };
Expand Down Expand Up @@ -300,6 +307,90 @@ struct constant_delay {
}
};

struct async_exp_delay {
std::shared_ptr<asio::steady_timer> timer;
std::chrono::microseconds initial_delay;
std::chrono::microseconds max_delay;
std::size_t max_retries;
mutable std::size_t retries;

template<typename R1, typename P1, typename R2, typename P2>
async_exp_delay(std::shared_ptr<asio::steady_timer> timer,
std::chrono::duration<R1, P1> initial,
std::chrono::duration<R2, P2> max,
std::size_t max_retries)
: timer(std::move(timer))
, initial_delay(std::chrono::duration_cast<std::chrono::microseconds>(initial))
, max_delay(std::chrono::duration_cast<std::chrono::microseconds>(max))
, max_retries(max_retries)
, retries(0)
{
}

async_exp_delay(std::shared_ptr<asio::steady_timer> timer)
: async_exp_delay(std::move(timer), DEFAULT_RETRY_OP_EXP_DELAY, DEFAULT_RETRY_OP_MAX_EXP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
{
}

void operator()(utils::movable_function<void(std::exception_ptr)> callback) const
{
if (retries >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
auto delay =
std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, static_cast<double>(retries++))));
if (delay > max_delay) {
delay = max_delay;
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
if (ec == asio::error::operation_aborted) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retry aborted")));
return;
}
avsej marked this conversation as resolved.
Show resolved Hide resolved
callback({});
});
}
};

struct async_constant_delay {
std::shared_ptr<asio::steady_timer> timer;
std::chrono::microseconds delay;
std::size_t max_retries;
std::size_t retries;

template<typename R, typename P>
async_constant_delay(std::shared_ptr<asio::steady_timer> timer, std::chrono::duration<R, P> d, std::size_t max)
: timer(std::move(timer))
, delay(std::chrono::duration_cast<std::chrono::microseconds>(d))
, max_retries(max)
, retries(0)
{
}

explicit async_constant_delay(std::shared_ptr<asio::steady_timer> timer)
: async_constant_delay(std::move(timer), DEFAULT_RETRY_OP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
{
}

void operator()(utils::movable_function<void(std::exception_ptr)> callback)
{
if (retries++ >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
if (ec == asio::error::operation_aborted) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retry aborted")));
return;
}
callback({});
});
}
};

std::list<std::string>
get_and_open_buckets(std::shared_ptr<core::cluster> c);

Expand Down