Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http://ci-eventing-rebalance.northscale.in/eventing-07.04.2019-11.37.pass.html

Change-Id: I15a29adda1a6b39e67af317fc1fc0b12f9dd2bfb
  • Loading branch information
jeelanp2003 committed Apr 8, 2019
2 parents 204960b + 2aed781 commit a4d9500
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 83 deletions.
2 changes: 2 additions & 0 deletions libs/include/isolate_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <v8.h>

struct N1QLCodex;
class Curl;
class N1QL;
class V8Worker;
Expand All @@ -38,6 +39,7 @@ struct IsolateData {

static const uint32_t index = 0;

N1QLCodex *n1ql_codex;
N1QL *n1ql_handle;
V8Worker *v8worker;
JsException *js_exception;
Expand Down
58 changes: 54 additions & 4 deletions libs/include/n1ql.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stack>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <v8.h>
#include <vector>

Expand All @@ -27,6 +28,23 @@
#include "log.h"
#include "transpiler.h"

struct N1QLCodex {
bool IsRetriable(int64_t code) const {
return retriable_errors.find(code) != retriable_errors.end();
}

enum {
auth_failure = 13014,
attempts_failure = 12009,
};

private:
const std::unordered_set<int64_t> retriable_errors{
auth_failure,
attempts_failure,
};
};

// Data type for managing iterators.
struct IterQueryHandler {
std::string metadata;
Expand All @@ -49,8 +67,10 @@ struct QueryHandler {

// Data type for cookie to be used during row callback execution.
struct HandlerCookie {
v8::Isolate *isolate = nullptr;
lcb_N1QLHANDLE handle = nullptr;
bool must_retry{false};
std::string error;
v8::Isolate *isolate{nullptr};
lcb_N1QLHANDLE handle{nullptr};
};

// Pool of lcb instances and routines for pool management.
Expand Down Expand Up @@ -92,6 +112,30 @@ class HashedStack {
std::unordered_map<std::string, QueryHandler *> qmap_;
};

struct ErrorCodesInfo : Info {
ErrorCodesInfo(const Info &info) : Info(info.is_fatal, info.msg) {}
ErrorCodesInfo(bool is_fatal, const std::string &msg) : Info(is_fatal, msg) {}
ErrorCodesInfo(std::vector<int64_t> &errors) : Info(false) {
std::swap(this->errors, errors);
}

std::vector<int64_t> errors;
};

class N1QLErrorExtractor {
public:
explicit N1QLErrorExtractor(v8::Isolate *isolate);
virtual ~N1QLErrorExtractor();

ErrorCodesInfo GetErrorCodes(const char *err_str);

private:
ErrorCodesInfo GetErrorCodes(const v8::Local<v8::Value> &errors_val);

v8::Isolate *isolate_;
v8::Persistent<v8::Context> context_;
};

class N1QL {
public:
N1QL(ConnectionPool *inst_pool, v8::Isolate *isolate)
Expand All @@ -101,13 +145,19 @@ class N1QL {
template <typename> void ExecQuery(QueryHandler &q_handler);

private:
template <typename>
static bool ExecQueryImpl(v8::Isolate *isolate, lcb_t &instance,
lcb_N1QLPARAMS *n1ql_params, std::string &err_out);

// Callback for each row.
template <typename>
static void RowCallback(lcb_t instance, int callback_type,
const lcb_RESPN1QL *resp);
static void HandleRowCallbackFailure(const lcb_RESPN1QL *resp,
const IsolateData *isolate_data);
v8::Isolate *isolate,
HandlerCookie *cookie);
static bool IsStatusSuccess(const char *row);

v8::Isolate *isolate_;
ConnectionPool *inst_pool_;
};
Expand Down Expand Up @@ -139,6 +189,6 @@ ExtractNamedParams(const v8::FunctionCallbackInfo<v8::Value> &args);
// TODO : Currently, this method needs to be implemented by the file that is
// importing this method
// This method will be deprecated soon
void AddLcbException(const IsolateData *isolate_data, const lcb_RESPN1QL *resp);
void AddLcbException(const IsolateData *isolate_data, const int code);

#endif
149 changes: 129 additions & 20 deletions libs/src/n1ql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void N1QL::RowCallback<IterQueryHandler>(lcb_t instance, int callback_type,
free(row_str);
} else {
if (resp->rc != LCB_SUCCESS || !IsStatusSuccess(resp->row)) {
HandleRowCallbackFailure(resp, isolate_data);
HandleRowCallbackFailure(resp, isolate, cookie);
}

q_handler.iter_handler->metadata = resp->row;
Expand Down Expand Up @@ -254,21 +254,41 @@ void N1QL::RowCallback<BlockingQueryHandler>(lcb_t instance, int callback_type,
free(row_str);
} else {
if (resp->rc != LCB_SUCCESS || !IsStatusSuccess(resp->row)) {
HandleRowCallbackFailure(resp, isolate_data);
HandleRowCallbackFailure(resp, isolate, cookie);
}

q_handler.block_handler->metadata = resp->row;
}
}

void N1QL::HandleRowCallbackFailure(const lcb_RESPN1QL *resp,
const IsolateData *isolate_data) {
AddLcbException(isolate_data, resp);
v8::Isolate *isolate,
HandlerCookie *cookie) {
n1ql_op_exception_count++;
const auto isolate_data = UnwrapData(isolate);
auto js_exception = isolate_data->js_exception;
js_exception->ThrowN1QLError(resp->row);
auto codex = isolate_data->n1ql_codex;

if (resp->rc == LCB_AUTH_ERROR) {
cookie->error = resp->row;
N1QLErrorExtractor extractor(isolate);
const auto info = extractor.GetErrorCodes(resp->row);
if (info.is_fatal) {
js_exception->ThrowN1QLError(info.msg);
return;
}

for (const auto code : info.errors) {
if (codex->IsRetriable(code)) {
cookie->must_retry = true;
}
AddLcbException(isolate_data, static_cast<int>(code));
}

if (!cookie->must_retry) {
js_exception->ThrowN1QLError(resp->row);
}

if (resp->rc == LCB_AUTH_ERROR || cookie->must_retry) {
auto comm = isolate_data->comm;
comm->Refresh();
}
Expand All @@ -282,10 +302,6 @@ template <typename HandlerType> void N1QL::ExecQuery(QueryHandler &q_handler) {

lcb_t &instance = q_handler.instance;
lcb_error_t err;
lcb_CMDN1QL cmd = {0};
lcb_N1QLHANDLE handle = nullptr;
cmd.handle = &handle;
cmd.callback = RowCallback<HandlerType>;

lcb_N1QLPARAMS *n1ql_params = lcb_n1p_new();
err = lcb_n1p_setstmtz(n1ql_params, q_handler.query.c_str());
Expand All @@ -302,34 +318,55 @@ template <typename HandlerType> void N1QL::ExecQuery(QueryHandler &q_handler) {
}
}

std::string err_msg;
auto must_retry = RetryWithFixedBackoff(
5, 0, [](bool retry) -> bool { return retry; },
ExecQueryImpl<HandlerType>, isolate_, instance, n1ql_params, err_msg);
if (must_retry) {
auto js_exception = UnwrapData(isolate_)->js_exception;
js_exception->ThrowN1QLError(
"Query did not succeed even after 5 attempts, error : " + err_msg);
}

// Resource clean-up.
lcb_set_cookie(instance, nullptr);
qhandler_stack.Pop();
inst_pool_->Restore(instance);
}

template <typename HandlerType>
bool N1QL::ExecQueryImpl(v8::Isolate *isolate, lcb_t &instance,
lcb_N1QLPARAMS *n1ql_params, std::string &err_out) {
lcb_CMDN1QL cmd = {0};
lcb_N1QLHANDLE handle = nullptr;
cmd.handle = &handle;
cmd.callback = RowCallback<HandlerType>;

lcb_n1p_mkcmd(n1ql_params, &cmd);

err = lcb_n1ql_query(instance, nullptr, &cmd);
auto err = lcb_n1ql_query(instance, nullptr, &cmd);
if (err != LCB_SUCCESS) {
// for example: when there is no query node
ConnectionPool::Error(instance, "N1QL: Unable to schedule N1QL query", err);
auto js_exception = UnwrapData(isolate_)->js_exception;
auto js_exception = UnwrapData(isolate)->js_exception;
js_exception->ThrowN1QLError("N1QL: Unable to schedule N1QL query");
}

lcb_n1p_free(n1ql_params);

// Add the N1QL handle as cookie - allow for query cancellation.
HandlerCookie cookie;
cookie.isolate = isolate_;
cookie.isolate = isolate;
// Add the N1QL handle as cookie - allow for query cancellation.
cookie.handle = handle;
lcb_set_cookie(instance, &cookie);

// Run the query
cookie.must_retry = false;
err = lcb_wait(instance);
if (err != LCB_SUCCESS) {
ConnectionPool::Error(instance, "N1QL: Query execution failed", err);
}

// Resource clean-up.
lcb_set_cookie(instance, nullptr);
qhandler_stack.Pop();
inst_pool_->Restore(instance);
err_out = cookie.error;
return cookie.must_retry;
}

bool N1QL::IsStatusSuccess(const char *row) {
Expand Down Expand Up @@ -791,3 +828,75 @@ template <typename T> v8::Local<T> ToLocal(const v8::MaybeLocal<T> &handle) {

return handle_scope.Escape(value);
}

N1QLErrorExtractor::N1QLErrorExtractor(v8::Isolate *isolate)
: isolate_(isolate) {
v8::HandleScope handle_scope(isolate_);
auto context = isolate_->GetCurrentContext();
context_.Reset(isolate_, context);
}

N1QLErrorExtractor::~N1QLErrorExtractor() { context_.Reset(); }

ErrorCodesInfo N1QLErrorExtractor::GetErrorCodes(const char *err_str) {
v8::HandleScope handle_scope(isolate_);
auto context = context_.Get(isolate_);
std::vector<int64_t> errors;

v8::Local<v8::Value> error_val;
if (!TO_LOCAL(v8::JSON::Parse(isolate_, v8Str(isolate_, err_str)),
&error_val)) {
return {true, "Unable to parse error JSON"};
}

v8::Local<v8::Object> error_obj;
if (!TO_LOCAL(error_val->ToObject(context), &error_obj)) {
return {true, "Unable to cast error to Object"};
}

v8::Local<v8::Value> errors_val;
if (!TO_LOCAL(error_obj->Get(context, v8Str(isolate_, "errors")),
&errors_val)) {
return {true, "Unable to read errors property from message Object"};
}
return GetErrorCodes(errors_val);
}

ErrorCodesInfo
N1QLErrorExtractor::GetErrorCodes(const v8::Local<v8::Value> &errors_val) {
v8::HandleScope handle_scope(isolate_);
auto context = context_.Get(isolate_);
std::vector<int64_t> errors;

auto errors_v8arr = errors_val.As<v8::Array>();
const auto len = errors_v8arr->Length();
errors.resize(static_cast<std::size_t>(len));
for (uint32_t i = 0; i < len; ++i) {
v8::Local<v8::Value> error_val;
if (!TO_LOCAL(errors_v8arr->Get(context, i), &error_val)) {
return {true,
"Unable to read error Object at index " + std::to_string(i)};
}

v8::Local<v8::Object> error_obj;
if (!TO_LOCAL(error_val->ToObject(context), &error_obj)) {
return {true, "Unable to cast error at index " + std::to_string(i) +
" to Object"};
}

v8::Local<v8::Value> code_val;
if (!TO_LOCAL(error_obj->Get(context, v8Str(isolate_, "code")),
&code_val)) {
return {true, "Unable to get code from error Object at index " +
std::to_string(i)};
}

v8::Local<v8::Integer> code_v8int;
if (!TO_LOCAL(code_val->ToInteger(context), &code_v8int)) {
return {true, "Unable to cast code to integer in error Object at index " +
std::to_string(i)};
}
errors[static_cast<std::size_t>(i)] = code_v8int->Value();
}
return errors;
}
4 changes: 2 additions & 2 deletions tests/functional_tests/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
rbacSetupURL = "http://127.0.0.1:9000/settings/rbac/users/local"
bucketStatsURL = "http://127.0.0.1:9000/pools/default/buckets/"
indexerURL = "http://127.0.0.1:9000/settings/indexes"
queryURL = "http://127.0.0.1:9499/query/service"
queryURL = "http://127.0.0.1:9001/_p/query/query/service"
)

const (
Expand Down Expand Up @@ -84,7 +84,7 @@ const (

const (
dataDir = "%2Ftmp%2Fdata"
services = "kv%2Cn1ql%2Cindex%2Ceventing"
services = "kv%2Ceventing"
)

const (
Expand Down
Loading

0 comments on commit a4d9500

Please sign in to comment.