Skip to content

Commit

Permalink
feat(operator): add groupBy
Browse files Browse the repository at this point in the history
closes #165
  • Loading branch information
benlesh committed Aug 19, 2015
1 parent 7d9b52b commit 1e13aea
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 1 deletion.
65 changes: 65 additions & 0 deletions spec/operators/groupBy-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

var noop = function () { };

describe('Observable.prototype.groupBy()', function () {

it('should group values', function (done) {
var expectedGroups = [
{ key: 1, values: [1, 3] },
{ key: 0, values: [2] }
];

Observable.of(1, 2, 3)
.groupBy(function (x) { return x % 2 })
.subscribe(function (g) {
var expectedGroup = expectedGroups.shift();
expect(g.key).toBe(expectedGroup.key);

g.subscribe(function (x) {
expect(x).toBe(expectedGroup.values.shift());
});
}, null, done);
});

it('should group values with an element selector', function (done) {
var expectedGroups = [
{ key: 1, values: ['1!', '3!'] },
{ key: 0, values: ['2!'] }
];

Observable.of(1, 2, 3)
.groupBy(function (x) { return x % 2 }, function (x) { return x + '!'; })
.subscribe(function (g) {
var expectedGroup = expectedGroups.shift();
expect(g.key).toBe(expectedGroup.key);

g.subscribe(function (x) {
expect(x).toBe(expectedGroup.values.shift());
});
}, null, done);
});


it('should group values with a duration selector', function (done) {
var expectedGroups = [
{ key: 1, values: [1, 3] },
{ key: 0, values: [2, 4] },
{ key: 1, values: [5] },
{ key: 0, values: [6] }
];

Observable.of(1, 2, 3, 4, 5, 6)
.groupBy(function (x) { return x % 2 }, function (x) { return x; }, function (g) { return g.skip(1); })
.subscribe(function (g) {
var expectedGroup = expectedGroups.shift();
expect(g.key).toBe(expectedGroup.key);

g.subscribe(function (x) {
expect(x).toBe(expectedGroup.values.shift());
});
}, null, done);
});
});
5 changes: 4 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Subscriber from './Subscriber';
import Subscription from './Subscription';
import ConnectableObservable from './observables/ConnectableObservable';
// HACK: the Babel part of the build doesn't like this reference.
import { GroupSubject } from './operators/groupBy';
// seems to put it in an infinite loop.
//import Notification from './Notification';

Expand Down Expand Up @@ -152,6 +153,8 @@ export default class Observable<T> {

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;

groupBy: <T, R>(keySelector: (value:T) => string, durationSelector?: (group:GroupSubject<R>) => Observable<any>, elementSelector?: (value:T) => R) => Observable<R>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
}
}
4 changes: 4 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ import _finally from './operators/finally';

observableProto.finally = _finally;

import groupBy from './operators/groupBy';

observableProto.groupBy = groupBy;

export default {
Subject,
Scheduler,
Expand Down
139 changes: 139 additions & 0 deletions src/operators/groupBy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Map from '../util/Map';
import FastMap from '../util/FastMap';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function groupBy<T, R>(keySelector: (value: T) => string,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupSubject<R>) => Observable<any>): Observable<GroupSubject<R>>
{
return this.lift(new GroupByOperator<T, R>(keySelector, durationSelector, elementSelector));
}

export class GroupByOperator<T, R> extends Operator<T, R> {
constructor(private keySelector: (value: T) => string,
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
private elementSelector?: (value: T) => R) {
super();
}

call(observer: Observer<R>): Observer<T> {
return new GroupBySubscriber<T, R>(observer, this.keySelector, this.durationSelector, this.elementSelector);
}
}

export class GroupBySubscriber<T, R> extends Subscriber<T> {
private groups = null;

constructor(destination: Observer<R>, private keySelector: (value: T) => string,
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
private elementSelector?: (value: T) => R) {
super(destination);
}

_next(x: T) {
let key = tryCatch(this.keySelector)(x);
if(key === errorObject) {
this.error(key.e);
} else {
let groups = this.groups;
const elementSelector = this.elementSelector;
const durationSelector = this.durationSelector;

if (!groups) {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}

let group: GroupSubject<R> = groups.get(key);

if (!group) {
groups.set(key, group = new GroupSubject(key));

if (durationSelector) {
let duration = tryCatch(durationSelector)(group);
if (duration === errorObject) {
this.error(duration.e);
} else {
this.add(duration.subscribe(new GroupDurationSubscriber(group, this)));
}
}

this.destination.next(group);
}

if (elementSelector) {
let value = tryCatch(elementSelector)(x)
if(value === errorObject) {
group.error(value.e);
} else {
group.next(value);
}
} else {
group.next(x);
}
}
}

_error(err: any) {
const groups = this.groups;
if (groups) {
groups.forEach((group, key) => {
group.error(err);
this.removeGroup(key);
});
}
this.destination.error(err);
}

_complete() {
const groups = this.groups;
if(groups) {
groups.forEach((group, key) => {
group.complete();
this.removeGroup(group);
});
}
this.destination.complete();
}

removeGroup(key: string) {
this.groups[key] = null;
}
}

export class GroupSubject<T> extends Subject<T> {
constructor(public key: string) {
super();
}
}

export class GroupDurationSubscriber<T> extends Subscriber<T> {
constructor(private group: GroupSubject<T>, private parent:GroupBySubscriber<any, T>) {
super(null);
}

_next(value: T) {
const group = this.group;
group.complete();
this.parent.removeGroup(group.key);
}

_error(err: any) {
const group = this.group;
group.error(err);
this.parent.removeGroup(group.key);
}

_complete() {
const group = this.group;
group.complete();
this.parent.removeGroup(group.key);
}
}
31 changes: 31 additions & 0 deletions src/util/FastMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export default class FastMap {
size: number = 0;
private _values: Object = {};

delete(key: string): boolean {
this._values[key] = null;
return true;
}

set(key: string, value: any): FastMap {
this._values[key] = value;
return this;
}

get(key: string): any {
return this._values[key];
}

forEach(cb, thisArg) {
const values = this._values;
for (let key in values) {
if (values.hasOwnProperty(key)) {
cb.call(thisArg, values[key], key);
}
}
}

clear() {
this._values = {};
}
}
43 changes: 43 additions & 0 deletions src/util/Map.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { root } from './root';

export default root.Map || (function() {
function Map() {
this.size = 0;
this._values = [];
this._keys = [];
}

Map.prototype['delete'] = function (key) {
var i = this._keys.indexOf(key);
if (i === -1) { return false }
this._values.splice(i, 1);
this._keys.splice(i, 1);
this.size--;
return true;
};

Map.prototype.get = function (key) {
var i = this._keys.indexOf(key);
return i === -1 ? undefined : this._values[i];
};

Map.prototype.set = function (key, value) {
var i = this._keys.indexOf(key);
if (i === -1) {
this._keys.push(key);
this._values.push(value);
this.size++;
} else {
this._values[i] = value;
}
return this;
};

Map.prototype.forEach = function (cb, thisArg) {
for (var i = 0; i < this.size; i++) {
cb.call(thisArg, this._values[i], this._keys[i]);
}
};

return Map;
}());

0 comments on commit 1e13aea

Please sign in to comment.