Skip to content

Commit

Permalink
CXXCBC-377: Implement ExtParallelUnstaging in transactions (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
DemetrisChr committed Oct 24, 2023
1 parent eb4fe60 commit 1658962
Show file tree
Hide file tree
Showing 8 changed files with 723 additions and 200 deletions.
2 changes: 2 additions & 0 deletions core/meta/features.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@
* couchbase::collection::query_indexes() support
*/
#define COUCHBASE_CXX_CLIENT_HAS_COLLECTION_QUERY_INDEX_MANAGEMENT 1

#define COUCHBASE_CXX_CLIENT_TRANSACTIONS_EXT_PARALLEL_UNSTAGING
5 changes: 5 additions & 0 deletions core/transactions/attempt_context_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ class attempt_context_impl
return is_done_;
}

[[nodiscard]] transaction_context& overall()
{
return overall_;
}

[[nodiscard]] const std::string& transaction_id()
{
return overall_.transaction_id();
Expand Down
2 changes: 1 addition & 1 deletion core/transactions/forward_compat.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct forward_compat_supported {
uint32_t protocol_major = 2;
uint32_t protocol_minor = 0;
std::list<std::string> extensions{ "TI", "MO", "BM", "QU", "SD", "BF3787", "BF3705", "BF3838", "RC",
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS" };
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS", "PU" };
};

struct forward_compat_requirement {
Expand Down
91 changes: 91 additions & 0 deletions core/transactions/internal/utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <string>
#include <thread>

#include <asio/steady_timer.hpp>
#include <utility>

namespace couchbase::core::transactions
{
// returns the parsed server time from the result of a lookup_in_spec::get(subdoc::lookup_in_macro::vbucket).xattr() call
Expand Down Expand Up @@ -82,6 +85,9 @@ wrap_durable_request(T&& req, const couchbase::transactions::transactions_config
return req;
}

void
validate_operation_result(result& res, bool ignore_subdoc_errors = true);

result
wrap_operation_future(std::future<result>& fut, bool ignore_subdoc_errors = true);

Expand Down Expand Up @@ -151,6 +157,7 @@ error_class_from_response(const Resp& resp)

static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_DELAY{ 3 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_EXP_DELAY{ 1 };
static constexpr std::chrono::milliseconds DEFAULT_RETRY_OP_MAX_EXP_DELAY{ 100 };
static constexpr std::size_t DEFAULT_RETRY_OP_MAX_RETRIES{ 100 };
static constexpr double RETRY_OP_JITTER{ 0.1 }; // means +/- 10% for jitter.
static constexpr std::size_t DEFAULT_RETRY_OP_EXPONENT_CAP{ 8 };
Expand Down Expand Up @@ -300,6 +307,90 @@ struct constant_delay {
}
};

struct async_exp_delay {
std::shared_ptr<asio::steady_timer> timer;
std::chrono::microseconds initial_delay;
std::chrono::microseconds max_delay;
std::size_t max_retries;
mutable std::size_t retries;

template<typename R1, typename P1, typename R2, typename P2>
async_exp_delay(std::shared_ptr<asio::steady_timer> timer,
std::chrono::duration<R1, P1> initial,
std::chrono::duration<R2, P2> max,
std::size_t max_retries)
: timer(std::move(timer))
, initial_delay(std::chrono::duration_cast<std::chrono::microseconds>(initial))
, max_delay(std::chrono::duration_cast<std::chrono::microseconds>(max))
, max_retries(max_retries)
, retries(0)
{
}

async_exp_delay(std::shared_ptr<asio::steady_timer> timer)
: async_exp_delay(std::move(timer), DEFAULT_RETRY_OP_EXP_DELAY, DEFAULT_RETRY_OP_MAX_EXP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
{
}

void operator()(utils::movable_function<void(std::exception_ptr)> callback) const
{
if (retries >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
auto delay =
std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, static_cast<double>(retries++))));
if (delay > max_delay) {
delay = max_delay;
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
if (ec == asio::error::operation_aborted) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retry aborted")));
return;
}
callback({});
});
}
};

struct async_constant_delay {
std::shared_ptr<asio::steady_timer> timer;
std::chrono::microseconds delay;
std::size_t max_retries;
std::size_t retries;

template<typename R, typename P>
async_constant_delay(std::shared_ptr<asio::steady_timer> timer, std::chrono::duration<R, P> d, std::size_t max)
: timer(std::move(timer))
, delay(std::chrono::duration_cast<std::chrono::microseconds>(d))
, max_retries(max)
, retries(0)
{
}

explicit async_constant_delay(std::shared_ptr<asio::steady_timer> timer)
: async_constant_delay(std::move(timer), DEFAULT_RETRY_OP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
{
}

void operator()(utils::movable_function<void(std::exception_ptr)> callback)
{
if (retries++ >= max_retries) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retries exhausted")));
return;
}
timer->expires_after(delay);
timer->async_wait([callback = std::move(callback)](std::error_code ec) mutable {
if (ec == asio::error::operation_aborted) {
callback(std::make_exception_ptr(retry_operation_retries_exhausted("retry aborted")));
return;
}
callback({});
});
}
};

std::list<std::string>
get_and_open_buckets(std::shared_ptr<core::cluster> c);

Expand Down

0 comments on commit 1658962

Please sign in to comment.