Skip to content

Commit 34c05fe

Browse files
mattpodwysockibenlesh
authored andcommitted
feat(AsyncSubject): add AsyncSubject
1 parent a20325c commit 34c05fe

File tree

3 files changed

+57
-0
lines changed

3 files changed

+57
-0
lines changed

src/Rx.KitchenSink.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ import './operator/zipAll';
128128
import {Subject} from './Subject';
129129
import {Subscription} from './Subscription';
130130
import {Subscriber} from './Subscriber';
131+
import {AsyncSubject} from './subject/AsyncSubject';
131132
import {ReplaySubject} from './subject/ReplaySubject';
132133
import {BehaviorSubject} from './subject/BehaviorSubject';
133134
import {ConnectableObservable} from './observable/ConnectableObservable';
@@ -156,6 +157,7 @@ export {
156157
Observable,
157158
Subscriber,
158159
Subscription,
160+
AsyncSubject,
159161
ReplaySubject,
160162
BehaviorSubject,
161163
ConnectableObservable,

src/Rx.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ import './operator/zipAll';
104104
import {Subject} from './Subject';
105105
import {Subscription} from './Subscription';
106106
import {Subscriber} from './Subscriber';
107+
import {AsyncSubject} from './subject/AsyncSubject';
107108
import {ReplaySubject} from './subject/ReplaySubject';
108109
import {BehaviorSubject} from './subject/BehaviorSubject';
109110
import {ConnectableObservable} from './observable/ConnectableObservable';
@@ -129,6 +130,7 @@ export {
129130
Observable,
130131
Subscriber,
131132
Subscription,
133+
AsyncSubject,
132134
ReplaySubject,
133135
BehaviorSubject,
134136
ConnectableObservable,

src/subjects/AsyncSubject.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import {Subject} from '../Subject';
2+
import {Subscriber} from '../Subscriber';
3+
import {Subscription} from '../Subscription';
4+
5+
export class AsyncSubject<T> extends Subject<T> {
6+
_value: T = void 0;
7+
_hasNext: boolean = false;
8+
_isScalar: boolean = false;
9+
10+
constructor () {
11+
super();
12+
}
13+
14+
_subscribe(subscriber: Subscriber<any>): Subscription<T> {
15+
const subscription = super._subscribe(subscriber);
16+
if (!subscription) {
17+
return;
18+
} else if (!subscription.isUnsubscribed && this._hasNext) {
19+
subscriber.next(this._value);
20+
subscriber.complete();
21+
}
22+
return subscription;
23+
}
24+
25+
_next(value: T): void {
26+
this._value = value;
27+
this._hasNext = true;
28+
}
29+
30+
_complete(): void {
31+
let index = -1;
32+
const observers = this.observers;
33+
const len = observers.length;
34+
35+
// optimization -- block next, complete, and unsubscribe while dispatching
36+
this.observers = void 0; // optimization
37+
this.isUnsubscribed = true;
38+
39+
if (this._hasNext) {
40+
while (++index < len) {
41+
let o = observers[index];
42+
o.next(this._value);
43+
o.complete();
44+
}
45+
} else {
46+
while (++index < len) {
47+
observers[index].complete();
48+
}
49+
}
50+
51+
this.isUnsubscribed = false;
52+
}
53+
}

0 commit comments

Comments
 (0)