Skip to content

Commit

Permalink
fix(groupBy): fix bugs with groupBy
Browse files Browse the repository at this point in the history
Fix groupBy in order to pass tests. Also refactor some code related to
groupBy, introducing groupBy-support.ts file. Most fixes are related to
making inner Observables groups be hot which continue executing even if
the outer Observable was unsubscribed. Another fix makes the outer
Observable throw an error if the elementSelector function throws. The
most significant refactor replaces GroupSubject with GroupedObservable,
to resemble the RxJS legacy API, and disallow using Observer methods in
the Subject.

Relates to issue #375.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 9, 2015
1 parent 893c7fe commit 86992c6
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Observable from './Observable';
import Scheduler from './Scheduler';
import ConnectableObservable from './observables/ConnectableObservable';
import Subject from './Subject'
import GroupSubject from './subjects/GroupSubject';
import {GroupedObservable} from './operators/groupBy-support';

export interface CoreOperators<T> {
buffer?: <T>(closingNotifier: Observable<any>) => Observable<T[]>;
Expand Down Expand Up @@ -30,7 +30,7 @@ export interface CoreOperators<T> {
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean, resultSelector?: (value:T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<R>;
flatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
flatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
groupBy?: <T, R>(keySelector: (value:T) => string, durationSelector?: (group:GroupSubject<R>) => Observable<any>, elementSelector?: (value:T) => R) => Observable<R>;
groupBy?: <T, R>(keySelector: (value:T) => string, elementSelector?: (value:T) => R, durationSelector?: (group: GroupedObservable<R>) => Observable<any>) => Observable<GroupedObservable<R>>;
ignoreElements?: () => Observable<T>;
last?: <R>(predicate?: (value: T, index:number) => boolean, resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<T>;
every?: (predicate: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
Expand Down
2 changes: 1 addition & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ observableProto.finally = _finally;
import first from './operators/first';
observableProto.first = first;

import groupBy from './operators/groupBy';
import {groupBy} from './operators/groupBy';
observableProto.groupBy = groupBy;

import ignoreElements from './operators/ignoreElements';
Expand Down
2 changes: 1 addition & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ observableProto.finally = _finally;
import first from './operators/first';
observableProto.first = first;

import groupBy from './operators/groupBy';
import {groupBy} from './operators/groupBy';
observableProto.groupBy = groupBy;

import ignoreElements from './operators/ignoreElements';
Expand Down
64 changes: 64 additions & 0 deletions src/operators/groupBy-support.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Subscription from '../Subscription';
import Subject from '../Subject';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

export class RefCountSubscription<T> extends Subscription<T> {
primary: Subscription<T>;
attemptedToUnsubscribePrimary: boolean = false;
count: number = 0;

constructor() {
super();
}

setPrimary(subscription: Subscription<T>) {
this.primary = subscription;
}

unsubscribe() {
if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) {
this.attemptedToUnsubscribePrimary = true;
if (this.count === 0) {
super.unsubscribe();
this.primary.unsubscribe();
}
}
}
}

export class GroupedObservable<T> extends Observable<T> {
constructor(public key: string,
private groupSubject: Subject<T>,
private refCountSubscription: RefCountSubscription<T>) {
super();
}

_subscribe(subscriber: Subscriber<T>) {
const subscription = new Subscription();
if (!this.refCountSubscription.isUnsubscribed) {
subscription.add(new InnerRefCountSubscription(this.refCountSubscription));
}
subscription.add(this.groupSubject.subscribe(subscriber));
return subscription;
}
}

export class InnerRefCountSubscription<T> extends Subscription<T> {
constructor(private parent: RefCountSubscription<T>) {
super();
parent.count++;
}

unsubscribe() {
if (!this.parent.isUnsubscribed && !this.isUnsubscribed) {
super.unsubscribe();
this.parent.count--;
if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) {
this.parent.unsubscribe();
this.parent.primary.unsubscribe();
}
}
}
}

70 changes: 39 additions & 31 deletions src/operators/groupBy.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,52 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscription from '../Subscription';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Map from '../util/Map';
import FastMap from '../util/FastMap';
import GroupSubject from '../subjects/GroupSubject';
import {RefCountSubscription, GroupedObservable, InnerRefCountSubscription} from './groupBy-support';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function groupBy<T, R>(keySelector: (value: T) => string,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupSubject<R>) => Observable<any>): Observable<GroupSubject<R>> {
return this.lift(new GroupByOperator<T, R>(keySelector, durationSelector, elementSelector));
export function groupBy<T, R>(keySelector: (value: T) => string,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>): GroupByObservable<T, R> {
return new GroupByObservable<T, R>(this, keySelector, elementSelector, durationSelector);
}

class GroupByOperator<T, R> implements Operator<T, R> {
constructor(private keySelector: (value: T) => string,
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
private elementSelector?: (value: T) => R) {
export class GroupByObservable<T, R> extends Observable<GroupedObservable<R>> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => string,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>) {
super();
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new GroupBySubscriber<T, R>(
subscriber, this.keySelector, this.durationSelector, this.elementSelector
_subscribe(subscriber) {
const refCountSubscription = new RefCountSubscription();
const groupBySubscriber = new GroupBySubscriber(
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
);
refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber));
return refCountSubscription;
}
}

class GroupBySubscriber<T, R> extends Subscriber<T> {
private groups = null;

constructor(destination: Subscriber<R>,
private refCountSubscription: RefCountSubscription<T>,
private keySelector: (value: T) => string,
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
private elementSelector?: (value: T) => R) {
super(destination);
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>) {
super();
this.destination = destination;
this.add(destination);
}

_next(x: T) {
Expand All @@ -53,27 +62,28 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}

let group: GroupSubject<R> = groups.get(key);
let group: Subject<R> = groups.get(key);

if (!group) {
groups.set(key, group = new GroupSubject(key));
groups.set(key, group = new Subject());
let groupedObservable = new GroupedObservable<R>(key, group, this.refCountSubscription);

if (durationSelector) {
let duration = tryCatch(durationSelector)(group);
let duration = tryCatch(durationSelector)(groupedObservable);
if (duration === errorObject) {
this.error(duration.e);
} else {
this.add(duration._subscribe(new GroupDurationSubscriber(group, this)));
this.add(duration._subscribe(new GroupDurationSubscriber(key, group, this)));
}
}

this.destination.next(group);
this.destination.next(groupedObservable);
}

if (elementSelector) {
let value = tryCatch(elementSelector)(x);
if (value === errorObject) {
group.error(value.e);
this.error(value.e);
} else {
group.next(value);
}
Expand Down Expand Up @@ -111,26 +121,24 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
}

class GroupDurationSubscriber<T> extends Subscriber<T> {
constructor(private group: GroupSubject<T>,
constructor(private key: string,
private group: Subject<T>,
private parent: GroupBySubscriber<any, T>) {
super(null);
}

_next(value: T) {
const group = this.group;
group.complete();
this.parent.removeGroup(group.key);
this.group.complete();
this.parent.removeGroup(this.key);
}

_error(err: any) {
const group = this.group;
group.error(err);
this.parent.removeGroup(group.key);
this.group.error(err);
this.parent.removeGroup(this.key);
}

_complete() {
const group = this.group;
group.complete();
this.parent.removeGroup(group.key);
this.group.complete();
this.parent.removeGroup(this.key);
}
}
7 changes: 0 additions & 7 deletions src/subjects/GroupSubject.ts

This file was deleted.

0 comments on commit 86992c6

Please sign in to comment.