Skip to content

Commit 65c84ea

Browse files
author
Andre Medeiros
committed
feat(shareReplay): add the shareReplay() operator
Add shareReplay() as a core operator. Vaguely relates to issue #439 and issue #298.
1 parent af603bd commit 65c84ea

File tree

6 files changed

+123
-2
lines changed

6 files changed

+123
-2
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
var RxOld = require('rx');
2+
var RxNew = require('../../../../index');
3+
4+
module.exports = function (suite) {
5+
var oldShareReplayWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
6+
.shareReplay(3);
7+
var newShareReplayWithImmediateScheduler = RxNew.Observable.range(0, 25)
8+
.shareReplay(3);
9+
10+
function _next(x) { }
11+
function _error(e) { }
12+
function _complete() { }
13+
return suite
14+
.add('old shareReplay with immediate scheduler', function () {
15+
oldShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete);
16+
})
17+
.add('new shareReplay with immediate scheduler', function () {
18+
newShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete);
19+
});
20+
};

spec/operators/shareReplay-spec.js

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/* globals describe, expect, it, hot, cold, expectObservable */
2+
3+
var Rx = require('../../dist/cjs/Rx');
4+
var Observable = Rx.Observable;
5+
6+
describe('Observable.prototype.shareReplay()', function () {
7+
it('should share a single subscription', function () {
8+
var subscriptionCount = 0;
9+
var obs = new Observable(function (observer) {
10+
subscriptionCount++;
11+
});
12+
13+
var source = obs.shareReplay(1);
14+
15+
expect(subscriptionCount).toBe(0);
16+
17+
source.subscribe();
18+
source.subscribe();
19+
20+
expect(subscriptionCount).toBe(1);
21+
});
22+
23+
it('should replay as many events as specified by the bufferSize', function (done) {
24+
var results1 = [];
25+
var results2 = [];
26+
var subscriptions = 0;
27+
28+
var source = new Observable(function (observer) {
29+
subscriptions++;
30+
observer.next(1);
31+
observer.next(2);
32+
observer.next(3);
33+
observer.next(4);
34+
});
35+
36+
var hot = source.shareReplay(2);
37+
38+
expect(results1).toEqual([]);
39+
expect(results2).toEqual([]);
40+
41+
hot.subscribe(function (x) {
42+
results1.push(x);
43+
});
44+
45+
expect(results1).toEqual([1, 2, 3, 4]);
46+
expect(results2).toEqual([]);
47+
48+
hot.subscribe(function (x) {
49+
results2.push(x);
50+
});
51+
52+
expect(results1).toEqual([1, 2, 3, 4]);
53+
expect(results2).toEqual([3, 4]);
54+
expect(subscriptions).toBe(1);
55+
done();
56+
});
57+
58+
it('should not change the output of the observable when successful', function () {
59+
var e1 = hot('---a--^--b-c--d--e--|');
60+
var expected = '---b-c--d--e--|';
61+
62+
expectObservable(e1.shareReplay(1)).toBe(expected);
63+
});
64+
65+
it('should not change the output of the observable when error', function () {
66+
var e1 = hot('---a--^--b-c--d--e--#');
67+
var expected = '---b-c--d--e--#';
68+
69+
expectObservable(e1.shareReplay(1)).toBe(expected);
70+
});
71+
72+
it('should not change the output of the observable when never', function () {
73+
var e1 = Observable.never();
74+
var expected = '-';
75+
76+
expectObservable(e1.shareReplay(1)).toBe(expected);
77+
});
78+
79+
it('should not change the output of the observable when empty', function () {
80+
var e1 = Observable.empty();
81+
var expected = '|';
82+
83+
expectObservable(e1.shareReplay(1)).toBe(expected);
84+
});
85+
});

src/CoreOperators.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export interface CoreOperators<T> {
5454
sampleTime?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
5555
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
5656
share?: () => Observable<T>;
57+
shareReplay?: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => Observable<T>;
5758
single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
5859
skip?: (count: number) => Observable<T>;
5960
skipUntil?: (notifier: Observable<any>) => Observable<T>;

src/Rx.KitchenSink.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Observable.zip = zipStatic
6666

6767

6868

69-
// Operators
69+
// Operators
7070
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
7171

7272
import buffer from './operators/buffer';
@@ -227,6 +227,9 @@ observableProto.scan = scan;
227227
import share from './operators/share';
228228
observableProto.share = share;
229229

230+
import shareReplay from './operators/shareReplay';
231+
observableProto.shareReplay = shareReplay;
232+
230233
import single from './operators/single';
231234
observableProto.single = single;
232235

src/Rx.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Observable.zip = zipStatic
5656

5757

5858

59-
// Operators
59+
// Operators
6060
import { CoreOperators } from './CoreOperators';
6161
const observableProto = (<CoreOperators<any>>Observable.prototype);
6262

@@ -203,6 +203,9 @@ observableProto.scan = scan;
203203
import share from './operators/share';
204204
observableProto.share = share;
205205

206+
import shareReplay from './operators/shareReplay';
207+
observableProto.shareReplay = shareReplay;
208+
206209
import single from './operators/single';
207210
observableProto.single = single;
208211

src/operators/shareReplay.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import Observable from '../Observable';
2+
import Scheduler from '../Scheduler';
3+
import publishReplay from './publishReplay';
4+
5+
export default function shareReplay<T>(bufferSize: number = Number.POSITIVE_INFINITY,
6+
windowTime: number = Number.POSITIVE_INFINITY,
7+
scheduler?: Scheduler) {
8+
return publishReplay.call(this, bufferSize, windowTime, scheduler).refCount();
9+
}

0 commit comments

Comments
 (0)