Skip to content

Commit

Permalink
fix(takeWhile): reentrant errors and completions behave properly
Browse files Browse the repository at this point in the history
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion notification, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before the takeWhile notifier is notified.
  • Loading branch information
benlesh committed Jan 20, 2023
1 parent 2eed17c commit 342f41a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 6 deletions.
86 changes: 85 additions & 1 deletion spec/operators/takeWhile-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { takeWhile, tap, mergeMap } from 'rxjs/operators';
import { of, Observable, from } from 'rxjs';
import { of, Observable, from, Subject, merge } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -365,4 +365,88 @@ describe('takeWhile', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b', true),
tap({
complete: () => {
subject.error(new Error('reentrant shennanigans'));
},
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----|';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b'),
tap({
complete: () => {
subject.error(new Error('reentrant shennanigans'));
},
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit completions sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b', true),
tap({
complete: () => {
subject.complete();
},
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit completions sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----|';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b'),
tap({
complete: () => {
subject.complete();
},
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
28 changes: 23 additions & 5 deletions src/internal/operators/takeWhile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,30 @@ export function takeWhile<T>(predicate: (value: T, index: number) => boolean, in
export function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive = false): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let index = 0;
let taking = true;
source.subscribe(
createOperatorSubscriber(subscriber, (value) => {
const result = predicate(value, index++);
(result || inclusive) && subscriber.next(value);
!result && subscriber.complete();
})
createOperatorSubscriber(
subscriber,
(value) => {
taking = predicate(value, index++);
if (taking || inclusive) {
subscriber.next(value);
}
if (!taking) {
subscriber.complete();
}
},
() => {
if (taking) {
subscriber.complete();
}
},
(err) => {
if (taking) {
subscriber.error(err);
}
}
)
);
});
}

0 comments on commit 342f41a

Please sign in to comment.