diff --git a/packages/rxjs/spec/scheduled/scheduled-spec.ts b/packages/rxjs/spec/scheduled/scheduled-spec.ts index f73d65e4b9..11fcc5ab86 100644 --- a/packages/rxjs/spec/scheduled/scheduled-spec.ts +++ b/packages/rxjs/spec/scheduled/scheduled-spec.ts @@ -64,4 +64,29 @@ describe('scheduled', () => { done(); }); }); + + it('should handle scheduling a promise that unsubscribes prior to complete', (done) => { + const results: any[] = []; + const input = Promise.resolve('x'); // strings are iterables + const subscription = scheduled(input, testScheduler).subscribe({ + next(value) { + results.push(value); + subscription.unsubscribe(); + }, + complete() { results.push('done'); }, + }); + + expect(results).to.deep.equal([]); + + // Promises force async, so we can't schedule synchronously, no matter what. + testScheduler.flush(); + expect(results).to.deep.equal([]); + + Promise.resolve().then(() => { + // NOW it should work, as the other promise should have resolved. + testScheduler.flush(); + expect(results).to.deep.equal(['x']); + done(); + }); + }); }); diff --git a/packages/rxjs/src/internal/observable/range.ts b/packages/rxjs/src/internal/observable/range.ts index e3fea50c0e..09364c9277 100644 --- a/packages/rxjs/src/internal/observable/range.ts +++ b/packages/rxjs/src/internal/observable/range.ts @@ -1,6 +1,7 @@ import type { SchedulerLike } from '../types.js'; import { Observable } from '../Observable.js'; import { EMPTY } from './empty.js'; +import { executeSchedule } from '../util/executeSchedule.js'; export function range(start: number, count?: number): Observable; @@ -72,14 +73,17 @@ export function range(start: number, count?: number, scheduler?: SchedulerLike): ? // The deprecated scheduled path. (subscriber) => { let n = start; - return scheduler.schedule(function () { + const emit = () => { if (n < end) { subscriber.next(n++); - this.schedule(); + if (!subscriber.closed) { + executeSchedule(subscriber, scheduler, emit); + } } else { subscriber.complete(); } - }); + }; + executeSchedule(subscriber, scheduler, emit); } : // Standard synchronous range. (subscriber) => { diff --git a/packages/rxjs/src/internal/observable/timer.ts b/packages/rxjs/src/internal/observable/timer.ts index 5d66c494aa..ef5e96909d 100644 --- a/packages/rxjs/src/internal/observable/timer.ts +++ b/packages/rxjs/src/internal/observable/timer.ts @@ -3,6 +3,7 @@ import type { SchedulerLike } from '../types.js'; import { asyncScheduler } from '../scheduler/async.js'; import { isScheduler } from '../util/isScheduler.js'; import { isValidDate } from '../util/isDate.js'; +import { executeSchedule } from '../util/executeSchedule.js'; /** * Creates an observable that will wait for a specified time period, or exact date, before @@ -167,20 +168,32 @@ export function timer( let n = 0; // Start the timer. - return scheduler.schedule(function () { - if (!subscriber.closed) { - // Emit the next value and increment. + return executeSchedule( + subscriber, + scheduler, + () => { + // Emit the first value and schedule the next. subscriber.next(n++); if (0 <= intervalDuration) { // If we have a interval after the initial timer, // reschedule with the period. - this.schedule(undefined, intervalDuration); + executeSchedule( + subscriber, + scheduler, + () => { + // Emit the interval values. + subscriber.next(n++); + }, + intervalDuration, + true + ); } else { // We didn't have an interval. So just complete. subscriber.complete(); } - } - }, due); + }, + due + ); }); } diff --git a/packages/rxjs/src/internal/operators/debounceTime.ts b/packages/rxjs/src/internal/operators/debounceTime.ts index 5381a777d5..514b4c8f9a 100644 --- a/packages/rxjs/src/internal/operators/debounceTime.ts +++ b/packages/rxjs/src/internal/operators/debounceTime.ts @@ -1,7 +1,8 @@ import { asyncScheduler } from '../scheduler/async.js'; -import type { Subscription} from '../Observable.js'; +import type { Subscription } from '../Observable.js'; import { Observable, operate } from '../Observable.js'; -import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '../types.js'; +import type { MonoTypeOperatorFunction, SchedulerLike } from '../types.js'; +import { executeSchedule } from '../util/executeSchedule.js'; /** * Emits a notification from the source Observable only after a particular time span @@ -62,64 +63,40 @@ import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '. export function debounceTime(dueTime: number, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction { return (source) => new Observable((destination) => { - let activeTask: Subscription | null = null; - let lastValue: T | null = null; - let lastTime: number | null = null; - let scheduling = false; - - const emit = () => { - if (scheduling || activeTask) { - // We have a value! Free up memory first, then emit the value. - if (activeTask) { - activeTask.unsubscribe(); - activeTask = null; - } - const value = lastValue!; - lastValue = null; - destination.next(value); - } - }; - function emitWhenIdle(this: SchedulerAction) { - // This is called `dueTime` after the first value - // but we might have received new values during this window! - - const targetTime = lastTime! + dueTime; - const now = scheduler.now(); - if (now < targetTime) { - // On that case, re-schedule to the new target - activeTask = this.schedule(undefined, targetTime - now); - destination.add(activeTask); - return; - } - - emit(); - } + let lastValue: T; + let activeTask: Subscription | void; source.subscribe( operate({ destination, next: (value: T) => { lastValue = value; - lastTime = scheduler.now(); + // Clear any pending task and schedule a new one. + activeTask?.unsubscribe(); - // Only set up a task if it's not already up - if (!activeTask) { - scheduling = true; - activeTask = scheduler.schedule(emitWhenIdle, dueTime); - scheduling = false; - // Set activeTask as intermediary Subscription to handle synchronous schedulers - destination.add(activeTask); - } + activeTask = executeSchedule( + destination, + scheduler, + () => { + activeTask = undefined; + const v = lastValue; + lastValue = null!; + destination.next(v); + }, + dueTime + ); }, complete: () => { // Source completed. // Emit any pending debounced values then complete - emit(); + if (activeTask) { + destination.next(lastValue); + } destination.complete(); }, finalize: () => { // Finalization. - lastValue = activeTask = null; + lastValue = activeTask = null!; }, }) ); diff --git a/packages/rxjs/src/internal/operators/timeout.ts b/packages/rxjs/src/internal/operators/timeout.ts index 4cd5dcf4e4..30a8c8d269 100644 --- a/packages/rxjs/src/internal/operators/timeout.ts +++ b/packages/rxjs/src/internal/operators/timeout.ts @@ -311,7 +311,7 @@ export function timeout, M>( let originalSourceSubscription: Subscription; // The subscription for our timeout timer. This changes // every time we get a new value. - let timerSubscription: Subscription; + let timerSubscription: Subscription | void; // A bit of state we pass to our with and error factories to // tell what the last value we saw was. let lastValue: T | null = null; @@ -353,9 +353,7 @@ export function timeout, M>( each! > 0 && startTimer(each!); }, finalize: () => { - if (!timerSubscription?.closed) { - timerSubscription?.unsubscribe(); - } + timerSubscription?.unsubscribe(); // Be sure not to hold the last value in memory after unsubscription // it could be quite large. lastValue = null; diff --git a/packages/rxjs/src/internal/scheduled/scheduleArray.ts b/packages/rxjs/src/internal/scheduled/scheduleArray.ts index ef40e61747..6e46367af1 100644 --- a/packages/rxjs/src/internal/scheduled/scheduleArray.ts +++ b/packages/rxjs/src/internal/scheduled/scheduleArray.ts @@ -1,27 +1,24 @@ import { Observable } from '../Observable.js'; import type { SchedulerLike } from '../types.js'; +import { executeSchedule } from '../util/executeSchedule.js'; export function scheduleArray(input: ArrayLike, scheduler: SchedulerLike) { return new Observable((subscriber) => { // The current array index. let i = 0; - // Start iterating over the array like on a schedule. - return scheduler.schedule(function () { + const emit = () => { + // If we have hit the end of the array, complete. if (i === input.length) { - // If we have hit the end of the array like in the - // previous job, we can complete. subscriber.complete(); } else { - // Otherwise let's next the value at the current index, + // Otherwise, next the value at the current index, // then increment our index. subscriber.next(input[i++]); - // If the last emission didn't cause us to close the subscriber - // (via take or some side effect), reschedule the job and we'll - // make another pass. - if (!subscriber.closed) { - this.schedule(); - } + executeSchedule(subscriber, scheduler, emit); } - }); + }; + + // Start iterating over the array like on a schedule. + return executeSchedule(subscriber, scheduler, emit); }); } diff --git a/packages/rxjs/src/internal/util/executeSchedule.ts b/packages/rxjs/src/internal/util/executeSchedule.ts index 3250baa0f9..b8367596f7 100644 --- a/packages/rxjs/src/internal/util/executeSchedule.ts +++ b/packages/rxjs/src/internal/util/executeSchedule.ts @@ -1,21 +1,6 @@ import type { Subscription } from '../Observable.js'; import type { SchedulerAction, SchedulerLike } from '../types.js'; -export function executeSchedule( - parentSubscription: Subscription, - scheduler: SchedulerLike, - work: () => void, - delay: number, - repeat: true -): void; -export function executeSchedule( - parentSubscription: Subscription, - scheduler: SchedulerLike, - work: () => void, - delay?: number, - repeat?: false -): Subscription; - export function executeSchedule( parentSubscription: Subscription, scheduler: SchedulerLike, @@ -23,22 +8,24 @@ export function executeSchedule( delay = 0, repeat = false ): Subscription | void { - const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction) { - work(); - if (repeat) { - parentSubscription.add(this.schedule(null, delay)); - } else { - this.unsubscribe(); - } - }, delay); + if (!parentSubscription.closed) { + const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction) { + work(); + if (repeat) { + parentSubscription.add(this.schedule(null, delay)); + } else { + this.unsubscribe(); + } + }, delay); - parentSubscription.add(scheduleSubscription); + parentSubscription.add(scheduleSubscription); - if (!repeat) { - // Because user-land scheduler implementations are unlikely to properly reuse - // Actions for repeat scheduling, we can't trust that the returned subscription - // will control repeat subscription scenarios. So we're trying to avoid using them - // incorrectly within this library. - return scheduleSubscription; + if (!repeat) { + // Because user-land scheduler implementations are unlikely to properly reuse + // Actions for repeat scheduling, we can't trust that the returned subscription + // will control repeat subscription scenarios. So we're trying to avoid using them + // incorrectly within this library. + return scheduleSubscription; + } } }