Skip to content

Commit

Permalink
fix(groupBy): group duration notifiers will now properly unsubscribe …
Browse files Browse the repository at this point in the history
…and clean up (#2662)

* test(groupBy): Test that GroupBy-durationSelectors are disposed of

Duration selectors where not disposed when the GroupDurationSubscriber's completed

* fix(groupBy): unsubscribe GroupDurationSubscriber after completing the group

The Groups are disposed by the GroupDurationSelector, however the
GroupDurationSubscriber can be subscribed to a different observable
than the group itself. To prevent any unwanted subscriptions to
accumulate over time we need to explicitly unsubscribe after the
first event in GroupDurationSubscriber closes the group.

Fixes #2660

* fix(groupBy): ensures durationSelector subscriptions are cleaned

The subscriptions to the durationSelector would pile up in the
internal subscription list of the GroupBySubscriber. By removing
the GroupDurationSubscriber explicitly from the GroupBySubscriber
we prevent potential OOM exceptions.

Fixes #2661

* refactor(groupBy): Refactor GroupDurationSubscriber to pass-through to underlying group Subject

* refactor(groupBy): simplify durationSelector test
  • Loading branch information
hermanbanken authored and benlesh committed Jun 14, 2017
1 parent 97ce813 commit ab92083
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
33 changes: 33 additions & 0 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,39 @@ describe('Observable.prototype.groupBy', () => {
expectObservable(source, unsub).toBe(expected, expectedGroups);
});

it('should dispose a durationSelector after closing the group',
() => {
const obs = hot('-0-1--------2-|');
const sub = '^ !' ;
let unsubs = [
'-^--!',
'---^--!',
'------------^-!',
];
const dur = '---s';
const durations = [
cold(dur),
cold(dur),
cold(dur)
];

const unsubscribedFrame = Rx.TestScheduler
.parseMarblesAsSubscriptions(sub)
.unsubscribedFrame;

obs.groupBy(
(val: string) => val,
(val: string) => val,
(group: any) => durations[group.key]
).subscribe();

rxTestScheduler.schedule(() => {
durations.forEach((d, i) => {
expectSubscriptions(d.subscriptions).toBe(unsubs[i]);
});
}, unsubscribedFrame);
});

it('should allow using a durationSelector, but keySelector throws', () => {
const values = {
a: ' foo',
Expand Down
22 changes: 7 additions & 15 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,27 +227,19 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
constructor(private key: K,
private group: Subject<T>,
private parent: GroupBySubscriber<any, K, T>) {
super();
super(group);
}

protected _next(value: T): void {
this._complete();
}

protected _error(err: any): void {
const group = this.group;
if (!group.closed) {
group.error(err);
}
this.parent.removeGroup(this.key);
this.complete();
}

protected _complete(): void {
const group = this.group;
if (!group.closed) {
group.complete();
protected _unsubscribe() {
const { parent, key } = this;
this.key = this.parent = null;
if (parent) {
parent.removeGroup(key);
}
this.parent.removeGroup(this.key);
}
}

Expand Down

0 comments on commit ab92083

Please sign in to comment.