-
Notifications
You must be signed in to change notification settings - Fork 47
/
cancelable_operation.dart
216 lines (184 loc) · 7.07 KB
/
cancelable_operation.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import 'utils.dart';
/// An asynchronous operation that can be cancelled.
///
/// The value of this operation is exposed as [value]. When this operation is
/// cancelled, [value] won't complete either successfully or with an error. If
/// [value] has already completed, cancelling the operation does nothing.
class CancelableOperation<T> {
  /// The completer that produced this operation.
  ///
  /// This is canceled when [cancel] is called.
  final CancelableCompleter<T> _completer;

  /// Wraps [_completer]; only [CancelableCompleter] is expected to call this.
  CancelableOperation._(this._completer);

  /// Creates a [CancelableOperation] wrapping [inner].
  ///
  /// When this operation is canceled, [onCancel] will be called and any value
  /// or error produced by [inner] will be discarded. If [onCancel] returns a
  /// [Future], it will be forwarded to [cancel].
  ///
  /// [onCancel] will be called synchronously when the operation is canceled.
  /// It's guaranteed to only be called once.
  factory CancelableOperation.fromFuture(Future<T> inner,
      {FutureOr onCancel()}) {
    var completer = CancelableCompleter<T>(onCancel: onCancel);
    completer.complete(inner);
    return completer.operation;
  }

  /// The value returned by the operation.
  ///
  /// This future never completes if the operation is canceled first.
  Future<T> get value => _completer._inner.future;

  /// Creates a [Stream] containing the result of this operation.
  ///
  /// This is like `value.asStream()`, but if a subscription to the stream is
  /// canceled, this is as well.
  Stream<T> asStream() {
    var controller =
        StreamController<T>(sync: true, onCancel: _completer._cancel);

    value.then((value) {
      controller.add(value);
      controller.close();
    }, onError: (error, stackTrace) {
      controller.addError(error, stackTrace);
      controller.close();
    });
    return controller.stream;
  }

  /// Creates a [Future] that completes when this operation completes *or* when
  /// it's cancelled.
  ///
  /// If this operation completes, this completes to the same result as [value].
  /// If this operation is cancelled, the returned future waits for the future
  /// returned by [cancel], then completes to [cancellationValue] (`null` when
  /// the argument is omitted).
  Future<T> valueOrCancellation([T cancellationValue]) {
    var completer = Completer<T>.sync();
    value.then((result) => completer.complete(result),
        onError: completer.completeError);

    _completer._cancelMemo.future.then((_) {
      completer.complete(cancellationValue);
    }, onError: completer.completeError);

    return completer.future;
  }

  /// Registers callbacks to be called when this operation completes.
  ///
  /// [onValue] and [onError] behave in the same way as [Future.then].
  ///
  /// If [onCancel] is provided, and this operation is canceled, the [onCancel]
  /// callback is called and the returned operation completes with the result.
  ///
  /// If [onCancel] is not given, and this operation is canceled, then the
  /// returned operation is canceled.
  ///
  /// If [propagateCancel] is `true` and the returned operation is canceled then
  /// this operation is canceled. The default is `false`.
  CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
      {FutureOr<R> Function(Object, StackTrace) onError,
      FutureOr<R> Function() onCancel,
      bool propagateCancel = false}) {
    final completer =
        CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);

    valueOrCancellation().then((T result) {
      // The returned operation was itself canceled; discard the outcome.
      if (completer.isCanceled) return;

      // Decide between the value path and the cancellation path using
      // [isCanceled] rather than [isCompleted]. The completer reports itself
      // as completed as soon as `complete` is called, even with a future that
      // is still pending (which is always the case for [fromFuture]), so
      // [isCompleted] may be `true` for an operation that was canceled. The
      // canceled flag is only ever set while [value] has not completed, so it
      // is `true` here exactly when [result] came from the cancellation.
      if (!isCanceled) {
        completer.complete(Future.sync(() => onValue(result)));
      } else if (onCancel != null) {
        completer.complete(Future.sync(onCancel));
      } else {
        completer._cancel();
      }
    }, onError: (error, stackTrace) {
      if (completer.isCanceled) return;

      if (onError != null) {
        completer.complete(Future.sync(() => onError(error, stackTrace)));
      } else {
        completer.completeError(error, stackTrace);
      }
    });
    return completer.operation;
  }

  /// Cancels this operation.
  ///
  /// This returns the [Future] returned by the [CancelableCompleter]'s
  /// `onCancel` callback. Unlike [Stream.cancel], it never returns `null`.
  Future cancel() => _completer._cancel();

  /// Whether this operation has been canceled before it completed.
  bool get isCanceled => _completer.isCanceled;

  /// Whether the completer backing this operation has been completed.
  ///
  /// This mirrors [CancelableCompleter.isCompleted], which becomes `true` as
  /// soon as `complete` or `completeError` is called on the completer. It can
  /// therefore be `true` for an operation that was canceled, for instance
  /// while a future passed to `complete` was still pending; consult
  /// [isCanceled] to tell the two situations apart.
  bool get isCompleted => _completer.isCompleted;
}
/// Controls the outcome of a [CancelableOperation].
///
/// The owner of the completer decides when (and with what) the operation
/// finishes, while holders of [operation] may cancel it.
class CancelableCompleter<T> {
  /// The plain completer whose future backs the operation's value.
  final Completer<T> _inner;

