Skip to content

Commit 0377ca6

Browse files
committed
feat(bufferTime): add higher-order lettable version of bufferTime operator
1 parent 0ae2ed5 commit 0377ca6

File tree

3 files changed

+240
-160
lines changed

3 files changed

+240
-160
lines changed

src/operator/bufferTime.ts

Lines changed: 2 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import { IScheduler } from '../Scheduler';
2-
import { Action } from '../scheduler/Action';
3-
import { Operator } from '../Operator';
42
import { async } from '../scheduler/async';
53
import { Observable } from '../Observable';
6-
import { Subscriber } from '../Subscriber';
7-
import { Subscription } from '../Subscription';
84
import { isScheduler } from '../util/isScheduler';
5+
import { bufferTime as higherOrder } from '../operators';
96

107
/* tslint:disable:max-line-length */
118
export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number, scheduler?: IScheduler): Observable<T[]>;
@@ -75,160 +72,5 @@ export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number): Obse
7572
maxBufferSize = arguments[2];
7673
}
7774

78-
return this.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
79-
}
80-
81-
class BufferTimeOperator<T> implements Operator<T, T[]> {
82-
constructor(private bufferTimeSpan: number,
83-
private bufferCreationInterval: number,
84-
private maxBufferSize: number,
85-
private scheduler: IScheduler) {
86-
}
87-
88-
call(subscriber: Subscriber<T[]>, source: any): any {
89-
return source.subscribe(new BufferTimeSubscriber(
90-
subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler
91-
));
92-
}
93-
}
94-
95-
class Context<T> {
96-
buffer: T[] = [];
97-
closeAction: Subscription;
98-
}
99-
100-
type CreationState<T> = {
101-
bufferTimeSpan: number;
102-
bufferCreationInterval: number,
103-
subscriber: BufferTimeSubscriber<T>;
104-
scheduler: IScheduler;
105-
};
106-
107-
/**
108-
* We need this JSDoc comment for affecting ESDoc.
109-
* @ignore
110-
* @extends {Ignored}
111-
*/
112-
class BufferTimeSubscriber<T> extends Subscriber<T> {
113-
private contexts: Array<Context<T>> = [];
114-
private timespanOnly: boolean;
115-
116-
constructor(destination: Subscriber<T[]>,
117-
private bufferTimeSpan: number,
118-
private bufferCreationInterval: number,
119-
private maxBufferSize: number,
120-
private scheduler: IScheduler) {
121-
super(destination);
122-
const context = this.openContext();
123-
this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
124-
if (this.timespanOnly) {
125-
const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
126-
this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
127-
} else {
128-
const closeState = { subscriber: this, context };
129-
const creationState: CreationState<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
130-
this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
131-
this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
132-
}
133-
}
134-
135-
protected _next(value: T) {
136-
const contexts = this.contexts;
137-
const len = contexts.length;
138-
let filledBufferContext: Context<T>;
139-
for (let i = 0; i < len; i++) {
140-
const context = contexts[i];
141-
const buffer = context.buffer;
142-
buffer.push(value);
143-
if (buffer.length == this.maxBufferSize) {
144-
filledBufferContext = context;
145-
}
146-
}
147-
148-
if (filledBufferContext) {
149-
this.onBufferFull(filledBufferContext);
150-
}
151-
}
152-
153-
protected _error(err: any) {
154-
this.contexts.length = 0;
155-
super._error(err);
156-
}
157-
158-
protected _complete() {
159-
const { contexts, destination } = this;
160-
while (contexts.length > 0) {
161-
const context = contexts.shift();
162-
destination.next(context.buffer);
163-
}
164-
super._complete();
165-
}
166-
167-
protected _unsubscribe() {
168-
this.contexts = null;
169-
}
170-
171-
protected onBufferFull(context: Context<T>) {
172-
this.closeContext(context);
173-
const closeAction = context.closeAction;
174-
closeAction.unsubscribe();
175-
this.remove(closeAction);
176-
177-
if (!this.closed && this.timespanOnly) {
178-
context = this.openContext();
179-
const bufferTimeSpan = this.bufferTimeSpan;
180-
const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
181-
this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
182-
}
183-
}
184-
185-
openContext(): Context<T> {
186-
const context: Context<T> = new Context<T>();
187-
this.contexts.push(context);
188-
return context;
189-
}
190-
191-
closeContext(context: Context<T>) {
192-
this.destination.next(context.buffer);
193-
const contexts = this.contexts;
194-
195-
const spliceIndex = contexts ? contexts.indexOf(context) : -1;
196-
if (spliceIndex >= 0) {
197-
contexts.splice(contexts.indexOf(context), 1);
198-
}
199-
}
200-
}
201-
202-
function dispatchBufferTimeSpanOnly(this: Action<any>, state: any) {
203-
const subscriber: BufferTimeSubscriber<any> = state.subscriber;
204-
205-
const prevContext = state.context;
206-
if (prevContext) {
207-
subscriber.closeContext(prevContext);
208-
}
209-
210-
if (!subscriber.closed) {
211-
state.context = subscriber.openContext();
212-
state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
213-
}
214-
}
215-
216-
interface DispatchArg<T> {
217-
subscriber: BufferTimeSubscriber<T>;
218-
context: Context<T>;
219-
}
220-
221-
function dispatchBufferCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>) {
222-
const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
223-
const context = subscriber.openContext();
224-
const action = <Action<CreationState<T>>>this;
225-
if (!subscriber.closed) {
226-
subscriber.add(context.closeAction = scheduler.schedule<DispatchArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
227-
action.schedule(state, bufferCreationInterval);
228-
}
229-
}
230-
231-
function dispatchBufferClose<T>(arg: DispatchArg<T>) {
232-
const { subscriber, context } = arg;
233-
subscriber.closeContext(context);
75+
return higherOrder(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)(this);
23476
}

0 commit comments

Comments
 (0)