Skip to content

Commit

Permalink
JSCBC-214: Added support for N1QL querying through libcouchbase.
Browse files Browse the repository at this point in the history
Change-Id: I9f38c768932ca0ec65f76c771bd1a36d1338216f
Reviewed-on: http://review.couchbase.org/49600
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
brett19 committed Apr 22, 2015
1 parent 14d4aea commit 3a94624
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 63 deletions.
220 changes: 157 additions & 63 deletions lib/bucket.js
Expand Up @@ -314,66 +314,6 @@ Bucket.prototype.setTranscoder = function(encoder, decoder) {
this._cb.setTranscoder(encoder, decoder);
};

/**
* Executes a N1QL query from a N1QL query string.
*
* @param {string} query
* @param {function} callback
* @private
* @ignore
*/
Bucket.prototype._query = function(query, callback) {
if (this.queryhosts === null) {
return callback(new Error('no available query nodes'));
}

var qhosts = this.queryhosts;
var host = qhosts[Math.floor(Math.random()*qhosts.length)];
var uri = 'http://' + host + '/query';

request.post({
uri: uri,
headers: {
'Content-type': 'application/x-www-form-urlencoded'
},
body: query
}, function (err, resp, body) {
if (err) {
return callback(err);
}

var res = null;
try {
res = JSON.parse(body);
} catch (e) {
return callback(new Error('failed to parse query response as json'));
}

var errObj = null;
if (res.errors) {
var errors = [];
for (var i = 0; i < res.errors.length; ++i) {
var errD = res.errors[i];
var errO = new Error('Query error ' + errD.code + ': ' + errD.msg);
errO.code = errD.code;
errors.push(errO);
}
if (errors.length === 1) {
errObj = errors[0];
} else {
errObj = errors;
}
}

if (res.results) {
return callback(errObj, res.results);
} else {
return callback(errObj, null);
}
});
};

/**
* Picks a random CAPI node and builds an http or https request against
* it using the passed path.
Expand Down Expand Up @@ -596,6 +536,160 @@ Bucket.prototype._view = function(viewtype, ddoc, name, q, callback) {
return req;
};

/**
* @class Meta
* @classdesc
* The meta-information available from a view query response.
* @private
* @memberof Bucket.N1qlQueryResponse
*/
/**
* The identifier for this query request.
*
* @var {number} Bucket.N1qlQueryResponse.Meta#requestID
* @since 2.0.8
* @committed
*/

/**
* Emitted whenever a new row is available from a queries result set.
*
* @event Bucket.N1qlQueryResponse#row
* @param {Object} row
* @param {Bucket.N1qlQueryResponse.Meta} meta
*
* @since 2.0.8
* @committed
*/
/**
* Emitted whenever all rows are available from a queries result set.
*
* @event Bucket.N1qlQueryResponse#rows
* @param {Object[]} rows
* @param {Bucket.N1qlQueryResponse.Meta} meta
*
* @since 2.0.8
* @committed
*/
/**
* Emitted once a query has completed executing and emitting all rows.
*
* @event Bucket.N1qlQueryResponse#end
* @param {Bucket.N1qlQueryResponse.Meta} meta
*
* @since 2.0.8
* @committed
*/
/**
* Emitted if an error occurs while executing a query.
*
* @event Bucket.N1qlQueryResponse#error
* @param {Error} error
*
* @since 2.0.8
* @committed
*/

/**
* An event emitter allowing you to bind to various query result set
* events.
*
* @constructor
*
* @private
* @memberof Bucket
* @extends events.EventEmitter
*
* @since 2.0.8
* @committed
*/
function N1qlQueryResponse() {
}
util.inherits(N1qlQueryResponse, events.EventEmitter);

