Skip to content

Commit

Permalink
feat(aggregation): Add aggregation stages: match, sort
Browse files Browse the repository at this point in the history
  • Loading branch information
eastolfi committed Oct 19, 2016
1 parent 03facba commit eba674a
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 1,128 deletions.
44 changes: 39 additions & 5 deletions lib/Aggregation.js

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions lib/Collection.js

Large diffs are not rendered by default.

113 changes: 85 additions & 28 deletions lib/Cursor.js

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions lib/Selector.js

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions src/Aggregation.js
Expand Up @@ -8,20 +8,22 @@
*/

var _ = require("lodash"),
Logger = require("jsw-logger");
Logger = require("jsw-logger"),
Cursor = require("./Cursor"),
Selector = require("./Selector");

var logger = null;

var stages = {
'$project': false,
'$match': false,
'$match': true,
'$redact': false,
'$limit': false,
'$skip': false,
'$unwind': false,
'$group': true,
'$sample': false,
'$sort': false,
'$sort': true,
'$geoNear': false,
'$lookup': false,
'$out': false,
Expand Down Expand Up @@ -158,10 +160,28 @@ var do_complex_group = function() {

};

var do_sort = function(documents, sort_stage) {
return documents.sort(new Selector(sort_stage, Selector.SORT_SELECTOR));
};

var do_match = function(documents, match_stage) {
var cursor = new Cursor(documents, match_stage);

return cursor.fetch();
};

var do_group = function(documents, group_stage) {
if (!_.hasIn(group_stage, '_id')) logger.throw('The field "_id" is required in the "$group" stage');

let new_id = group_stage['_id'];

if (!_.isNull(new_id)) {
if (new_id.substr(0, 1) !== '$') {
logger.throw("Field names references in a right side assignement must be preceded by '$'");
} else {
new_id = new_id.substr(1, new_id.length);
}
}

if (_.isPlainObject(new_id)) {
// complex_id
Expand All @@ -180,16 +200,30 @@ class Aggregation {
}

aggregate(collection) {
var docs = collection.docs;

for (let i = 0; i < this.pipeline.length; i++) {
let stage = this.pipeline[i];

for (let key in stage) {
switch (key) {
case '$match':
docs = do_match(docs, stage[key]);

break;
case '$group':
return do_group(collection.docs, stage[key]);
docs = do_group(docs, stage[key]);

break;
case '$sort':
docs = do_sort(docs, stage[key]);

break;
}
}
}

return docs; // move to cursor
}

validStage(stage) {
Expand Down
5 changes: 2 additions & 3 deletions src/Collection.js
Expand Up @@ -177,8 +177,7 @@ Collection.prototype.find = function (selection, fields, options, callback) {
options = params.options;
callback = params.callback;

// callback for backward compatibility
var cursor = new Cursor(this.db, this, selection, fields, options);
var cursor = new Cursor(this.docs, selection, fields, options);

/**
* "find" event.
Expand Down Expand Up @@ -239,7 +238,7 @@ Collection.prototype.findOne = function (selection, fields, options, callback) {
options = params.options;
callback = params.callback;

var cursor = new Cursor(this.db, this, selection, fields, options);
var cursor = new Cursor(this.docs, selection, fields, options);

/**
* "findOne" event.
Expand Down
106 changes: 82 additions & 24 deletions src/Cursor.js
Expand Up @@ -23,7 +23,7 @@ var logger = null;
* @classdesc Cursor class that maps a MongoDB-like cursor
*
* @param {MongoPortable} db - Additional options
* @param {Collection} collection - The collection instance
* @param {Array} documents - The list of documents
* @param {Object|Array|String} [selection={}] - The selection for matching documents
* @param {Object|Array|String} [fields={}] - The fields of the document to show
* @param {Object} [options] - Database object
Expand All @@ -32,9 +32,8 @@ var logger = null;
*
*/
class Cursor {
constructor(db, collection, selection, fields, options = {}) {
this.db = db;
this.collection = collection;
constructor(documents, selection, fields, options = {}) {
this.documents = documents;
this.selector = selection;
this.skipValue = options.skip || 0;
this.limitValue = options.limit || 15;
Expand All @@ -43,6 +42,7 @@ class Cursor {

logger = Logger.instance;

/** ADD IDX **/
if (Selector.isSelectorCompiled(this.selector)) {
this.selector_compiled = this.selector;
} else {
Expand All @@ -65,6 +65,26 @@ class Cursor {
}
}

/** ADD IDX **/

this.fetch_mode = Cursor.COLSCAN || Cursor.IDXSCAN;
this.indexex = null;//findUsableIndexes();

// if (cursor.fetch_mode === Cursor.COLSCAN) {
// // COLSCAN, wi will iterate over all documents
// docs = _.cloneDeep(cursor.collection.docs);
// } else if (cursor.fetch_mode === Cursor.IDXSCAN) {
// // IDXSCAN, wi will iterate over all needed documents
// for (let i = 0; i < cursor.indexes.length; i++) {
// let index = cursor.indexes[i];

// for (let i = index.start; i < index.end; i++) {
// let idx_id = cursor.collection.getIndex(index.name)[i];

// docs.push(cursor.collection.docs[idx_id]);
// }
// }
// }

this.fields = new Selector(fields, Selector.FIELD_SELECTOR);

Expand All @@ -75,6 +95,9 @@ class Cursor {
}
}

Cursor.COLSCAN = 'colscan';
Cursor.IDXSCAN = 'idxscan';

/**
* Moves a cursor to the begining
*
Expand Down Expand Up @@ -127,7 +150,7 @@ Cursor.prototype.map = function(callback) {
* @returns {Boolean} True if we can fetch one more document
*/
Cursor.prototype.hasNext = function() {
return (this.cursor_pos < this.collection.docs.length);
return (this.cursor_pos < this.documents.length);
};

/**
Expand Down Expand Up @@ -233,25 +256,44 @@ var _mapFields = function(doc, fields) {
* @returns {Array|Object} If [justOne=true] returns the next document, otherwise returns all the documents
*/
var _getDocuments = function(cursor, justOne = false) {
if (cursor.selector_id) {
if (_.hasIn(cursor.collection.doc_indexes, _.toString(cursor.selector_id))) {
let idx = cursor.collection.doc_indexes[_.toString(cursor.selector_id)];
var docs = [];

if (cursor.fetch_mode === Cursor.COLSCAN) {
// COLSCAN, wi will iterate over all documents
docs = _.cloneDeep(cursor.documents);
} else if (cursor.fetch_mode === Cursor.IDXSCAN) {
// IDXSCAN, wi will iterate over all needed documents
for (let i = 0; i < cursor.indexes.length; i++) {
let index = cursor.indexes[i];

return _mapFields(cursor.collection.docs[idx], cursor.fields);
} else {
if (justOne) {
return null;
} else {
return [];
for (let i = index.start; i < index.end; i++) {
// let idx_id = cursor.collection.getIndex(index.name)[i];
let idx_id = index.index[i];

docs.push(cursor.documents[idx_id]);
}
}
}

// if (cursor.selector_id) {
// if (_.hasIn(cursor.collection.doc_indexes, _.toString(cursor.selector_id))) {
// let idx = cursor.collection.doc_indexes[_.toString(cursor.selector_id)];

// return _mapFields(cursor.collection.docs[idx], cursor.fields);
// } else {
// if (justOne) {
// return null;
// } else {
// return [];
// }
// }
// }

// TODO add warning when sort/skip/limit and fetching one
// TODO add warning when skip/limit without order
// TODO index
while (cursor.cursor_pos < cursor.collection.docs.length) {
var _doc = cursor.collection.docs[cursor.cursor_pos];
while (cursor.cursor_pos < docs.length) {
var _doc = docs[cursor.cursor_pos];
cursor.cursor_pos++;

if (cursor.selector_compiled.test(_doc)) {
Expand Down Expand Up @@ -290,6 +332,26 @@ Cursor.prototype.count = function() {
return this.fetchAll().length;
};

/**
* Set the sorting of the cursor
*
* @method Cursor#sort
*
* @param {Object|Array|String} spec - The sorting specification
*
* @returns {Cursor} This instance so it can be chained with other methods
*/
Cursor.prototype.setSorting = function(spec) {
if (_.isNil(spec)) logger.throw("You need to specify a sorting");

if (spec) {
this.sortValue = spec;
this.sort_compiled = (new Selector(spec, Selector.SORT_SELECTOR));
}

return this;
};

/**
* Applies a sorting on the cursor
*
Expand All @@ -307,15 +369,11 @@ Cursor.prototype.sort = function(spec) {
}

if (_sort) {
if (spec) {
this.sortValue = spec;
this.sort_compiled = _sort;
if (!_.isNil(this.db_objects) && _.isArray(this.db_objects)) {
this.db_objects = this.db_objects.sort(_sort);
this.sorted = true;
} else {
// If no spec, do sort
if (!_.isNil(this.db_objects) && _.isArray(this.db_objects)) {
this.db_objects = this.db_objects.sort(_sort);
this.sorted = true;
}
this.setSorting(spec);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/Selector.js
@@ -1,6 +1,7 @@
var Logger = require("jsw-logger"),
_ = require("lodash"),
SelectorMatcher = require("./SelectorMatcher");
SelectorMatcher = require("./SelectorMatcher"),
ObjectId = require("./ObjectId");

var logger = null;

Expand Down Expand Up @@ -415,6 +416,11 @@ var _buildKeypathSelector = function (keypath, value) {

clause.type = 'operator_object';
}
} else if (value instanceof ObjectId) {
logger.debug('clause of type ObjectId -> String');

clause.type = 'string';
clause.value = value.toString();
} else {
clause.type = '__invalid__';
}
Expand Down
6 changes: 2 additions & 4 deletions test/3_Cursor.js
Expand Up @@ -32,11 +32,10 @@ describe("Cursor", function() {

expect(selector).to.exist;

var c = new Cursor(null, null, selector, ["field1, field2"], { sort: { field2: -1 } });
var c = new Cursor([], selector, ["field1, field2"], { sort: { field2: -1 } });

expect(c).to.exist;

expect(c.db).to.not.exist;
expect(c.collection).to.not.exist;
expect(c.selector).to.exist;
expect(c.fields).to.exist;
Expand All @@ -52,11 +51,10 @@ describe("Cursor", function() {
});

it("should be able to create a new instance from a compiled selector", function() {
var c = new Cursor(null, null, { field1: { $gte: 3 } }, ["field1, field2"], { sort: { field2: -1 } });
var c = new Cursor([], { field1: { $gte: 3 } }, ["field1, field2"], { sort: { field2: -1 } });

expect(c).to.exist;

expect(c.db).to.not.exist;
expect(c.collection).to.not.exist;
expect(c.selector).to.exist;
expect(c.fields).to.exist;
Expand Down

0 comments on commit eba674a

Please sign in to comment.