diff --git a/spec/operators/cache-spec.js b/spec/operators/cache-spec.js new file mode 100644 index 0000000000..d464ee9f64 --- /dev/null +++ b/spec/operators/cache-spec.js @@ -0,0 +1,182 @@ +/* globals describe, it, expect, expectObservable, hot, rxTestScheduler, time */ +var Rx = require('../../dist/cjs/Rx'); + +var asap = Rx.Scheduler.asap; +var Observable = Rx.Observable; + +describe('Observable.prototype.cache', function () { + it('should replay values upon subscription', function () { + var s1 = hot('---^---a---b---c---| ').cache(); + var expected1 = '----a---b---c---| '; + var expected2 = ' (abc|)'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should replay values and error', function () { + var s1 = hot('---^---a---b---c---# ').cache(); + var expected1 = '----a---b---c---# '; + var expected2 = ' (abc#)'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should replay values and and share', function () { + var s1 = hot('---^---a---b---c------------d--e--f-|').cache(); + var expected1 = '----a---b---c------------d--e--f-|'; + var expected2 = ' (abc)----d--e--f-|'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should have a bufferCount that limits the replay test 1', function () { + var s1 = hot('---^---a---b---c------------d--e--f-|').cache(1); + var expected1 = '----a---b---c------------d--e--f-|'; + var expected2 = ' c--------d--e--f-|'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should have a bufferCount that limits the replay test 2', function () { + var s1 = hot('---^---a---b---c------------d--e--f-|').cache(2); + var expected1 = '----a---b---c------------d--e--f-|'; + var expected2 = ' (bc)-----d--e--f-|'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should accept a windowTime that limits the replay', function () { + var w = time( '----------|'); + var s1 = hot('---^---a---b---c------------d--e--f-|').cache(Number.POSITIVE_INFINITY, w, rxTestScheduler); + var expected1 = '----a---b---c------------d--e--f-|'; + var expected2 = ' (bc)-----d--e--f-|'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should handle empty', function () { + var s1 = cold('|').cache(); + var expected1 = '|'; + var expected2 = ' |'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should handle throw', function () { + var s1 = cold('#').cache(); + var expected1 = '#'; + var expected2 = ' #'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should handle never', function () { + var s1 = cold('-').cache(); + var expected1 = '-'; + var expected2 = ' -'; + var t = time( '----------------|'); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected2); + }, t); + }); + + it('should multicast a completion', function () { + var s1 = hot('--a--^--b------c-----d------e-|').cache(); + var t1 = time( '| '); + var e1 = '---b------c-----d------e-|'; + var t2 = time( '----------| '); + var e2 = ' (bc)--d------e-|'; + var t3 = time( '----------------| '); + var e3 = ' (bcd)--e-|'; + + var expected = [e1, e2, e3]; + + [t1, t2, t3].forEach(function (t, i) { + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected[i]); + }, t); + }); + }); + + it('should multicast an error', function () { + var s1 = hot('--a--^--b------c-----d------e-#').cache(); + var t1 = time( '| '); + var e1 = '---b------c-----d------e-#'; + var t2 = time( '----------| '); + var e2 = ' (bc)--d------e-#'; + var t3 = time( '----------------| '); + var e3 = ' (bcd)--e-#'; + + var expected = [e1, e2, e3]; + + [t1, t2, t3].forEach(function (t, i) { + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(expected[i]); + }, t); + }); + }); + + it('should limit replay by both count and a window time, test 2', function () { + var w = time( '-----------|'); + var s1 = hot('--a--^---b---c----d----e------f--g--h--i-------|').cache(4, w, rxTestScheduler); + var e1 = '----b---c----d----e------f--g--h--i-------|'; + var t1 = time( '--------------------|'); + // -----------| + var e2 = ' (de)-f--g--h--i-------|'; // time wins + var t2 = time( '-----------------------------------|'); + var e3 = ' (fghi)-|'; // count wins + // -----------| + + expectObservable(s1).toBe(e1); + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(e2); + }, t1); + + rxTestScheduler.schedule(function () { + expectObservable(s1).toBe(e3); + }, t2); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index f8d01d5b54..a31a6a4469 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -210,6 +210,7 @@ export class Observable implements CoreOperators { bufferTime: BufferTimeSignature; bufferToggle: BufferToggleSignature; bufferWhen: BufferWhenSignature; + cache: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => Observable; catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; combineAll: (project?: (...values: Array) => R) => Observable; combineLatest: CombineLatestSignature; diff --git a/src/Rx.ts b/src/Rx.ts index 6507158e97..15ec5b5488 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -35,6 +35,7 @@ import './add/operator/bufferCount'; import './add/operator/bufferTime'; import './add/operator/bufferToggle'; import './add/operator/bufferWhen'; +import './add/operator/cache'; import './add/operator/catch'; import './add/operator/combineAll'; import './add/operator/combineLatest'; diff --git a/src/add/operator/cache.ts b/src/add/operator/cache.ts new file mode 100644 index 0000000000..f224b533f6 --- /dev/null +++ b/src/add/operator/cache.ts @@ -0,0 +1,7 @@ + +import {Observable} from '../../Observable'; +import {cache} from '../../operator/cache'; + +Observable.prototype.cache = cache; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/cache.ts b/src/operator/cache.ts new file mode 100644 index 0000000000..9ef1a03cbf --- /dev/null +++ b/src/operator/cache.ts @@ -0,0 +1,10 @@ +import {Observable} from '../Observable'; +import {publishReplay} from './publishReplay'; +import {Scheduler} from '../Scheduler'; +import {ConnectableObservable} from '../observable/ConnectableObservable'; + +export function cache(bufferSize: number = Number.POSITIVE_INFINITY, + windowTime: number = Number.POSITIVE_INFINITY, + scheduler?: Scheduler): Observable { + return (>publishReplay.call(this, bufferSize, windowTime, scheduler)).refCount(); +} \ No newline at end of file diff --git a/src/subject/ReplaySubject.ts b/src/subject/ReplaySubject.ts index 2c329e1c31..f50cda0ff6 100644 --- a/src/subject/ReplaySubject.ts +++ b/src/subject/ReplaySubject.ts @@ -9,15 +9,15 @@ export class ReplaySubject extends Subject { private events: ReplayEvent[] = []; private scheduler: Scheduler; private bufferSize: number; - private windowSize: number; + private _windowTime: number; constructor(bufferSize: number = Number.POSITIVE_INFINITY, - windowSize: number = Number.POSITIVE_INFINITY, + windowTime: number = Number.POSITIVE_INFINITY, scheduler?: Scheduler) { super(); this.scheduler = scheduler; this.bufferSize = bufferSize < 1 ? 1 : bufferSize; - this.windowSize = windowSize < 1 ? 1 : windowSize; + this._windowTime = windowTime < 1 ? 1 : windowTime; } protected _next(value: T): void { @@ -49,7 +49,7 @@ export class ReplaySubject extends Subject { private _trimBufferThenGetEvents(now: number): ReplayEvent[] { const bufferSize = this.bufferSize; - const windowSize = this.windowSize; + const _windowTime = this._windowTime; const events = this.events; let eventsCount = events.length; @@ -59,7 +59,7 @@ export class ReplaySubject extends Subject { // Start at the front of the list. Break early once // we encounter an event that falls within the window. while (spliceCount < eventsCount) { - if ((now - events[spliceCount].time) < windowSize) { + if ((now - events[spliceCount].time) < _windowTime) { break; } spliceCount += 1;