Skip to content

Commit

Permalink
Add group function to datastores
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Conner committed Oct 11, 2016
1 parent d064be5 commit c3ef2bf
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 31 deletions.
26 changes: 13 additions & 13 deletions src/aggregation.js
@@ -1,16 +1,16 @@
import result from 'lodash/result';
import assign from 'lodash/assign';
import forEach from 'lodash/forEach';
import isString from 'lodash/isString';
import isObject from 'lodash/isObject';
import isFunction from 'lodash/isFunction';
import { KinveyError } from './errors';
import { Query } from './query';
import { isDefined } from './utils';

/**
* @private
*/
export class Aggregation {
export default class Aggregation {
constructor(options) {
options = assign({
query: null,
Expand All @@ -26,31 +26,31 @@ export class Aggregation {
}

get initial() {
return this.aggregationInitial;
return this._initial;
}

set initial(initial) {
if (!isObject(initial)) {
throw new KinveyError('initial must be an Object.');
}

this.aggregationInitial = initial;
this._initial = initial;
}

get query() {
return this.aggregationQuery;
return this._query;
}

set query(query) {
if (query && !(query instanceof Query)) {
query = new Query(result(query, 'toJSON', query));
if (isDefined(query) && !(query instanceof Query)) {
throw new KinveyError('Invalid query. It must be an instance of the Query class.');
}

this.aggregationQuery = query;
this._query = query;
}

get reduceFn() {
return this.aggregationReduceFn;
return this._reduceFn;
}

set reduceFn(fn) {
Expand All @@ -62,7 +62,7 @@ export class Aggregation {
throw new KinveyError('fn argument must be of type function or string.');
}

this.aggregationReduceFn = fn;
this._reduceFn = fn;
}

by(field) {
Expand All @@ -72,7 +72,7 @@ export class Aggregation {

process(entities = []) {
const groups = {};
const response = [];
const result = [];
const aggregation = this.toJSON();
const reduceFn = aggregation.reduceFn.replace(/function[\s\S]*?\([\s\S]*?\)/, '');
aggregation.reduce = new Function(['doc', 'out'], reduceFn); // eslint-disable-line no-new-func
Expand Down Expand Up @@ -104,10 +104,10 @@ export class Aggregation {

const segments = Object.keys(groups);
forEach(segments, (segment) => {
response.push(groups[segment]);
result.push(groups[segment]);
});

return response;
return result;
}

toJSON() {
Expand Down
75 changes: 74 additions & 1 deletion src/datastore/src/cachestore.js
@@ -1,7 +1,8 @@
import NetworkStore from './networkstore';
import { CacheRequest, AuthType, RequestMethod } from '../../request';
import { KinveyError } from '../../errors';
import { Query } from '../../query';
import Query from '../../query';
import Aggregation from '../../aggregation';
import SyncManager from './sync';
import { Metadata } from '../../entity';
import { KinveyObservable } from '../../utils';
Expand Down Expand Up @@ -226,6 +227,78 @@ export default class CacheStore extends NetworkStore {
return stream;
}

/**
* Group entities.
*
* @param {Aggregation} aggregation Aggregation used to group entities.
* @param {Object} [options] Options
* @param {Properties} [options.properties] Custom properties to send with
* the request.
* @param {Number} [options.timeout] Timeout for the request.
* @return {Observable} Observable.
*/
group(aggregation, options = {}) {
options = assign({ syncAutomatically: this.syncAutomatically }, options);
const syncAutomatically = options.syncAutomatically === true;
const stream = KinveyObservable.create((observer) => {
// Check that the aggregation is valid
if (!(aggregation instanceof Aggregation)) {
return observer.error(new KinveyError('Invalid aggregation. It must be an instance of the Aggregation class.'));
}

// Fetch the cache entities
const request = new CacheRequest({
method: RequestMethod.GET,
url: url.format({
protocol: this.client.protocol,
host: this.client.host,
pathname: `${this.pathname}/_group`
}),
properties: options.properties,
aggregation: aggregation,
timeout: options.timeout
});

// Execute the request
return request.execute()
.then(response => response.data)
.catch(() => [])
.then((cacheResult = []) => {
observer.next(cacheResult);

if (syncAutomatically === true) {
// Attempt to push any pending sync data before fetching from the network.
return this.pendingSyncCount(null, options)
.then((syncCount) => {
if (syncCount > 0) {
return this.push(null, options)
.then(() => this.pendingSyncCount(null, options));
}

return syncCount;
})
.then((syncCount) => {
// Throw an error if there are still items that need to be synced
if (syncCount > 0) {
throw new KinveyError('Unable to load data from the network.'
+ ` There are ${syncCount} entities that need`
+ ' to be synced before data is loaded from the network.');
}

// Group the network entities
return super.group(aggregation, options).toPromise();
});
}

return cacheResult;
})
.then(result => observer.next(result))
.then(() => observer.complete())
.catch(error => observer.error(error));
});
return stream;
}

/**
* Count all entities in the data store. A query can be optionally provided to return
* a subset of all entities in a collection or omitted to return all entities in
Expand Down
49 changes: 46 additions & 3 deletions src/datastore/src/networkstore.js
@@ -1,8 +1,9 @@
import { DeltaFetchRequest, KinveyRequest, AuthType, RequestMethod } from '../../request';
import { KinveyError } from '../../errors';
import { Query } from '../../query';
import Query from '../../query';
import { Client } from '../../client';
import { KinveyObservable, Log } from '../../utils';
import { KinveyObservable, Log, isDefined } from '../../utils';
import Aggregation from '../../aggregation';
import Promise from 'es6-promise';
import isString from 'lodash/isString';
import url from 'url';
Expand Down Expand Up @@ -120,7 +121,7 @@ export default class NetworkStore {
const useDeltaFetch = options.useDeltaFetch === true || this.useDeltaFetch;
const stream = KinveyObservable.create((observer) => {
// Check that the query is valid
if (query && !(query instanceof Query)) {
if (isDefined(query) && !(query instanceof Query)) {
return observer.error(new KinveyError('Invalid query. It must be an instance of the Query class.'));
}

Expand Down Expand Up @@ -205,6 +206,48 @@ export default class NetworkStore {
return stream;
}

/**
* Group entities.
*
* @param {Aggregation} aggregation Aggregation used to group entities.
* @param {Object} [options] Options
* @param {Properties} [options.properties] Custom properties to send with
* the request.
* @param {Number} [options.timeout] Timeout for the request.
* @return {Observable} Observable.
*/
group(aggregation, options = {}) {
const stream = KinveyObservable.create((observer) => {
// Check that the query is valid
if (!(aggregation instanceof Aggregation)) {
return observer.error(new KinveyError('Invalid aggregation. It must be an instance of the Aggregation class.'));
}

// Create the request
const request = new KinveyRequest({
method: RequestMethod.GET,
authType: AuthType.Default,
url: url.format({
protocol: this.client.protocol,
host: this.client.host,
pathname: `${this.pathname}/_group`,
}),
properties: options.properties,
aggregation: aggregation,
timeout: options.timeout,
client: this.client
});

// Execute the request
return request.execute()
.then(response => response.data)
.then(data => observer.next(data))
.then(() => observer.complete())
.catch(error => observer.error(error));
});
return stream;
}

/**
* Count all entities in the data store. A query can be optionally provided to return
* a subset of all entities in a collection or omitted to return all entities in
Expand Down
2 changes: 1 addition & 1 deletion src/datastore/src/sync.js
Expand Up @@ -7,7 +7,7 @@ import {
} from '../../request';
import { InsufficientCredentialsError, SyncError } from '../../errors';
import { Client } from '../../client';
import { Query } from '../../query';
import Query from '../../query';
import Promise from 'es6-promise';
import url from 'url';
import map from 'lodash/map';
Expand Down
43 changes: 42 additions & 1 deletion src/datastore/src/syncstore.js
@@ -1,7 +1,8 @@
import CacheStore from './cachestore';
import { CacheRequest, RequestMethod } from '../../request';
import { KinveyError } from '../../errors';
import { Query } from '../../query';
import Query from '../../query';
import Aggregation from '../../aggregation';
import { KinveyObservable } from '../../utils';
import url from 'url';

Expand Down Expand Up @@ -100,6 +101,46 @@ export default class SyncStore extends CacheStore {
return stream;
}

/**
* Group entities.
*
* @param {Aggregation} aggregation Aggregation used to group entities.
* @param {Object} [options] Options
* @param {Properties} [options.properties] Custom properties to send with
* the request.
* @param {Number} [options.timeout] Timeout for the request.
* @return {Observable} Observable.
*/
group(aggregation, options = {}) {
const stream = KinveyObservable.create((observer) => {
// Check that the aggregation is valid
if (!(aggregation instanceof Aggregation)) {
return observer.error(new KinveyError('Invalid aggregation. It must be an instance of the Aggregation class.'));
}

// Fetch the cache entities
const request = new CacheRequest({
method: RequestMethod.GET,
url: url.format({
protocol: this.client.protocol,
host: this.client.host,
pathname: `${this.pathname}/_group`
}),
properties: options.properties,
aggregation: aggregation,
timeout: options.timeout
});

// Execute the request
return request.execute()
.then(response => response.data)
.then(result => observer.next(result))
.then(() => observer.complete())
.catch(error => observer.error(error));
});
return stream;
}

/**
* Count all entities in the data store. A query can be optionally provided to return
* a subset of all entities in a collection or omitted to return all entities in
Expand Down
4 changes: 2 additions & 2 deletions src/kinvey.js
@@ -1,8 +1,8 @@
import { Client } from './client';
import { CustomEndpoint } from './endpoint';
import { Query } from './query';
import Query from './query';
import { Log } from './utils';
import { Aggregation } from './aggregation';
import Aggregation from './aggregation';
import DataStore, { DataStoreType, FileStore } from './datastore';
import { Acl, Metadata, User, UserStore } from './entity';
import { AuthorizationGrant, SocialIdentity } from './identity';
Expand Down
2 changes: 1 addition & 1 deletion src/query.js
Expand Up @@ -21,7 +21,7 @@ const unsupportedFilters = ['$nearSphere'];
* var query = new Kinvey.Query();
* query.equalTo('name', 'Kinvey');
*/
export class Query {
export default class Query {
/**
* Create an instance of the Query class.
*
Expand Down
36 changes: 35 additions & 1 deletion src/request/src/cacherequest.js
Expand Up @@ -4,6 +4,10 @@ import KinveyResponse from './kinveyresponse';
import UrlPattern from 'url-pattern';
import url from 'url';
import localStorage from 'local-storage';
import { KinveyError } from '../../errors';
import Query from '../../query';
import Aggregation from '../../aggregation';
import { isDefined } from '../../utils';
// const usersNamespace = process.env.KINVEY_USERS_NAMESPACE || 'user';
// const activeUserCollectionName = process.env.KINVEY_USER_ACTIVE_COLLECTION_NAME || 'kinvey_active_user';

Expand All @@ -13,10 +17,35 @@ import localStorage from 'local-storage';
export default class CacheRequest extends Request {
constructor(options = {}) {
super(options);
this.aggregation = options.aggregation;
this.query = options.query;
this.rack = this.client.cacheRack;
}

get query() {
return this._query;
}

set query(query) {
if (isDefined(query) && !(query instanceof Query)) {
throw new KinveyError('Invalid query. It must be an instance of the Query class.');
}

this._query = query;
}

get aggregation() {
return this._aggregation;
}

set aggregation(aggregation) {
if (isDefined(aggregation) && !(aggregation instanceof Aggregation)) {
throw new KinveyError('Invalid aggregation. It must be an instance of the Aggregation class.');
}

this._aggregation = aggregation;
}

get url() {
return super.url;
}
Expand Down Expand Up @@ -49,10 +78,15 @@ export default class CacheRequest extends Request {
}

// If a query was provided then process the data with the query
if (this.query) {
if (isDefined(this.query)) {
response.data = this.query.process(response.data);
}

// If an aggregation was provided then process the data with the aggregation
if (isDefined(this.aggregation)) {
response.data = this.aggregation.process(response.data);
}

// Just return the response
return response;
});
Expand Down

0 comments on commit c3ef2bf

Please sign in to comment.