Skip to content

Commit

Permalink
fix(first/take): behave properly with reentrant errors and completions
Browse files Browse the repository at this point in the history
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487
  • Loading branch information
benlesh committed May 10, 2021
1 parent d3c55da commit dd3a3d8
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 14 deletions.
80 changes: 78 additions & 2 deletions spec/operators/first-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/** @prettier */
import { expect } from 'chai';
import { observableMatcher } from '../helpers/observableMatcher';
import { first, mergeMap, delay } from 'rxjs/operators';
import { first, mergeMap, delay, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of, from, Observable, Subject, EmptyError } from 'rxjs';
import { of, from, Observable, Subject, EmptyError, merge } from 'rxjs';

/** @test {first} */
describe('first', () => {
Expand Down Expand Up @@ -330,4 +330,80 @@ describe('first', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------(a|)';
const subs = ' ^------!';

const result = merge(source, subject).pipe(
first(),
tap(() => {
subject.error(new Error('reentrant shennanigans'));
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' ------------(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
first((value) => value === 'b'),
tap(() => {
subject.error(new Error('reentrant shennanigans'));
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit complete sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------(a|)';
const subs = ' ^------!';

const result = merge(source, subject).pipe(
first(),
tap(() => {
subject.complete();
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit completions sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' ------------(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
first((value) => value === 'b'),
tap(() => {
subject.complete();
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
19 changes: 18 additions & 1 deletion spec/operators/take-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ describe('take', () => {
});
});

it.skip('should unsubscribe from the source when it reaches the limit before a recursive synchronous upstream error is notified', () => {
it('should unsubscribe from the source when it reaches the limit before a recursive synchronous upstream error is notified', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const e1 = cold(' (a|)');
Expand All @@ -234,4 +234,21 @@ describe('take', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should unsubscribe from the source when it reaches the limit before a recursive synchronous upstream completion is notified', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const e1 = cold(' (a|)');
const e1subs = ' (^!)';
const expected = '(a|)';

const result = merge(e1, subject).pipe(
take(1),
tap(() => subject.complete())
);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
});
43 changes: 32 additions & 11 deletions src/internal/operators/take.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,41 @@ export function take<T>(count: number): MonoTypeOperatorFunction<T> {
: operate((source, subscriber) => {
let seen = 0;
source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
// Increment the number of values we have seen,
// then check it against the allowed count to see
// if we are still letting values through.
if (++seen <= count) {
subscriber.next(value);
// If we have met or passed our allowed count,
// we need to complete. We have to do <= here,
// because re-entrant code will increment `seen` twice.
if (count <= seen) {
new OperatorSubscriber(
subscriber,
(value) => {
// Increment the number of values we have seen,
// then check it against the allowed count to see
// if we are still letting values through.
if (++seen <= count) {
subscriber.next(value);
// If we have met or passed our allowed count,
// we need to complete. We have to do <= here,
// because re-entrant code will increment `seen` twice.
if (count <= seen) {
subscriber.complete();
}
}
},
() => {
// If seen === count or higher, then we've already taken all of
// the values we were supposed to, and the complete we're getting here
// is from reentrant code racing our `complete` above. We want to stop
// that here.
if (seen < count) {
subscriber.complete();
}
},
(err) => {
// If seen === count or higher, then we've already taken all of
// the values we were supposed to, and the error we're getting here
// is from reentrant code racing our `complete` above. We want to stop
// that here.
if (seen < count) {
subscriber.error(err);
}
}
})
)
);
});
}

0 comments on commit dd3a3d8

Please sign in to comment.