Skip to content

Commit ba9ef2b

Browse files
committed
fix(Subject): correct Subject behaviors to be more like Rx4
BREAKING CHANGE: Subjects no longer duck-type as Subscriptions BREAKING CHANGE: Subjects will no longer throw when re-subscribed to if they are not unsubscribed BREAKING CHANGE: Subjects no longer automatically unsubscribe when completed or errored BREAKING CAHNGE: Minor scheduling changes to groupBy to ensure proper emission ordering
1 parent 08f3d38 commit ba9ef2b

21 files changed

+482
-813
lines changed

spec/Subject-spec.ts

Lines changed: 35 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ describe('Subject', () => {
2121
subject.complete();
2222
});
2323

24-
it('should have the rxSubscriber Symbol', () => {
25-
const subject = new Subject();
26-
expect(subject[Rx.Symbol.rxSubscriber]).to.be.a('function');
27-
});
28-
2924
it('should pump values to multiple subscribers', (done: MochaDone) => {
3025
const subject = new Subject();
3126
const expected = ['foo', 'bar'];
@@ -276,82 +271,7 @@ describe('Subject', () => {
276271
expect(results3).to.deep.equal([]);
277272
});
278273

279-
it('should allow ad-hoc subscription to be added to itself', () => {
280-
const subject = new Subject();
281-
const results1 = [];
282-
const results2 = [];
283-
284-
const auxSubject = new Subject();
285-
286-
const subscription1 = subject.subscribe(
287-
function (x) { results1.push(x); },
288-
function (e) { results1.push('E'); },
289-
() => { results1.push('C'); }
290-
);
291-
const subscription2 = auxSubject.subscribe(
292-
function (x) { results2.push(x); },
293-
function (e) { results2.push('E'); },
294-
() => { results2.push('C'); }
295-
);
296-
297-
subject.add(subscription2);
298-
299-
subject.next(1);
300-
subject.next(2);
301-
subject.next(3);
302-
auxSubject.next('a');
303-
auxSubject.next('b');
304-
305-
subscription1.unsubscribe();
306-
subject.unsubscribe();
307-
308-
auxSubject.next('c');
309-
auxSubject.next('d');
310-
311-
expect(results1).to.deep.equal([1, 2, 3]);
312-
expect(subscription2.isUnsubscribed).to.be.true;
313-
expect(results2).to.deep.equal(['a', 'b']);
314-
});
315-
316-
it('should allow ad-hoc subscription to be removed from itself', () => {
317-
const subject = new Subject();
318-
const results1 = [];
319-
const results2 = [];
320-
321-
const auxSubject = new Subject();
322-
323-
const subscription1 = subject.subscribe(
324-
function (x) { results1.push(x); },
325-
function (e) { results1.push('E'); },
326-
() => { results1.push('C'); }
327-
);
328-
const subscription2 = auxSubject.subscribe(
329-
function (x) { results2.push(x); },
330-
function (e) { results2.push('E'); },
331-
() => { results2.push('C'); }
332-
);
333-
334-
subject.add(subscription2);
335-
336-
subject.next(1);
337-
subject.next(2);
338-
subject.next(3);
339-
auxSubject.next('a');
340-
auxSubject.next('b');
341-
342-
subject.remove(subscription2);
343-
subscription1.unsubscribe();
344-
subject.unsubscribe();
345-
346-
auxSubject.next('c');
347-
auxSubject.next('d');
348-
349-
expect(results1).to.deep.equal([1, 2, 3]);
350-
expect(subscription2.isUnsubscribed).to.be.false;
351-
expect(results2).to.deep.equal(['a', 'b', 'c', 'd']);
352-
});
353-
354-
it('should not allow values to be nexted after a return', (done: MochaDone) => {
274+
it('should not allow values to be nexted after it is unsubscribed', (done: MochaDone) => {
355275
const subject = new Subject();
356276
const expected = ['foo'];
357277

@@ -360,7 +280,7 @@ describe('Subject', () => {
360280
});
361281

362282
subject.next('foo');
363-
subject.complete();
283+
subject.unsubscribe();
364284
expect(() => subject.next('bar')).to.throw(Rx.ObjectUnsubscribedError);
365285
done();
366286
});
@@ -528,38 +448,24 @@ describe('Subject', () => {
528448
}).to.throw(Rx.ObjectUnsubscribedError);
529449
});
530450

531-
it('should throw ObjectUnsubscribedError when emit after completed', () => {
451+
it('should not next after completed', () => {
532452
const subject = new Rx.Subject();
453+
const results = [];
454+
subject.subscribe(x => results.push(x), null, () => results.push('C'));
455+
subject.next('a');
533456
subject.complete();
534-
535-
expect(() => {
536-
subject.next('a');
537-
}).to.throw(Rx.ObjectUnsubscribedError);
538-
539-
expect(() => {
540-
subject.error('a');
541-
}).to.throw(Rx.ObjectUnsubscribedError);
542-
543-
expect(() => {
544-
subject.complete();
545-
}).to.throw(Rx.ObjectUnsubscribedError);
457+
subject.next('b');
458+
expect(results).to.deep.equal(['a', 'C']);
546459
});
547460

548-
it('should throw ObjectUnsubscribedError when emit after error', () => {
461+
it('should not next after error', () => {
549462
const subject = new Rx.Subject();
550-
subject.error('e');
551-
552-
expect(() => {
553-
subject.next('a');
554-
}).to.throw(Rx.ObjectUnsubscribedError);
555-
556-
expect(() => {
557-
subject.error('a');
558-
}).to.throw(Rx.ObjectUnsubscribedError);
559-
560-
expect(() => {
561-
subject.complete();
562-
}).to.throw(Rx.ObjectUnsubscribedError);
463+
const results = [];
464+
subject.subscribe(x => results.push(x), (err) => results.push(err));
465+
subject.next('a');
466+
subject.error(new Error('wut?'));
467+
subject.next('b');
468+
expect(results).to.deep.equal(['a', new Error('wut?')]);
563469
});
564470

565471
describe('asObservable', () => {
@@ -600,43 +506,38 @@ describe('Subject', () => {
600506
expectObservable(observable).toBe(expected);
601507
});
602508

603-
it('should work with inherited subject', (done: MochaDone) => {
509+
it('should work with inherited subject', () => {
510+
const results = [];
604511
const subject = new Rx.AsyncSubject();
605512

606513
subject.next(42);
607514
subject.complete();
608515

609516
const observable = subject.asObservable();
610517

611-
const expected = [new Rx.Notification('N', 42),
612-
new Rx.Notification('C')];
518+
observable.subscribe(x => results.push(x), null, () => results.push('done'));
613519

614-
observable.materialize().subscribe((x: Rx.Notification<number>) => {
615-
expect(x).to.deep.equal(expected.shift());
616-
}, (err: any) => {
617-
done(err);
618-
}, () => {
619-
expect(expected).to.deep.equal([]);
620-
done();
621-
});
520+
expect(results).to.deep.equal([42, 'done']);
622521
});
522+
});
523+
});
623524

624-
it('should not eager', () => {
625-
let subscribed = false;
525+
describe('AnonymousSubject', () => {
526+
it('should not eager', () => {
527+
let subscribed = false;
626528

627-
const subject = new Rx.Subject(null, new Rx.Observable((observer: Rx.Observer<any>) => {
628-
subscribed = true;
629-
const subscription = Rx.Observable.of('x').subscribe(observer);
630-
return () => {
631-
subscription.unsubscribe();
632-
};
633-
}));
529+
const subject = Rx.Subject.create(null, new Rx.Observable((observer: Rx.Observer<any>) => {
530+
subscribed = true;
531+
const subscription = Rx.Observable.of('x').subscribe(observer);
532+
return () => {
533+
subscription.unsubscribe();
534+
};
535+
}));
634536

635-
const observable = subject.asObservable();
636-
expect(subscribed).to.be.false;
537+
const observable = subject.asObservable();
538+
expect(subscribed).to.be.false;
637539

638-
observable.subscribe();
639-
expect(subscribed).to.be.true;
640-
});
540+
observable.subscribe();
541+
expect(subscribed).to.be.true;
641542
});
642543
});

spec/Subscriber-spec.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@ const Subscriber = Rx.Subscriber;
66

77
/** @test {Subscriber} */
88
describe('Subscriber', () => {
9-
it('should have the rxSubscriber symbol', () => {
10-
const sub = new Subscriber();
11-
expect(sub[Rx.Symbol.rxSubscriber]()).to.equal(sub);
12-
});
13-
149
describe('when created through create()', () => {
1510
it('should not call error() if next() handler throws an error', () => {
1611
const errorSpy = sinon.spy();

spec/operators/cache-spec.ts

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,45 @@
11
import * as Rx from '../../dist/cjs/Rx';
2+
import {expect} from 'chai';
23
declare const {hot, cold, time, expectObservable};
34

45
declare const rxTestScheduler: Rx.TestScheduler;
56

67
/** @test {cache} */
78
describe('Observable.prototype.cache', () => {
9+
it('should just work™', () => {
10+
let subs = 0;
11+
const source = Rx.Observable.create(observer => {
12+
subs++;
13+
observer.next(1);
14+
observer.next(2);
15+
observer.next(3);
16+
observer.complete();
17+
}).cache();
18+
let results = [];
19+
source.subscribe(x => results.push(x));
20+
expect(results).to.deep.equal([1, 2, 3]);
21+
expect(subs).to.equal(1);
22+
results = [];
23+
source.subscribe(x => results.push(x));
24+
expect(results).to.deep.equal([1, 2, 3]);
25+
expect(subs).to.equal(1);
26+
});
27+
828
it('should replay values upon subscription', () => {
9-
const s1 = hot('---^---a---b---c---| ').cache();
10-
const expected1 = '----a---b---c---| ';
11-
const expected2 = ' (abc|)';
12-
const t = time( '----------------|');
29+
const s1 = hot( '----a---b---c---| ').cache(undefined, undefined, rxTestScheduler);
30+
const expected1 = '----a---b---c---| ';
31+
const expected2 = ' (abc|)';
32+
const sub2 = '------------------| ';
1333

1434
expectObservable(s1).toBe(expected1);
15-
16-
rxTestScheduler.schedule(() => {
17-
expectObservable(s1).toBe(expected2);
18-
}, t);
35+
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), time(sub2));
1936
});
2037

2138
it('should replay values and error', () => {
22-
const s1 = hot('---^---a---b---c---# ').cache();
39+
const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
2340
const expected1 = '----a---b---c---# ';
24-
const expected2 = ' (abc#)';
25-
const t = time( '----------------|');
41+
const expected2 = ' (abc#)';
42+
const t = time( '------------------|');
2643

2744
expectObservable(s1).toBe(expected1);
2845

@@ -32,7 +49,7 @@ describe('Observable.prototype.cache', () => {
3249
});
3350

3451
it('should replay values and and share', () => {
35-
const s1 = hot('---^---a---b---c------------d--e--f-|').cache();
52+
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(undefined, undefined, rxTestScheduler);
3653
const expected1 = '----a---b---c------------d--e--f-|';
3754
const expected2 = ' (abc)----d--e--f-|';
3855
const t = time( '----------------|');
@@ -58,16 +75,13 @@ describe('Observable.prototype.cache', () => {
5875
});
5976

6077
it('should have a bufferCount that limits the replay test 2', () => {
61-
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(2);
78+
const s1 = hot( '----a---b---c------------d--e--f-|').cache(2);
6279
const expected1 = '----a---b---c------------d--e--f-|';
6380
const expected2 = ' (bc)-----d--e--f-|';
6481
const t = time( '----------------|');
6582

6683
expectObservable(s1).toBe(expected1);
67-
68-
rxTestScheduler.schedule(() => {
69-
expectObservable(s1).toBe(expected2);
70-
}, t);
84+
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), t);
7185
});
7286

7387
it('should accept a windowTime that limits the replay', () => {
@@ -85,7 +99,7 @@ describe('Observable.prototype.cache', () => {
8599
});
86100

87101
it('should handle empty', () => {
88-
const s1 = cold('|').cache();
102+
const s1 = cold('|').cache(undefined, undefined, rxTestScheduler);
89103
const expected1 = '|';
90104
const expected2 = ' |';
91105
const t = time( '----------------|');
@@ -98,7 +112,7 @@ describe('Observable.prototype.cache', () => {
98112
});
99113

100114
it('should handle throw', () => {
101-
const s1 = cold('#').cache();
115+
const s1 = cold('#').cache(undefined, undefined, rxTestScheduler);
102116
const expected1 = '#';
103117
const expected2 = ' #';
104118
const t = time( '----------------|');
@@ -111,7 +125,7 @@ describe('Observable.prototype.cache', () => {
111125
});
112126

113127
it('should handle never', () => {
114-
const s1 = cold('-').cache();
128+
const s1 = cold('-').cache(undefined, undefined, rxTestScheduler);
115129
const expected1 = '-';
116130
const expected2 = ' -';
117131
const t = time( '----------------|');
@@ -124,7 +138,7 @@ describe('Observable.prototype.cache', () => {
124138
});
125139

126140
it('should multicast a completion', () => {
127-
const s1 = hot('--a--^--b------c-----d------e-|').cache();
141+
const s1 = hot('--a--^--b------c-----d------e-|').cache(undefined, undefined, rxTestScheduler);
128142
const t1 = time( '| ');
129143
const e1 = '---b------c-----d------e-|';
130144
const t2 = time( '----------| ');
@@ -142,7 +156,7 @@ describe('Observable.prototype.cache', () => {
142156
});
143157

144158
it('should multicast an error', () => {
145-
const s1 = hot('--a--^--b------c-----d------e-#').cache();
159+
const s1 = hot('--a--^--b------c-----d------e-#').cache(undefined, undefined, rxTestScheduler);
146160
const t1 = time( '| ');
147161
const e1 = '---b------c-----d------e-#';
148162
const t2 = time( '----------| ');

0 commit comments

Comments
 (0)