/
AbstractFuture.java
432 lines (369 loc) · 12.3 KB
/
AbstractFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
/*
* Copyright (C) 2015 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.util.concurrent.internal.InternalFutureFailureAccess;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Emulation for AbstractFuture in GWT. */
@SuppressWarnings("nullness") // TODO(b/147136275): Remove once our checker understands & and |.
@ElementTypesAreNonnullByDefault
public abstract class AbstractFuture<V extends @Nullable Object> extends InternalFutureFailureAccess
implements ListenableFuture<V> {
static final boolean GENERATE_CANCELLATION_CAUSES = false;
/**
* Tag interface marking trusted subclasses. This enables some optimizations. The implementation
* of this interface must also be an AbstractFuture and must not override or expose for overriding
* any of the public methods of ListenableFuture.
*/
interface Trusted<V extends @Nullable Object> extends ListenableFuture<V> {}
abstract static class TrustedFuture<V extends @Nullable Object> extends AbstractFuture<V>
implements Trusted<V> {
@CanIgnoreReturnValue
@Override
public final V get() throws InterruptedException, ExecutionException {
return super.get();
}
@CanIgnoreReturnValue
@Override
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return super.get(timeout, unit);
}
@Override
public final boolean isDone() {
return super.isDone();
}
@Override
public final boolean isCancelled() {
return super.isCancelled();
}
@Override
public final void addListener(Runnable listener, Executor executor) {
super.addListener(listener, executor);
}
@CanIgnoreReturnValue
@Override
public final boolean cancel(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
}
private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());
private State state;
private V value;
private @Nullable Future<? extends V> delegate;
private @Nullable Throwable throwable;
private boolean mayInterruptIfRunning;
private List<Listener> listeners;
protected AbstractFuture() {
state = State.PENDING;
listeners = new ArrayList<Listener>();
}
@CanIgnoreReturnValue
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!state.permitsPublicUserToTransitionTo(State.CANCELLED)) {
return false;
}
this.mayInterruptIfRunning = mayInterruptIfRunning;
state = State.CANCELLED;
notifyAndClearListeners();
if (delegate != null) {
// TODO(lukes): consider adding the StackOverflowError protection from the server version
delegate.cancel(mayInterruptIfRunning);
}
return true;
}
protected void interruptTask() {}
@Override
public boolean isCancelled() {
return state.isCancelled();
}
@Override
public boolean isDone() {
return state.isDone();
}
/*
* ForwardingFluentFuture needs to override those methods, so they are not final.
*/
@CanIgnoreReturnValue
@Override
public V get() throws InterruptedException, ExecutionException {
state.maybeThrowOnGet(throwable);
return value;
}
@CanIgnoreReturnValue
@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
checkNotNull(unit);
return get();
}
@Override
public void addListener(Runnable runnable, Executor executor) {
Listener listener = new Listener(runnable, executor);
if (isDone()) {
listener.execute();
} else {
listeners.add(listener);
}
}
@CanIgnoreReturnValue
protected boolean setException(Throwable throwable) {
checkNotNull(throwable);
if (!state.permitsPublicUserToTransitionTo(State.FAILURE)) {
return false;
}
forceSetException(throwable);
return true;
}
private void forceSetException(Throwable throwable) {
this.throwable = throwable;
this.state = State.FAILURE;
notifyAndClearListeners();
}
@CanIgnoreReturnValue
protected boolean set(V value) {
if (!state.permitsPublicUserToTransitionTo(State.VALUE)) {
return false;
}
forceSet(value);
return true;
}
private void forceSet(V value) {
this.value = value;
this.state = State.VALUE;
notifyAndClearListeners();
}
@CanIgnoreReturnValue
protected boolean setFuture(ListenableFuture<? extends V> future) {
checkNotNull(future);
// If this future is already cancelled, cancel the delegate.
// TODO(cpovirk): Should we do this at the end of the method, as in the server version?
// TODO(cpovirk): Use maybePropagateCancellationTo?
if (isCancelled()) {
future.cancel(mayInterruptIfRunning);
}
if (!state.permitsPublicUserToTransitionTo(State.DELEGATED)) {
return false;
}
state = State.DELEGATED;
this.delegate = future;
future.addListener(new SetFuture(future), directExecutor());
return true;
}
protected final boolean wasInterrupted() {
return mayInterruptIfRunning;
}
private void notifyAndClearListeners() {
afterDone();
// TODO(lukes): consider adding the StackOverflowError protection from the server version
// TODO(cpovirk): consider clearing this.delegate
for (Listener listener : listeners) {
listener.execute();
}
listeners = null;
}
protected void afterDone() {}
@Override
protected final Throwable tryInternalFastPathGetFailure() {
if (this instanceof Trusted) {
return state == State.FAILURE ? throwable : null;
}
return null;
}
final void maybePropagateCancellationTo(@Nullable Future<?> related) {
if (related != null & isCancelled()) {
related.cancel(wasInterrupted());
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder().append(super.toString()).append("[status=");
if (isCancelled()) {
builder.append("CANCELLED");
} else if (isDone()) {
addDoneString(builder);
} else {
String pendingDescription;
try {
pendingDescription = pendingToString();
} catch (RuntimeException e) {
// Don't call getMessage or toString() on the exception, in case the exception thrown by the
// subclass is implemented with bugs similar to the subclass.
pendingDescription = "Exception thrown from implementation: " + e.getClass();
}
// The future may complete during or before the call to getPendingToString, so we use null
// as a signal that we should try checking if the future is done again.
if (!isNullOrEmpty(pendingDescription)) {
builder.append("PENDING, info=[").append(pendingDescription).append("]");
} else if (isDone()) {
addDoneString(builder);
} else {
builder.append("PENDING");
}
}
return builder.append("]").toString();
}
/**
* Provide a human-readable explanation of why this future has not yet completed.
*
* @return null if an explanation cannot be provided because the future is done.
*/
@Nullable
String pendingToString() {
if (state == State.DELEGATED) {
return "setFuture=[" + delegate + "]";
}
return null;
}
private void addDoneString(StringBuilder builder) {
try {
V value = getDone(this);
builder.append("SUCCESS, result=[").append(value).append("]");
} catch (ExecutionException e) {
builder.append("FAILURE, cause=[").append(e.getCause()).append("]");
} catch (CancellationException e) {
builder.append("CANCELLED");
} catch (RuntimeException e) {
builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]");
}
}
private enum State {
PENDING {
@Override
boolean isDone() {
return false;
}
@Override
void maybeThrowOnGet(@Nullable Throwable cause) throws ExecutionException {
throw new IllegalStateException("Cannot get() on a pending future.");
}
@Override
boolean permitsPublicUserToTransitionTo(State state) {
return !state.equals(PENDING);
}
},
DELEGATED {
@Override
boolean isDone() {
return false;
}
@Override
void maybeThrowOnGet(@Nullable Throwable cause) throws ExecutionException {
throw new IllegalStateException("Cannot get() on a pending future.");
}
boolean permitsPublicUserToTransitionTo(State state) {
return state.equals(CANCELLED);
}
},
VALUE,
FAILURE {
@Override
void maybeThrowOnGet(@Nullable Throwable cause) throws ExecutionException {
throw new ExecutionException(cause);
}
},
CANCELLED {
@Override
boolean isCancelled() {
return true;
}
@Override
void maybeThrowOnGet(@Nullable Throwable cause) throws ExecutionException {
// TODO(cpovirk): chain in a CancellationException created at the cancel() call?
throw new CancellationException();
}
};
boolean isDone() {
return true;
}
boolean isCancelled() {
return false;
}
void maybeThrowOnGet(@Nullable Throwable cause) throws ExecutionException {}
boolean permitsPublicUserToTransitionTo(State state) {
return false;
}
}
private static final class Listener {
final Runnable command;
final Executor executor;
Listener(Runnable command, Executor executor) {
this.command = checkNotNull(command);
this.executor = checkNotNull(executor);
}
void execute() {
try {
executor.execute(command);
} catch (RuntimeException e) {
log.log(
Level.SEVERE,
"RuntimeException while executing runnable " + command + " with executor " + executor,
e);
}
}
}
private final class SetFuture implements Runnable {
final ListenableFuture<? extends V> delegate;
SetFuture(ListenableFuture<? extends V> delegate) {
this.delegate = delegate;
}
@Override
public void run() {
if (isCancelled()) {
return;
}
if (delegate instanceof AbstractFuture) {
AbstractFuture<? extends V> other = (AbstractFuture<? extends V>) delegate;
value = other.value;
throwable = other.throwable;
// don't copy the mayInterruptIfRunning bit, for consistency with the server, to ensure that
// interruptTask() is called if and only if the bit is true and because we cannot infer the
// interrupt status from non AbstractFuture futures.
state = other.state;
notifyAndClearListeners();
return;
}
/*
* Almost everything in GWT is an AbstractFuture (which is as good as TrustedFuture under
* GWT). But ImmediateFuture and UncheckedThrowingFuture aren't, so we still need this case.
*/
try {
forceSet(getDone(delegate));
} catch (ExecutionException exception) {
forceSetException(exception.getCause());
} catch (CancellationException cancellation) {
cancel(false);
} catch (Throwable t) {
forceSetException(t);
}
}
}
}