This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
/
subject.js
120 lines (113 loc) · 4.25 KB
/
subject.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
 * Represents an object that is both an observable sequence as well as an observer.
 * Each notification is broadcast to all observers subscribed at the time it arrives.
 */
var Subject = Rx.Subject = (function (_super) {
  /**
   * Subscription logic handed to the base constructor; runs with the subject as `this`.
   *
   * While the subject is still running the observer is appended to the observer list.
   * Once the subject has stopped, the terminal notification is replayed to the
   * late subscriber instead.
   *
   * @param {Observer} observer The observer to register or to notify of termination.
   * @returns {Disposable} A new InnerSubscription while running, otherwise disposableEmpty.
   */
  function subscribe(observer) {
    // NOTE(review): checkDisposed is defined elsewhere; presumably it throws once isDisposed is set.
    checkDisposed.call(this);
    if (!this.isStopped) {
      this.observers.push(observer);
      return new InnerSubscription(this, observer);
    }
    // Test the flag, not the stored value: a falsy error (0, '', null, undefined)
    // is still an error and must not be replayed as a completion.
    if (this.hasError) {
      observer.onError(this.exception);
      return disposableEmpty;
    }
    observer.onCompleted();
    return disposableEmpty;
  }

  inherits(Subject, _super);

  /**
   * Creates a subject.
   *
   * @constructor
   */
  function Subject() {
    _super.call(this, subscribe);
    this.isDisposed = false;
    this.isStopped = false;
    // Set by onError; tells an error termination apart from a completion.
    this.hasError = false;
    this.observers = [];
  }

  addProperties(Subject.prototype, Observer, {
    /**
     * Indicates whether the subject has observers subscribed to it.
     *
     * @memberOf Subject#
     * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
     */
    hasObservers: function () {
      // dispose() nulls the observer list, so guard like every other member
      // rather than failing on a null dereference.
      checkDisposed.call(this);
      return this.observers.length > 0;
    },

    /**
     * Notifies all subscribed observers about the end of the sequence.
     * Has no effect if the subject has already stopped.
     *
     * @memberOf Subject#
     */
    onCompleted: function () {
      checkDisposed.call(this);
      if (!this.isStopped) {
        // Iterate over a snapshot so changes to the live list during
        // notification cannot disturb the loop.
        var os = this.observers.slice(0);
        this.isStopped = true;
        for (var i = 0, len = os.length; i < len; i++) {
          os[i].onCompleted();
        }
        this.observers = [];
      }
    },

    /**
     * Notifies all subscribed observers about the exception.
     * Has no effect if the subject has already stopped.
     *
     * @memberOf Subject#
     * @param {Mixed} exception The exception to send to all observers.
     */
    onError: function (exception) {
      checkDisposed.call(this);
      if (!this.isStopped) {
        var os = this.observers.slice(0);
        this.isStopped = true;
        this.hasError = true;
        // Kept so late subscribers can be handed the same error.
        this.exception = exception;
        for (var i = 0, len = os.length; i < len; i++) {
          os[i].onError(exception);
        }
        this.observers = [];
      }
    },

    /**
     * Notifies all subscribed observers about the arrival of the specified element in the sequence.
     * Has no effect if the subject has already stopped.
     *
     * @memberOf Subject#
     * @param {Mixed} value The value to send to all observers.
     */
    onNext: function (value) {
      checkDisposed.call(this);
      if (!this.isStopped) {
        // Snapshot: observers added while this value is being delivered do not receive it.
        var os = this.observers.slice(0);
        for (var i = 0, len = os.length; i < len; i++) {
          os[i].onNext(value);
        }
      }
    },

    /**
     * Unsubscribe all observers and release resources.
     * Marks the subject as disposed and drops the observer list.
     *
     * @memberOf Subject#
     */
    dispose: function () {
      this.isDisposed = true;
      this.observers = null;
    }
  });

  /**
   * Creates a subject from the specified observer and observable.
   *
   * @static
   * @memberOf Subject
   * @param {Observer} observer The observer used to send messages to the subject.
   * @param {Observable} observable The observable used to subscribe to messages sent from the subject.
   * @returns {Subject} Subject implemented using the given observer and observable.
   */
  Subject.create = function (observer, observable) {
    return new AnonymousSubject(observer, observable);
  };

  return Subject;
}(Observable));