Skip to content

Commit

Permalink
fix(core): async EventEmitter should contribute to app stability (#56308
Browse files Browse the repository at this point in the history
)

async `EventEmitter` should contribute to app stability.

fixes #56290

PR Close #56308
  • Loading branch information
atscott authored and AndrewKushnir committed Jun 11, 2024
1 parent 5ec24c9 commit dbd0fa0
Show file tree
Hide file tree
Showing 15 changed files with 53 additions and 53 deletions.
4 changes: 1 addition & 3 deletions goldens/public-api/core/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,7 @@ export interface ExistingSansProvider {
export class ExperimentalPendingTasks {
add(): () => void;
// (undocumented)
static ɵfac: i0.ɵɵFactoryDeclaration<ExperimentalPendingTasks, never>;
// (undocumented)
static ɵprov: i0.ɵɵInjectableDeclaration<ExperimentalPendingTasks>;
static ɵprov: unknown;
}

// @public
Expand Down
25 changes: 17 additions & 8 deletions packages/core/src/event_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {OutputRef} from './authoring/output/output_ref';
import {isInInjectionContext} from './di/contextual';
import {inject} from './di/injector_compatibility';
import {DestroyRef} from './linker/destroy_ref';
import {PendingTasks} from './pending_tasks';

/**
* Use in components with the `@Output` directive to emit custom events
Expand Down Expand Up @@ -111,6 +112,7 @@ export interface EventEmitter<T> extends Subject<T>, OutputRef<T> {
class EventEmitter_ extends Subject<any> implements OutputRef<any> {
__isAsync: boolean; // tslint:disable-line
destroyRef: DestroyRef | undefined = undefined;
private readonly pendingTasks: PendingTasks | undefined = undefined;

constructor(isAsync: boolean = false) {
super();
Expand All @@ -120,6 +122,7 @@ class EventEmitter_ extends Subject<any> implements OutputRef<any> {
// For backwards compatibility reasons, this cannot be required
if (isInInjectionContext()) {
this.destroyRef = inject(DestroyRef, {optional: true}) ?? undefined;
this.pendingTasks = inject(PendingTasks);
}
}

Expand All @@ -145,14 +148,14 @@ class EventEmitter_ extends Subject<any> implements OutputRef<any> {
}

if (this.__isAsync) {
errorFn = _wrapInTimeout(errorFn);
errorFn = this.wrapInTimeout(errorFn);

if (nextFn) {
nextFn = _wrapInTimeout(nextFn);
nextFn = this.wrapInTimeout(nextFn);
}

if (completeFn) {
completeFn = _wrapInTimeout(completeFn);
completeFn = this.wrapInTimeout(completeFn);
}
}

Expand All @@ -164,12 +167,18 @@ class EventEmitter_ extends Subject<any> implements OutputRef<any> {

return sink;
}
}

function _wrapInTimeout(fn: (value: unknown) => any) {
return (value: unknown) => {
setTimeout(fn, undefined, value);
};
private wrapInTimeout(fn: (value: unknown) => any) {
return (value: unknown) => {
const taskId = this.pendingTasks?.add();
setTimeout(() => {
fn(value);
if (taskId !== undefined) {
this.pendingTasks?.remove(taskId);
}
});
};
}
}

/**
Expand Down
24 changes: 16 additions & 8 deletions packages/core/src/pending_tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@

import {BehaviorSubject} from 'rxjs';

import {inject} from './di';
import {Injectable} from './di/injectable';
import {inject} from './di/injector_compatibility';
import {ɵɵdefineInjectable} from './di/interface/defs';
import {OnDestroy} from './interface/lifecycle_hooks';

/**
* Internal implementation of the pending tasks service.
*/
@Injectable({
providedIn: 'root',
})
export class PendingTasks implements OnDestroy {
private taskId = 0;
private pendingTasks = new Set<number>();
Expand Down Expand Up @@ -48,6 +45,13 @@ export class PendingTasks implements OnDestroy {
this.hasPendingTasks.next(false);
}
}

/** @nocollapse */
static ɵprov = /** @pureOrBreakMyCode */ ɵɵdefineInjectable({
token: PendingTasks,
providedIn: 'root',
factory: () => new PendingTasks(),
});
}

/**
Expand Down Expand Up @@ -76,9 +80,6 @@ export class PendingTasks implements OnDestroy {
* @publicApi
* @experimental
*/
@Injectable({
providedIn: 'root',
})
export class ExperimentalPendingTasks {
private internalPendingTasks = inject(PendingTasks);
/**
Expand All @@ -89,4 +90,11 @@ export class ExperimentalPendingTasks {
const taskId = this.internalPendingTasks.add();
return () => this.internalPendingTasks.remove(taskId);
}

/** @nocollapse */
static ɵprov = /** @pureOrBreakMyCode */ ɵɵdefineInjectable({
token: ExperimentalPendingTasks,
providedIn: 'root',
factory: () => new ExperimentalPendingTasks(),
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
3 changes: 0 additions & 3 deletions packages/core/test/bundling/defer/bundle.golden_symbols.json
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,9 +698,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@
{
"name": "_retrieveHydrationInfoImpl"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
3 changes: 0 additions & 3 deletions packages/core/test/bundling/router/bundle.golden_symbols.json
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
3 changes: 0 additions & 3 deletions packages/core/test/bundling/todo/bundle.golden_symbols.json
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,6 @@
{
"name": "_wasLastNodeCreated"
},
{
"name": "_wrapInTimeout"
},
{
"name": "activeConsumer"
},
Expand Down
20 changes: 19 additions & 1 deletion packages/core/test/event_emitter_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
* found in the LICENSE file at https://angular.io/license
*/

import {filter} from 'rxjs/operators';
import {TestBed} from '@angular/core/testing';
import {filter, tap} from 'rxjs/operators';

import {EventEmitter} from '../src/event_emitter';
import {ApplicationRef} from '../public_api';
import {firstValueFrom} from 'rxjs';
import {PendingTasks} from '../src/pending_tasks';

describe('EventEmitter', () => {
let emitter: EventEmitter<number>;
Expand Down Expand Up @@ -190,6 +194,20 @@ describe('EventEmitter', () => {
expect(emitter.observers.length).toBe(0);
});

it('contributes to app stability', async () => {
const emitter = TestBed.runInInjectionContext(() => new EventEmitter<number>(true));
let emitValue: number;
emitter.subscribe({
next: (v: number) => {
emitValue = v;
},
});
emitter.emit(1);
await firstValueFrom(TestBed.inject(ApplicationRef).isStable.pipe(filter((stable) => stable)));
expect(emitValue!).toBeDefined();
expect(emitValue!).toEqual(1);
});

// TODO: vsavkin: add tests cases
// should call dispose on the subscription if generator returns {done:true}
// should call dispose on the subscription on throw
Expand Down

0 comments on commit dbd0fa0

Please sign in to comment.