/
to_observable.ts
69 lines (63 loc) · 1.68 KB
/
to_observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {
assertInInjectionContext,
DestroyRef,
effect,
inject,
Injector,
Signal,
untracked,
} from '@angular/core';
import {Observable, ReplaySubject} from 'rxjs';
/**
* Options accepted by `toObservable`.
*
* @developerPreview
*/
export interface ToObservableOptions {
/**
* The `Injector` to use when creating the underlying `effect` which watches the signal.
* `toObservable` also looks up `DestroyRef` on this injector to register its teardown.
*
* If this isn't specified, the current [injection context](guide/di/dependency-injection-context)
* will be used.
*/
injector?: Injector;
}
/**
 * Bridges an Angular `Signal` to an RxJS `Observable`.
 *
 * An `effect` reads the signal and pushes every value it observes into a `ReplaySubject` created
 * with a buffer size of 1. If reading the signal throws, the thrown value is delivered to
 * subscribers as an error notification instead of a value.
 *
 * When the `DestroyRef` obtained from the injector is destroyed, the effect is destroyed and the
 * subject is completed.
 *
 * `toObservable` must be called in an injection context unless `options.injector` is provided.
 *
 * @param source The signal whose values are forwarded to the returned `Observable`.
 * @param options Optional settings; `injector` selects the injector used both for the `effect`
 *     and for the `DestroyRef` lookup.
 * @returns The subject exposed through `asObservable()`.
 *
 * @developerPreview
 */
export function toObservable<T>(source: Signal<T>, options?: ToObservableOptions): Observable<T> {
  // Without an explicit injector, `inject()` below is only valid inside an injection context.
  if (!options?.injector) {
    assertInInjectionContext(toObservable);
  }
  const injector = options?.injector ?? inject(Injector);
  const replay = new ReplaySubject<T>(1);

  const forwardSignal = (): void => {
    // Decide which notification to send while reading the signal; only the read itself is
    // guarded by the `try`, so a failure while notifying subscribers is not caught here.
    let notify: () => void;
    try {
      const latest = source();
      notify = () => replay.next(latest);
    } catch (failure) {
      notify = () => replay.error(failure);
    }
    // Notify subscribers inside `untracked`, separate from the tracked `source()` read above.
    untracked(notify);
  };
  const effectRef = effect(forwardSignal, {injector, manualCleanup: true});

  // `manualCleanup: true` is passed above, so teardown is registered explicitly here.
  const destroyRef = injector.get(DestroyRef);
  destroyRef.onDestroy(() => {
    effectRef.destroy();
    replay.complete();
  });

  return replay.asObservable();
}