Skip to content

Commit

Permalink
fix(bufferToggle): fix bugs in order to pass tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 15, 2015
1 parent 765761e commit 949fa31
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions src/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ class BufferToggleOperator<T, R, O> implements Operator<T, R> {
}
}

interface BufferContext<T> {
buffer: T[];
subscription: Subscription<T>;
}

class BufferToggleSubscriber<T, O> extends Subscriber<T> {
private buffers: Array<T[]> = [];
private contexts: Array<BufferContext<T>> = [];
private closingNotification: Subscription<any>;

constructor(destination: Subscriber<T>,
Expand All @@ -45,53 +50,58 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
}

_next(value: T) {
const buffers = this.buffers;
const len = buffers.length;
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
buffers[i].push(value);
contexts[i].buffer.push(value);
}
}

_error(err: any) {
this.buffers = null;
this.contexts = null;
this.destination.error(err);
}

_complete() {
const buffers = this.buffers;
while (buffers.length > 0) {
this.destination.next(buffers.shift());
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
this.destination.next(context.buffer);
context.subscription.unsubscribe();
context.buffer = null;
}
this.destination.complete();
}

openBuffer(value: O) {
const closingSelector = this.closingSelector;
const buffers = this.buffers;
const contexts = this.contexts;

let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
const err = closingNotifier.e;
this.buffers = null;
this.contexts = null;
this.destination.error(err);
} else {
let buffer = [];
let context = {
buffer,
buffer: [],
subscription: new Subscription()
};
buffers.push(buffer);
contexts.push(context);
const subscriber = new BufferClosingNotifierSubscriber(this, context);
const subscription = closingNotifier._subscribe(subscriber);
this.add(context.subscription.add(subscription));
}
}

closeBuffer(context: { subscription: any, buffer: T[] }) {
closeBuffer(context: BufferContext<T>) {
const contexts = this.contexts;
if (contexts === null) {
return;
}
const { buffer, subscription } = context;
const buffers = this.buffers;
this.destination.next(buffer);
buffers.splice(buffers.indexOf(buffer), 1);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}
Expand All @@ -111,7 +121,7 @@ class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
}

_complete() {
// noop
this.parent.closeBuffer(this.context);
}
}

Expand Down

0 comments on commit 949fa31

Please sign in to comment.