// AsyncExecution.java
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package net.jodah.failsafe;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
 * Tracks asynchronous executions and allows retries to be scheduled according to a {@link RetryPolicy}. An execution
 * may be explicitly completed or made to retry.
 * <p>
 * Completion handling in {@code postExecute} and {@code completeOrHandle} runs while holding the monitor of the
 * associated {@code future}. The {@code completeCalled} and {@code retryCalled} flags are cleared by
 * {@code preExecute}.
 *
 * @author Jonathan Halterman
 */
@SuppressWarnings("WeakerAccess")
public final class AsyncExecution extends AbstractExecution {
  final FailsafeFuture<Object> future;
  final Scheduler scheduler;

  /** Supplier stored by {@code executeAsyncExecution} and reused by {@code completeOrHandle}. */
  private Supplier<CompletableFuture<ExecutionResult>> executionSupplier;
  /** Set once a result has been recorded for the current attempt; cleared by {@code preExecute}. */
  private volatile boolean completeCalled;
  /** Set once a retry method has been called for the current attempt; cleared by {@code preExecute}. */
  private volatile boolean retryCalled;

  /**
   * Creates an execution bound to the {@code scheduler}, {@code future} and {@code executor}.
   * <p>
   * The generic parameters of {@code future} and {@code executor} are erased to {@code Object} via unchecked casts.
   */
  @SuppressWarnings("unchecked")
  <T> AsyncExecution(Scheduler scheduler, FailsafeFuture<T> future, FailsafeExecutor<?> executor) {
    super((FailsafeExecutor<Object>) executor);
    this.future = (FailsafeFuture<Object>) future;
    this.scheduler = scheduler;
  }

  /**
   * Completes the execution and the associated {@code CompletableFuture}, posting {@code ExecutionResult.NONE} as the
   * result.
   *
   * @throws IllegalStateException if the execution is already complete
   */
  public void complete() {
    postExecute(ExecutionResult.NONE);
  }

  /**
   * Attempts to complete the execution and the associated {@code CompletableFuture} with the {@code result}.
   *
   * @return true on success, else false if completion failed and the execution should be retried via
   *     {@link #retry()}
   * @throws IllegalStateException if the execution is already complete
   */
  public boolean complete(Object result) {
    ExecutionResult outcome = new ExecutionResult(result, null);
    return postExecute(outcome);
  }

  /**
   * Attempts to complete the execution and the associated {@code CompletableFuture} with the {@code result} and
   * {@code failure}.
   * <p>
   * Note: the execution may be completed even when the {@code failure} is not {@code null}, such as when the
   * RetryPolicy does not allow retries for the {@code failure}.
   *
   * @return true on success, else false if completion failed and the execution should be retried via
   *     {@link #retry()}
   * @throws IllegalStateException if the execution is already complete
   */
  public boolean complete(Object result, Throwable failure) {
    ExecutionResult outcome = new ExecutionResult(result, failure);
    return postExecute(outcome);
  }

  /**
   * Records an execution using the last result and failure, and returns true if a retry has been scheduled, else
   * returns false and completes the execution and associated {@code CompletableFuture}.
   *
   * @throws IllegalStateException if a retry method has already been called or the execution is already complete
   */
  public boolean retry() {
    return retryFor(lastResult, lastFailure);
  }

  /**
   * Records an execution and returns true if a retry has been scheduled for the {@code result}, else returns false
   * and marks the execution and associated {@code CompletableFuture} as complete.
   *
   * @throws IllegalStateException if a retry method has already been called or the execution is already complete
   */
  public boolean retryFor(Object result) {
    return retryFor(result, null);
  }

  /**
   * Records an execution and returns true if a retry has been scheduled for the {@code result} or {@code failure},
   * else returns false and marks the execution and associated {@code CompletableFuture} as complete.
   *
   * @throws IllegalStateException if a retry method has already been called or the execution is already complete
   */
  public boolean retryFor(Object result, Throwable failure) {
    Assert.state(!retryCalled, "Retry has already been called");
    retryCalled = true;
    // completeOrHandle reports "completed"; a retry is pending exactly when it did not complete.
    boolean executionCompleted = completeOrHandle(result, failure);
    return !executionCompleted;
  }