/**
* Executes a N1QL http request.
*
* @param {string} queryStr
* @param {N1qlQueryResponse} emitter
*
* @private
* @ignore
*/
Bucket.prototype._n1qlReq = function(host, q, emitter) {
var rows = [];
this._cb.n1qlQuery(
host, q,
function(errCode, val) {
if (errCode === -1) { // Row
var row = val;
rows.push(row);
emitter.emit('row', row);
} else if (errCode === 0) { // Success
var meta = val;
emitter.emit('rows', rows, meta);
emitter.emit('end', meta);
} else { // Error
var errStr = val;
var jsonError = JSON.parse(errStr);
var err;

This comment has been minimized.

Copy link
@antono

antono Dec 28, 2015

What if jsonError is null?

if (jsonError.errors.length > 0) {
var firstErr = jsonError.errors[0];
err = new Error(firstErr.msg);
err.code = firstErr.code;
err.otherErrors = [];

for (var i = 1; i < jsonError.errors.length; ++i) {
var nextErr = jsonError.errors[i];
var otherErr = new Error(nextErr.msg);
otherErr.code = nextErr.code;
err.otherErrors.push(otherErr);
}
} else {
err = new Error('An unknown error occured');
}
err.requestID = jsonError.requestID;

emitter.emit('error', err);
}
});
};

/**
* Executes a N1QL query from a N1QL query string.
*
* @param {string} query
* @param {function} callback
* @private
* @ignore
*/
Bucket.prototype._n1ql = function(query, params, callback) {
var host;
if (this.queryhosts) {
var qhosts = this.queryhosts;
host = qhosts[Math.floor(Math.random() * qhosts.length)];
if (host.indexOf(':') === -1) {
host = host + ':8093';
}
}

var req = new N1qlQueryResponse();
this._maybeInvoke(this._n1qlReq.bind(this),
[host, query.toObject(params), req, callback]);

if (callback) {
req.on('rows', function(rows, meta) {
callback(null, rows, meta);
});
req.on('error', function(err) {
callback(err, null, null);
});
}

return req;
};

/**
* Executes a previously prepared query object. This could be a
* {@link ViewQuery} or a {@link N1qlQuery}.
Expand Down Expand Up @@ -626,9 +720,9 @@ Bucket.prototype.query = function(query, params, callback) {
return this._view(
'_spatial', query.ddoc, query.name, query.options, callback);
} else if (query instanceof N1qlQuery) {
this._maybeInvoke(
this._query.bind(this),
[query.toString(params), callback]);
return this._n1ql(
query, params, callback
);
} else {
throw new TypeError(
'First argument needs to be a ViewQuery, SpatialQuery or N1qlQuery.');
Expand Down
18 changes: 18 additions & 0 deletions lib/n1qlquery.js
Expand Up @@ -97,6 +97,24 @@ N1qlStringQuery.prototype.toString = function(args) {
return queryOpts;
};

/**
* Returns the fully prepared object representation of this query.
*/
N1qlStringQuery.prototype.toObject = function(args) {
if (!args) {
return this.options;
}

var out = {};
for (var i in this.options) {
if (this.options.hasOwnProperty(i)) {
out[i] = this.options[i];
}
}
out.args = args;
return out;
};

