Skip to content

Commit

Permalink
feat(repeatWhen): notifier supports ObservableInput (#7103)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremymwells committed Dec 15, 2022
1 parent 1cf9994 commit 8f1b976
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 13 deletions.
72 changes: 67 additions & 5 deletions spec-dtslint/operators/repeatWhen-spec.ts
@@ -1,22 +1,84 @@
import { of } from 'rxjs';
import { repeatWhen } from 'rxjs/operators';
import { asInteropObservable } from '../../spec/helpers/interop-helper';

it('should infer correctly', () => {
const o = of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
});

it('should infer correctly when the error observable has a different type', () => {
const o = of(1, 2, 3).pipe(repeatWhen(repeatWhen(errors => of('a', 'b', 'c')))); // $ExpectType Observable<number>
of(1, 2, 3).pipe(repeatWhen(errors => asInteropObservable(of('a', 'b', 'c')))); // $ExpectType Observable<number>
});

it('should enforce types', () => {
const o = of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
});

it('should accept interop observable notifier', () => {
of(1, 2, 3).pipe(repeatWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable<number>
});

it('should accept promise notifier', () => {
of(1, 2, 3).pipe(repeatWhen(() => Promise.resolve(true))); // $ExpectType Observable<number>
});

it('should async iterable notifier', () => {
const asyncRange = {
from: 1,
to: 2,
[Symbol.asyncIterator]() {
return {
current: this.from,
last: this.to,
async next() {
await Promise.resolve();
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(repeatWhen(() => asyncRange)); // $ExpectType Observable<number>
});

it('should accept iterable notifier', () => {
const syncRange = {
from: 1,
to: 2,
[Symbol.iterator]() {
return {
current: this.from,
last: this.to,
next() {
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(repeatWhen(() => syncRange)); // $ExpectType Observable<number>
});

it('should accept readable stream notifier', () => {
const readableStream = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
of(1, 2, 3).pipe(repeatWhen(() => readableStream)); // $ExpectType Observable<number>
});

it('should enforce types of the notifier', () => {
const o = of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
});

it('should be deprecated', () => {
const o = of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
});
2 changes: 1 addition & 1 deletion src/index.ts
Expand Up @@ -164,7 +164,7 @@ export { publishLast } from './internal/operators/publishLast';
export { publishReplay } from './internal/operators/publishReplay';
export { raceWith } from './internal/operators/raceWith';
export { reduce } from './internal/operators/reduce';
export { repeat } from './internal/operators/repeat';
export { repeat, RepeatConfig } from './internal/operators/repeat';
export { repeatWhen } from './internal/operators/repeatWhen';
export { retry, RetryConfig } from './internal/operators/retry';
export { retryWhen } from './internal/operators/retryWhen';
Expand Down
14 changes: 8 additions & 6 deletions src/internal/operators/repeatWhen.ts
@@ -1,8 +1,9 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';

import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';

Expand Down Expand Up @@ -33,13 +34,14 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* @see {@link retry}
* @see {@link retryWhen}
*
* @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with
* @param notifier Function that receives an Observable of notifications with
* which a user can `complete` or `error`, aborting the repetition.
* @return A function that returns an Observable that mirrors the source
* @return A function that returns an `ObservableInput` that mirrors the source
* Observable with the exception of a `complete`.
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s `delay` option instead.
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s {@link RepeatConfig#delay delay} option instead.
* Instead of `repeatWhen(() => notify$)`, use: `repeat({ delay: () => notify$ })`.
*/
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T> {
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let innerSub: Subscription | null;
let syncResub = false;
Expand All @@ -61,7 +63,7 @@ export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Obs

// If the call to `notifier` throws, it will be caught by the OperatorSubscriber
// In the main subscription -- in `subscribeForRepeatWhen`.
notifier(completions$).subscribe(
innerFrom(notifier(completions$)).subscribe(
createOperatorSubscriber(
subscriber,
() => {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/index.ts
Expand Up @@ -68,7 +68,7 @@ export { publishReplay } from '../internal/operators/publishReplay';
export { race } from '../internal/operators/race';
export { raceWith } from '../internal/operators/raceWith';
export { reduce } from '../internal/operators/reduce';
export { repeat } from '../internal/operators/repeat';
export { repeat, RepeatConfig } from '../internal/operators/repeat';
export { repeatWhen } from '../internal/operators/repeatWhen';
export { retry, RetryConfig } from '../internal/operators/retry';
export { retryWhen } from '../internal/operators/retryWhen';
Expand Down

0 comments on commit 8f1b976

Please sign in to comment.