  /**
   * Records an execution and returns true if a retry has been scheduled for the {@code failure}, else returns false
   * and marks the execution and associated {@code CompletableFuture} as complete.
   *
   * @throws NullPointerException if {@code failure} is null
   * @throws IllegalStateException if a retry method has already been called or the execution is already complete
   */
  public boolean retryOn(Throwable failure) {
    Assert.notNull(failure, "failure");
    return retryFor(null, failure);
  }

  /**
   * Prepares for an execution by resetting the {@code completeCalled} and {@code retryCalled} flags.
   */
  void preExecute() {
    completeCalled = false;
    retryCalled = false;
  }

  /**
   * Attempts to complete the parent execution, calls failure handlers, and completes the future if needed. Runs
   * synchronously, under the future's monitor, since a concrete result is needed.
   * <p>
   * If a result was already recorded for the current attempt, nothing is posted and the current completion state is
   * returned.
   *
   * @return whether the execution is completed
   * @throws IllegalStateException if the execution is already complete
   */
  @Override
  boolean postExecute(ExecutionResult result) {
    synchronized (future) {
      if (completeCalled) {
        return completed;
      }
      if (super.postExecute(result)) {
        // Binds to the private complete(ExecutionResult, Throwable) overload.
        complete(result, null);
      }
      completeCalled = true;
      return completed;
    }
  }

  /**
   * Performs an asynchronous execution: wraps the {@code supplier} with each policy executor in iteration order, makes
   * the outcome asynchronous on the scheduler, and completes this execution when the resulting future completes.
   */
  void executeAsync(Supplier<CompletableFuture<ExecutionResult>> supplier) {
    Supplier<CompletableFuture<ExecutionResult>> composed = supplier;
    for (PolicyExecutor<Policy<Object>> policyExecutor : policyExecutors) {
      composed = policyExecutor.supplyAsync(composed, scheduler, future);
    }
    Supplier<CompletableFuture<ExecutionResult>> asyncSupplier = Functions.makeAsync(composed, scheduler, future);
    CompletableFuture<ExecutionResult> promise = asyncSupplier.get();
    promise.whenComplete(this::complete);
  }

  /**
   * Performs an asynchronous execution where the execution must be manually completed via the provided
   * AsyncExecution. The {@code supplier} is remembered for later use and scheduled to run with zero delay; the
   * scheduled task is injected into the future.
   */
  @SuppressWarnings("unchecked")
  void executeAsyncExecution(Supplier<CompletableFuture<ExecutionResult>> supplier) {
    this.executionSupplier = supplier;
    future.inject(
        (Future) scheduler.schedule(supplier::get, 0, TimeUnit.NANOSECONDS));
  }

  /**
   * Attempts to complete the execution else handle according to the configured policies. Returns {@code true} if the
   * execution was completed, else false which indicates the result was handled asynchronously and may have triggered
   * a retry.
   * <p>
   * The result is recorded only if no result has been recorded yet for the current attempt.
   *
   * @throws IllegalStateException if the execution is already complete
   */
  boolean completeOrHandle(Object result, Throwable failure) {
    synchronized (future) {
      ExecutionResult outcome = new ExecutionResult(result, failure);
      if (!completeCalled) {
        record(outcome);
      }
      completeCalled = true;

      CompletableFuture<ExecutionResult> alreadyDone = CompletableFuture.completedFuture(outcome);
      Supplier<CompletableFuture<ExecutionResult>> chain = Functions.supplyOnce(alreadyDone, executionSupplier);
      for (PolicyExecutor<Policy<Object>> policyExecutor : policyExecutors) {
        chain = policyExecutor.supplyAsync(chain, scheduler, future);
      }

      CompletableFuture<ExecutionResult> handled = chain.get();
      handled.whenComplete(this::complete);
      return completed;
    }
  }

  /**
   * Completion callback: marks the execution completed and, unless the future is already done or cancelled, completes
   * the future. Does nothing when both {@code result} and {@code error} are null.
   * <p>
   * With a non-null {@code result}, the future receives the result's value and failure and the executor's completion
   * handling is invoked; otherwise the future is completed with the {@code error}.
   */
  private void complete(ExecutionResult result, Throwable error) {
    if (result == null && error == null) {
      return;
    }

    completed = true;
    if (future.isDone() || future.isCancelled()) {
      return;
    }

    if (result == null) {
      future.complete(null, error);
      return;
    }
    future.complete(result.getResult(), result.getFailure());
    executor.handleComplete(result, this);
  }
}