Skip to content

Commit

Permalink
feat(core): introduce outputFromObservable() interop function (#54650)
Browse files Browse the repository at this point in the history
Introduces a second API in addition to the new `output()` function.

The new function `outputFromObservable()` can be used to declare outputs
using the new `OutputRef` API and `output()` API, while using a custom
RxJS observable as data source.

This is something that is currently possible in Angular and we would
like to keep possible- even though we never intended to support custom
observables aside from RxJS-based `EventEmitter`.

The interop bridges the gap and allows you to continue using
`Subject`, `ReplaySubject`, `BehaivorSubjct,` - or cold custom
observables for outputs. You can still trigger logic only when
the output is subscribed- unlike with imperative `emit`s of
`EventEmitter` or the new `OutputEmitterRef`.

A notable difference is that you need two class members where you
previously could access the `Subject` directly. This is an intentional
trade-off we've made to ensure that all new outputs implement the
`OutputRef` interface and we are exposing a minimal API surface to
consumers of components that currently access the output
programmatically.

PR Close #54650
  • Loading branch information
devversion authored and pkozlowski-opensource committed Mar 6, 2024
1 parent 9b51292 commit c809069
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 0 deletions.
5 changes: 5 additions & 0 deletions goldens/public-api/core/rxjs-interop/index.md
Expand Up @@ -8,9 +8,14 @@ import { DestroyRef } from '@angular/core';
import { Injector } from '@angular/core';
import { MonoTypeOperatorFunction } from 'rxjs';
import { Observable } from 'rxjs';
import { OutputOptions } from '@angular/core';
import { OutputRef } from '@angular/core';
import { Signal } from '@angular/core';
import { Subscribable } from 'rxjs';

// @public
export function outputFromObservable<T>(observable: Observable<T>, opts?: OutputOptions): OutputRef<T>;

// @public
export function takeUntilDestroyed<T>(destroyRef?: DestroyRef): MonoTypeOperatorFunction<T>;

Expand Down
1 change: 1 addition & 0 deletions packages/core/rxjs-interop/src/index.ts
Expand Up @@ -6,6 +6,7 @@
* found in the LICENSE file at https://angular.io/license
*/

export {outputFromObservable} from './output_from_observable';
export {takeUntilDestroyed} from './take_until_destroyed';
export {toObservable, ToObservableOptions} from './to_observable';
export {toSignal, ToSignalOptions} from './to_signal';
79 changes: 79 additions & 0 deletions packages/core/rxjs-interop/src/output_from_observable.ts
@@ -0,0 +1,79 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import {assertInInjectionContext, DestroyRef, inject, OutputOptions, OutputRef, OutputRefSubscription, ɵRuntimeError, ɵRuntimeErrorCode} from '@angular/core';
import {Observable} from 'rxjs';

import {takeUntilDestroyed} from './take_until_destroyed';

/**
* Implementation of `OutputRef` that emits values from
* an RxJS observable source.
*
* @internal
*/
class OutputFromObservableRef<T> implements OutputRef<T> {
private destroyed = false;

destroyRef = inject(DestroyRef);

constructor(private source: Observable<T>) {
this.destroyRef.onDestroy(() => {
this.destroyed = true;
});
}

subscribe(callbackFn: (value: T) => void): OutputRefSubscription {
if (this.destroyed) {
throw new ɵRuntimeError(
ɵRuntimeErrorCode.OUTPUT_REF_DESTROYED,
ngDevMode &&
'Unexpected subscription to destroyed `OutputRef`. ' +
'The owning directive/component is destroyed.');
}

// Stop yielding more values when the directive/component is already destroyed.
const subscription = this.source.pipe(takeUntilDestroyed(this.destroyRef)).subscribe({
next: value => callbackFn(value),
});

return {
unsubscribe: () => subscription.unsubscribe(),
};
}
}

/**
* Declares an Angular output that is using an RxJS observable as a source
* for events dispatched to parent subscribers.
*
* The behavior for an observable as source is defined as followed:
* 1. New values are forwarded to the Angular output (next notifications).
* 2. Errors notifications are not handled by Angular. You need to handle these manually.
* For example by using `catchError`.
* 3. Completion notifications stop the output from emitting new values.
*
* @usageNotes
* Initialize an output in your directive by declaring a
* class field and initializing it with the `outputFromObservable()` function.
*
* ```ts
* @Directive({..})
* export class MyDir {
* nameChange$ = <some-observable>;
* nameChange = outputFromObservable(this.nameChange$);
* }
* ```
*
* @developerPreview
*/
export function outputFromObservable<T>(
observable: Observable<T>, opts?: OutputOptions): OutputRef<T> {
ngDevMode && assertInInjectionContext(outputFromObservable);
return new OutputFromObservableRef<T>(observable);
}
176 changes: 176 additions & 0 deletions packages/core/rxjs-interop/test/output_from_observable_spec.ts
@@ -0,0 +1,176 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import {EventEmitter} from '@angular/core/public_api';
import {TestBed} from '@angular/core/testing';
import {BehaviorSubject, config, ReplaySubject, Subject} from 'rxjs';

import {outputFromObservable} from '../src';

describe('outputFromObservable()', () => {
// Safety clean-up as we are patching `onUnhandledError` in this test.
afterEach(() => config.onUnhandledError = null);

it('should support emitting values via BehaviorSubject', () => {
const subject = new BehaviorSubject(0);
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

const values: number[] = [];
output.subscribe(v => values.push(v));

expect(values).toEqual([0]);

subject.next(1);
subject.next(2);
expect(values).toEqual([0, 1, 2]);
});

it('should support emitting values via ReplaySubject', () => {
const subject = new ReplaySubject<number>(1);
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

// Emit before any subscribers!
subject.next(1);

const values: number[] = [];
output.subscribe(v => values.push(v));

expect(values).toEqual([1]);

subject.next(2);
subject.next(3);
expect(values).toEqual([1, 2, 3]);
});

it('should support emitting values via Subject', () => {
const subject = new Subject<number>();
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

// Emit before any subscribers! Ignored!
subject.next(1);

const values: number[] = [];
output.subscribe(v => values.push(v));

expect(values).toEqual([]);

subject.next(2);
subject.next(3);
expect(values).toEqual([2, 3]);
});

it('should support emitting values via EventEmitter', () => {
const emitter = new EventEmitter<number>();
const output = TestBed.runInInjectionContext(() => outputFromObservable(emitter));

// Emit before any subscribers! Ignored!
emitter.next(1);

const values: number[] = [];
output.subscribe(v => values.push(v));

expect(values).toEqual([]);

emitter.next(2);
emitter.next(3);
expect(values).toEqual([2, 3]);
});

it('should support explicit unsubscribing', () => {
const subject = new Subject<number>();
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

const values: number[] = [];

expect(subject.observed).toBe(false);

const subscription = output.subscribe(v => values.push(v));
expect(subject.observed).toBe(true);
expect(values).toEqual([]);

subject.next(2);
subject.next(3);
expect(values).toEqual([2, 3]);

subscription.unsubscribe();
expect(subject.observed).toBe(false);
});

it('should not yield more source values if directive is destroyed', () => {
const subject = new Subject<number>();
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

const values: number[] = [];

expect(subject.observed).toBe(false);

output.subscribe(v => values.push(v));
expect(subject.observed).toBe(true);
expect(values).toEqual([]);

// initiate destroy.
TestBed.resetTestingModule();

expect(subject.observed).toBe(false);

subject.next(2);
subject.next(3);
expect(values).toEqual([]);
});

it('should throw if subscriptions are added after directive destroy', () => {
const subject = new Subject<number>();
const output = TestBed.runInInjectionContext(() => outputFromObservable(subject));

// initiate destroy.
TestBed.resetTestingModule();

expect(() => output.subscribe(() => {}))
.toThrowError(/Unexpected subscription to destroyed `OutputRef`/);
});


it('should be a noop when the source observable completes', () => {
const subject = new Subject<number>();
const outputRef = TestBed.runInInjectionContext(() => outputFromObservable(subject));

const values: number[] = [];
outputRef.subscribe(v => values.push(v));

subject.next(1);
subject.next(2);
expect(values).toEqual([1, 2]);

subject.complete();
subject.next(3);

expect(values).toEqual([1, 2]);
});

it('should not handle errors from the source observable', (done) => {
const subject = new Subject<number>();
const outputRef = TestBed.runInInjectionContext(() => outputFromObservable(subject));

const values: number[] = [];
outputRef.subscribe(v => values.push(v));

subject.next(1);
subject.next(2);
expect(values).toEqual([1, 2]);

config.onUnhandledError = err => {
config.onUnhandledError = null;

expect((err as Error).message).toEqual('test error message');
expect(values).toEqual([1, 2]);
done();
};

subject.error(new Error('test error message'));
});
});

0 comments on commit c809069

Please sign in to comment.