/**
* Creates a query object directly from the passed query string.
*
Expand Down
3 changes: 3 additions & 0 deletions src/binding.cc
Expand Up @@ -31,6 +31,7 @@ Persistent<String> CouchbaseImpl::keyKey;
Persistent<String> CouchbaseImpl::docKey;
Persistent<String> CouchbaseImpl::geometryKey;
Persistent<String> CouchbaseImpl::rowsKey;
Persistent<String> CouchbaseImpl::resultsKey;
Persistent<String> lcbErrorKey;

extern "C" {
Expand Down Expand Up @@ -79,6 +80,7 @@ void CouchbaseImpl::Init(Handle<Object> target)
NODE_SET_PROTOTYPE_METHOD(t, "arithmetic", fnArithmetic);
NODE_SET_PROTOTYPE_METHOD(t, "durability", fnDurability);
NODE_SET_PROTOTYPE_METHOD(t, "viewQuery", fnViewQuery);
NODE_SET_PROTOTYPE_METHOD(t, "n1qlQuery", fnN1qlQuery);

target->Set(NanNew<String>("CouchbaseImpl"), t->GetFunction());
target->Set(NanNew<String>("Constants"), createConstants());
Expand All @@ -93,6 +95,7 @@ void CouchbaseImpl::Init(Handle<Object> target)
NanAssignPersistent(docKey, NanNew<String>("doc"));
NanAssignPersistent(geometryKey, NanNew<String>("geometry"));
NanAssignPersistent(rowsKey, NanNew<String>("rows"));
NanAssignPersistent(resultsKey, NanNew<String>("results"));

Handle<Object> jMod = NanGetCurrentContext()->Global()->Get(
NanNew<String>("JSON")).As<Object>();
Expand Down
48 changes: 48 additions & 0 deletions src/couchbase_impl.cc
Expand Up @@ -350,6 +350,54 @@ void viewrow_callback(lcb_t instance, int ignoreme,
callback->Call(2, args);
}

void n1qlrow_callback(lcb_t instance, int ignoreme,
const lcb_RESPN1QL *resp)
{
CouchbaseImpl *me = (CouchbaseImpl *)lcb_get_cookie(instance);
NanCallback *callback = (NanCallback*)resp->cookie;
NanScope();

Local<Function> jsonParseLcl = NanNew(CouchbaseImpl::jsonParse);

if (resp->rflags & LCB_RESP_F_FINAL) {
Handle<Value> dataRes;
if (resp->rc != LCB_SUCCESS) {
if (resp->row) {
dataRes = NanNew<String>((const char*)resp->row, (int)resp->nrow);
} else {
dataRes = NanNull();
}
} else {
Handle<Value> metaStr =
NanNew<String>((const char*)resp->row, (int)resp->nrow);
dataRes = jsonParseLcl->Call(NanGetCurrentContext()->Global(), 1, &metaStr);
Local<Object> metaObj = dataRes->ToObject();
if (!metaObj.IsEmpty()) {
metaObj->Delete(NanNew(CouchbaseImpl::resultsKey));
}
}

Handle<Value> args[] = {
NanNew<Number>(resp->rc),
dataRes
};
callback->Call(2, args);

delete callback;
return;
}

Handle<Value> rowStr =
NanNew<String>((const char*)resp->row, (int)resp->nrow);
Handle<Value> rowObj =
jsonParseLcl->Call(NanGetCurrentContext()->Global(), 1, &rowStr);
Handle<Value> args[] = {
NanNew<Number>(-1),
rowObj
};
callback->Call(2, args);
}

}

void CouchbaseImpl::setupLibcouchbaseCallbacks(void)
Expand Down
5 changes: 5 additions & 0 deletions src/couchbase_impl.h
Expand Up @@ -53,6 +53,7 @@
#include <libcouchbase/couchbase.h>
#include <libcouchbase/api3.h>
#include <libcouchbase/views.h>
#include <libcouchbase/n1ql.h>
#include <libcouchbase/configuration.h>

#include "cas.h"
Expand Down Expand Up @@ -120,6 +121,7 @@ class CouchbaseImpl: public node::ObjectWrap
static NAN_METHOD(fnArithmetic);
static NAN_METHOD(fnDurability);
static NAN_METHOD(fnViewQuery);
static NAN_METHOD(fnN1qlQuery);

public:
CouchbaseImpl(lcb_t inst);
Expand Down Expand Up @@ -159,6 +161,7 @@ class CouchbaseImpl: public node::ObjectWrap
static Persistent<String> docKey;
static Persistent<String> geometryKey;
static Persistent<String> rowsKey;
static Persistent<String> resultsKey;

};

Expand All @@ -167,6 +170,8 @@ class CouchbaseImpl: public node::ObjectWrap
extern "C" {
void viewrow_callback(lcb_t instance, int ignoreme,
const lcb_RESPVIEWQUERY *resp);
void n1qlrow_callback(lcb_t instance, int ignoreme,
const lcb_RESPN1QL *resp);
}

#endif

0 comments on commit 3a94624

Please sign in to comment.