/
async_pipe.ts
151 lines (131 loc) Β· 4.5 KB
/
async_pipe.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {ChangeDetectorRef, OnDestroy, Pipe, WrappedValue} from '@angular/core';
import {EventEmitter, Observable} from '../facade/async';
import {isBlank, isPresent, isPromise} from '../facade/lang';
import {InvalidPipeArgumentException} from './invalid_pipe_argument_exception';
interface SubscriptionStrategy {
createSubscription(async: any, updateLatestValue: any): any;
dispose(subscription: any): void;
onDestroy(subscription: any): void;
}
class ObservableStrategy implements SubscriptionStrategy {
createSubscription(async: any, updateLatestValue: any): any {
return async.subscribe({next: updateLatestValue, error: (e: any) => { throw e; }});
}
dispose(subscription: any): void { subscription.unsubscribe(); }
onDestroy(subscription: any): void { subscription.unsubscribe(); }
}
class PromiseStrategy implements SubscriptionStrategy {
createSubscription(async: Promise<any>, updateLatestValue: (v: any) => any): any {
return async.then(updateLatestValue, e => { throw e; });
}
dispose(subscription: any): void {}
onDestroy(subscription: any): void {}
}
var _promiseStrategy = new PromiseStrategy();
var _observableStrategy = new ObservableStrategy();
var __unused: Promise<any>; // avoid unused import when Promise union types are erased
/**
* The `async` pipe subscribes to an `Observable` or `Promise` and returns the latest value it has
* emitted.
* When a new value is emitted, the `async` pipe marks the component to be checked for changes.
* When the component gets destroyed, the `async` pipe unsubscribes automatically to avoid
* potential memory leaks.
*
* ## Usage
*
* object | async
*
* where `object` is of type `Observable` or of type `Promise`.
*
* ## Examples
*
* This example binds a `Promise` to the view. Clicking the `Resolve` button resolves the
* promise.
*
* {@example core/pipes/ts/async_pipe/async_pipe_example.ts region='AsyncPipePromise'}
*
* It's also possible to use `async` with Observables. The example below binds the `time` Observable
* to the view. Every 500ms, the `time` Observable updates the view with the current time.
*
* {@example core/pipes/ts/async_pipe/async_pipe_example.ts region='AsyncPipeObservable'}
*
* @stable
*/
@Pipe({name: 'async', pure: false})
export class AsyncPipe implements OnDestroy {
/** @internal */
_latestValue: Object = null;
/** @internal */
_latestReturnedValue: Object = null;
/** @internal */
_subscription: Object = null;
/** @internal */
_obj: Observable<any>|Promise<any>|EventEmitter<any> = null;
/** @internal */
_ref: ChangeDetectorRef;
private _strategy: SubscriptionStrategy = null;
constructor(_ref: ChangeDetectorRef) { this._ref = _ref; }
ngOnDestroy(): void {
if (isPresent(this._subscription)) {
this._dispose();
}
}
transform(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
if (isBlank(this._obj)) {
if (isPresent(obj)) {
this._subscribe(obj);
}
this._latestReturnedValue = this._latestValue;
return this._latestValue;
}
if (obj !== this._obj) {
this._dispose();
return this.transform(obj);
}
if (this._latestValue === this._latestReturnedValue) {
return this._latestReturnedValue;
} else {
this._latestReturnedValue = this._latestValue;
return WrappedValue.wrap(this._latestValue);
}
}
/** @internal */
_subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription = this._strategy.createSubscription(
obj, (value: Object) => this._updateLatestValue(obj, value));
}
/** @internal */
_selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
if (isPromise(obj)) {
return _promiseStrategy;
} else if ((<any>obj).subscribe) {
return _observableStrategy;
} else {
throw new InvalidPipeArgumentException(AsyncPipe, obj);
}
}
/** @internal */
_dispose(): void {
this._strategy.dispose(this._subscription);
this._latestValue = null;
this._latestReturnedValue = null;
this._subscription = null;
this._obj = null;
}
/** @internal */
_updateLatestValue(async: any, value: Object) {
if (async === this._obj) {
this._latestValue = value;
this._ref.markForCheck();
}
}
}