  /// The user-supplied cancellation callback, or `null` if none was given.
  final FutureOrCallback _onCancel;

  /// Creates a completer together with the [CancelableOperation] it drives.
  ///
  /// If the operation is canceled before this completer has produced a
  /// result, [onCancel] is invoked. Should [onCancel] return a [Future], that
  /// future is what [CancelableOperation.cancel] hands back.
  ///
  /// The call to [onCancel] happens synchronously at cancellation time and
  /// takes place at most once.
  CancelableCompleter({FutureOr onCancel()})
      : _onCancel = onCancel,
        _inner = Completer<T>() {
    _operation = CancelableOperation<T>._(this);
  }

  /// The operation whose result this completer provides.
  CancelableOperation<T> get operation => _operation;
  CancelableOperation<T> _operation;

  /// Whether [complete] or [completeError] has already been called.
  bool get isCompleted => _isCompleted;
  bool _isCompleted = false;

  /// Whether the operation was canceled before its result was delivered.
  bool get isCanceled => _isCanceled;
  bool _isCanceled = false;

  /// Memoizes [_cancel] so the cancellation logic is not repeated.
  final _cancelMemo = AsyncMemoizer();

  /// Completes [operation] with [value].
  ///
  /// When [value] is a [Future], the operation adopts that future's eventual
  /// value or error. Nothing is delivered if the operation has been canceled
  /// by the time the result would be forwarded.
  ///
  /// Throws a [StateError] if this completer was already completed.
  void complete([value]) {
    if (_isCompleted) throw StateError("Operation already completed");
    _isCompleted = true;

    if (value is Future) {
      if (_isCanceled) {
        // Nobody will observe the result; attach a no-op handler so that an
        // error from [value] is not reported as unhandled.
        value.catchError((_) {});
      } else {
        // Cancellation may still happen while [value] is pending, so the flag
        // is re-checked when the result arrives.
        value.then((result) {
          if (!_isCanceled) _inner.complete(result);
        }, onError: (error, stackTrace) {
          if (!_isCanceled) _inner.completeError(error, stackTrace);
        });
      }
    } else if (!_isCanceled) {
      _inner.complete(value);
    }
  }

  /// Completes [operation] with [error] and the optional [stackTrace].
  ///
  /// The error is dropped if the operation has already been canceled.
  ///
  /// Throws a [StateError] if this completer was already completed.
  void completeError(Object error, [StackTrace stackTrace]) {
    if (_isCompleted) throw StateError("Operation already completed");
    _isCompleted = true;

    if (!_isCanceled) _inner.completeError(error, stackTrace);
  }

  /// Cancels the operation unless its result has already been delivered.
  ///
  /// Returns an already-completed future when there is nothing left to
  /// cancel; otherwise returns the memoized result of marking the operation
  /// canceled and invoking the cancellation callback, if there is one.
  Future _cancel() {
    if (_inner.isCompleted) return Future.value();

    return _cancelMemo.runOnce(() {
      _isCanceled = true;
      final callback = _onCancel;
      return callback == null ? null : callback();
    });
  }
}