Skip to content

Commit

Permalink
feat(aggregation): Add first aggregation functionallity
Browse files Browse the repository at this point in the history
  • Loading branch information
eastolfi committed Oct 19, 2016
1 parent bae5893 commit 03facba
Show file tree
Hide file tree
Showing 8 changed files with 1,720 additions and 7 deletions.
11 changes: 7 additions & 4 deletions Gruntfile.js
Expand Up @@ -39,6 +39,7 @@ module.exports = function(grunt) {
"lib/Selector.js": "src/Selector.js",
"lib/Cursor.js": "src/Cursor.js",
"lib/Collection.js": "src/Collection.js",
"lib/Aggregation.js": "src/Aggregation.js",
"lib/MongoPortable.js": "src/MongoPortable.js"
}
}
Expand All @@ -48,14 +49,15 @@ module.exports = function(grunt) {
all: {
src: ['test/1_ObjectId.js', 'test/2_Selector.js',
'test/3_Cursor.js', 'test/4_Collection.js',
'test/5_MongoPortable.js', 'test/6_Coverage.js']
'test/5_Aggregation.js', 'test/6_MongoPortable.js',
'test/7_Coverage.js']
}
},

jsdoc : {
dist : {
src: ['src/MongoPortable.js', 'src/Collection.js', 'src/Cursor.js',
'src/Selector.js', 'src/ObjectId.js'],
src: ['src/MongoPortable.js', 'src/Aggregation.js', 'src/Collection.js',
'src/Cursor.js', 'src/Selector.js', 'src/ObjectId.js'],
options: {
destination: 'doc',
config: 'jsdoc.conf.json'
Expand All @@ -65,12 +67,13 @@ module.exports = function(grunt) {

jsdoc2md: {
fullDoc: {
src: ['src/MongoPortable.js', 'src/Collection.js', 'src/Cursor.js'],
src: ['src/MongoPortable.js', 'src/Aggregation.js', 'src/Collection.js', 'src/Cursor.js'],
dest: 'api/documentation.md'
},
apiDoc: {
files: [
{ src: 'src/MongoPortable.js', dest: 'api/MongoPortable.md' },
{ src: 'src/Aggregation.js', dest: 'api/Aggregation.md' },
{ src: 'src/Collection.js', dest: 'api/Collection.md' },
{ src: 'src/Cursor.js', dest: 'api/Cursor.md' }
]
Expand Down
218 changes: 218 additions & 0 deletions lib/Aggregation.js

Large diffs are not rendered by default.

41 changes: 39 additions & 2 deletions lib/Collection.js

Large diffs are not rendered by default.

205 changes: 205 additions & 0 deletions src/Aggregation.js
@@ -0,0 +1,205 @@
/**
* @file Cursor.js - based on Monglo#Cursor ({@link https://github.com/Monglo}) by Christian Sullivan <cs@euforic.co> | Copyright (c) 2012
* @version 1.0.0
*
* @author Eduardo Astolfi <eduardo.astolfi91@gmail.com>
* @copyright 2016 Eduardo Astolfi <eduardo.astolfi91@gmail.com>
* @license MIT Licensed
*/

var _ = require("lodash"),
Logger = require("jsw-logger");

var logger = null;

var stages = {
'$project': false,
'$match': false,
'$redact': false,
'$limit': false,
'$skip': false,
'$unwind': false,
'$group': true,
'$sample': false,
'$sort': false,
'$geoNear': false,
'$lookup': false,
'$out': false,
'$indexStats': false
};

var group_operators = {
$sum: function(documents, new_id, new_field, value, isCount) {
var new_docs = {};

for (let i = 0; i < documents.length; i++) {
let doc = documents[i];
let val = value;

if (!isCount) {
val = doc[value.substr(1, value.length)] || 0;
}

if (_.hasIn(doc, new_id)) {
let _id = doc[new_id];

if (!_.hasIn(new_docs, _id)) {
new_docs[_id] = {
_id: _id,
[new_field]: _.toNumber(val)
};
} else {
new_docs[_id][new_field] += _.toNumber(val);
}
}
}

return new_docs;
},

$avg: function(documents, new_id, new_field, value, isCount) {
var new_docs = {};

for (let i = 0; i < documents.length; i++) {
let doc = documents[i];
let val = value;

if (!isCount) {
val = doc[value.substr(1, value.length)] || 0;
}

if (_.hasIn(doc, new_id) || _.isNull(new_id)) {
let _id = doc[new_id] || null;

if (!_.hasIn(new_docs, _id)) {
new_docs[_id] = {
_id: _id,
[new_field]: _.toNumber(val),
__COUNT__: 1
};
} else {
new_docs[_id][new_field] += _.toNumber(val);
new_docs[_id].__COUNT__++;
}
}
}

for (let key in new_docs) {
new_docs[key][new_field] = new_docs[key][new_field] / new_docs[key].__COUNT__;
delete new_docs[key].__COUNT__;
}

return new_docs;
}
};

var do_single_group = function(group_id, group_stage, documents) {
// var operators = {};

let docs = {};

for (let field in group_stage) {
if (field !== '_id') {
// handle group field
// let group_key = key;
let group_field = group_stage[field];

for (let key in group_field) {
if (!_.hasIn(group_operators, key)) logger.throw(`Unknown accumulator operator "${key}" for group stage`);

// loop through all documents
// var new_docs = {};
// for (let i = 0; i < documents.length; i++) {
// let doc = documents[i];

// if (_.hasIn(doc, group_id)) {
// let _id = doc[group_id];

// if (!_.hasIn(new_docs, _id)) {
// new_docs[_id] = {
// _id: _id,
// [new_field]: value
// };
// } else {
// new_docs[_id][new_field] += value;
// }
// }
// }

// if (!_.hasIn(operators, key)) operators[key] = [];

// operators[key].push({
// new_field: field,
// value: group_field[key]
// });

let count = true;
if (_.isString(group_field[key])) {
if (group_field[key].substr(0, 1) !== '$') logger.throw("Field names references in a right side assignement must be preceded by '$'");

if (!_.isFinite(_.toNumber(group_field[key]))) {
count = false;
}
}

let operator = group_operators[key];

_.merge(docs, operator(documents, group_id, field, group_field[key], count));

break;
}
}
}

return _.values(docs);
};

var do_complex_group = function() {

};

var do_group = function(documents, group_stage) {
if (!_.hasIn(group_stage, '_id')) logger.throw('The field "_id" is required in the "$group" stage');

let new_id = group_stage['_id'];

if (_.isPlainObject(new_id)) {
// complex_id
// do_complex_group();
} else {
// single_id
return do_single_group(new_id, group_stage, documents);
}
};

class Aggregation {
constructor(pipeline) {
logger = Logger.instance;

this.pipeline = pipeline;
}

aggregate(collection) {
for (let i = 0; i < this.pipeline.length; i++) {
let stage = this.pipeline[i];

for (let key in stage) {
switch (key) {
case '$group':
return do_group(collection.docs, stage[key]);
}
}
}
}

validStage(stage) {
if (!_.hasIn(stages, stage)) return logger.throw(`Unknown stage "${stage}"`);

if (stages[stage] === false) return logger.throw(`Unsupported stage "${stage}"`);

return true;
}
}


module.exports = Aggregation;
37 changes: 36 additions & 1 deletion src/Collection.js
Expand Up @@ -10,6 +10,7 @@
var Logger = require("jsw-logger"),
EventEmitter = require("./utils/EventEmitter"),
_ = require("lodash"),
Aggregation = require("./Aggregation"),
Cursor = require("./Cursor"),
ObjectId = require('./ObjectId'),
Selector = require("./Selector"),
Expand Down Expand Up @@ -157,7 +158,7 @@ Collection.prototype.insert = function (doc, options, callback) {
* @param {Number} [options.skip] - Number of documents to be skipped
* @param {Number} [options.limit] - Max number of documents to display
* @param {Object|Array|String} [options.fields] - Same as "fields" parameter (if both passed, "options.fields" will be ignored)
* @param {Boolean} [options.forceFetch=false] - If set to'"true" returns't"e;array of documents already fetched
* @param {Boolean} [options.forceFetch=false] - If set to'"true" returns the array of documents already fetched
*
* @param {Function} [callback=null] - Callback function to be called at the end with the results
*
Expand Down Expand Up @@ -878,6 +879,40 @@ Collection.prototype.restore = function (backupID, callback) {
return this;
};

/**
* Calculates aggregate values for the data in a collection
*
* @method Collection#aggregate
*
* @param {Array} pipeline - A sequence of data aggregation operations or stages
* @param {Object} [options] - Additional options
*
* @param {Boolean} [options.forceFetch=false] - If set to'"true" returns the array of documents already fetched
*
* @returns {Array|Cursor} If "options.forceFetch" set to true returns the array of documents, otherwise returns a cursor
*/
Collection.prototype.aggregate = function(pipeline, options = { forceFetch: false }) {
if (_.isNil(pipeline) || !_.isArray(pipeline)) logger.throw('The "pipeline" param must be an array');

var aggregation = new Aggregation(pipeline);

for (let i = 0; i < pipeline.length; i++) {
let stage = pipeline[i];

for (let key in stage) {
if (key.substr(0, 1) !== '$') logger.throw("The pipeline stages must begin with '$'");

if (!aggregation.validStage(key)) logger.throw(`Invalid stage "${key}"`);

break;
}
}

var result = aggregation.aggregate(this);

return result; // change to cursor
};

/**
* @ignore
*/
Expand Down

0 comments on commit 03facba

Please sign in to comment.