-
Notifications
You must be signed in to change notification settings - Fork 23
/
ManagedExecutor.java
305 lines (290 loc) · 14.4 KB
/
ManagedExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
/*
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.microprofile.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.eclipse.microprofile.concurrent.spi.ConcurrencyProvider;
/**
* <p>A container-managed executor service that creates instances of CompletableFuture,
* which it backs as the default asynchronous execution facility, both for the
* CompletableFuture itself as well as all dependent stages created from it,
* as well as all dependent stages created from those, and so on.</p>
*
* <p>Example usage:</p>
* <pre>
* <code>@Inject</code> ManagedExecutor executor;
* ...
* CompletableFuture<Integer> future = executor
* .supplyAsync(supplier)
* .thenApplyAsync(function1)
* .thenApply(function2)
* ...
* </pre>
*
* <p>This specification allows for managed executors that do not capture and propagate thread context,
* which can offer better performance. If thread context propagation is desired only for specific stages,
* the <code>ThreadContext.contextual*</code> API methods can be used to propagate thread context to
* individual actions.</p>
*
* <p>Example of single action with context propagation:</p>
* <pre>
* CompletableFuture<?> future = executor
* .runAsync(runnable1)
* .thenRun(threadContext.contextualRunnable(runnable2))
* .thenRunAsync(runnable3)
* ...
* </pre>
*
* <p>For managed executors that are defined as capturing and propagating thread context,
* it must be done in a consistent manner. Thread context is captured from the thread that creates
* a completion stage and is applied to the thread that runs the action, being removed afterward.
* When dependent stages are created from the completion stage, and likewise from any dependent stages
* created from those, and so on, thread context is captured from the thread that creates the dependent
* stage. This guarantees that the action performed by each stage always runs under the thread context
* of the code that creates the stage. When applied to the ExecutorService methods,
* <code>execute</code>, <code>invokeAll</code>, <code>invokeAny</code>, <code>submit</code>,
* thread context is captured from the thread invoking the request and propagated to thread that runs
* the task, being removed afterward.</p>
*
* <p>This interface is intentionally kept compatible with ManagedExecutorService,
* with the hope that its methods might one day be contributed to that specification.</p>
*
* <p>Managed executors are managed by the container, not be the user. Therefore, all
* life cycle methods must raise IllegalStateException. This includes:
* awaitTermination, isShutdown, isTerminated, shutdown, shutdownNow</p>
*/
public interface ManagedExecutor extends ExecutorService {
/**
* Creates a new {@link Builder} instance.
*
* @return a new {@link Builder} instance.
*/
public static Builder builder() {
return ConcurrencyProvider.instance().getConcurrencyManager().newManagedExecutorBuilder();
}
/**
* <p>Builder for {@link ManagedExecutor} instances.</p>
*
* <p>Example usage:</p>
* <pre><code> ManagedExecutor executor = ManagedExecutor.builder()
* .maxAsync(5)
* .maxQueued(20)
* .propagated(ThreadContext.SECURITY)
* .build();
* ...
* </code></pre>
*/
public interface Builder {
/**
* <p>Builds a new {@link ManagedExecutor} with the configuration
* that this builder represents as of the point in time when this method
* is invoked.</p>
*
* <p>After {@link #build} is invoked, the builder instance retains its
* configuration and may be further updated to represent different
* configurations and build additional {@link ManagedExecutor}
* instances.</p>
*
* <p>All created instances of {@link ManagedExecutor} are destroyed
* when the application is stopped. The container automatically shuts down these
* managed executors and cancels their remaining actions/tasks.</p>
*
* @return new instance of {@link ManagedExecutor}.
* @throws IllegalStateException for any of the following error conditions
* <ul>
* <li>if one or more of the same context types appear in both the
* {@link #cleared} set and the {@link #propagated} set</li>
* <li>if a thread context type that is configured to be
* {@link #cleared} or {@link #propagated} is unavailable</li>
* <li>if more than one provider provides the same thread context
* {@link org.eclipse.microprofile.concurrent.spi.ThreadContextProvider#getThreadContextType type}
* </li>
* </ul>
*/
ManagedExecutor build();
/**
* <p>Defines the set of thread context types to clear from the thread
* where the action or task executes. The previous context is resumed
* on the thread after the action or task ends.</p>
*
* <p>This set replaces the <code>cleared</code> set that was previously
* specified on the builder instance, if any.</p>
*
* <p>The default set of cleared thread context types is
* {@link ThreadContext#TRANSACTION}, which means that a transaction
* is not active on the thread when the action or task runs, such
* that each action or task is able to independently start and end
* its own transactional work.</p>
*
* <p>{@link ThreadContext#ALL_REMAINING} is automatically appended to the
* set of cleared context if the {@link #propagated} set does not include
* {@link ThreadContext#ALL_REMAINING}.</p>
*
* <p>Constants for specifying some of the core context types are provided
* on {@link ThreadContext}. Other thread context types must be defined
* by the specification that defines the context type or by a related
* MicroProfile specification.</p>
*
* @param types types of thread context to clear from threads that run
* actions and tasks.
* @return the same builder instance upon which this method is invoked.
*/
Builder cleared(String... types);
/**
* <p>Defines the set of thread context types to capture from the thread
* that creates a dependent stage (or that submits a task) and which to
* propagate to the thread where the action or task executes.</p>
*
* <p>This set replaces the <code>propagated</code> set that was
* previously specified on the builder instance, if any.</p>
*
* <p>The default set of propagated thread context types is
* {@link ThreadContext#ALL_REMAINING}, which includes all available
* thread context types that support capture and propagation to other
* threads, except for those that are explicitly {@link cleared},
* which, by default is {@link ThreadContext#TRANSACTION} context,
* in which case is suspended from the thread that runs the action or
* task.</p>
*
* <p>Constants for specifying some of the core context types are provided
* on {@link ThreadContext}. Other thread context types must be defined
* by the specification that defines the context type or by a related
* MicroProfile specification.</p>
*
* <p>Thread context types which are not otherwise included in this set
* are cleared from the thread of execution for the duration of the
* action or task.</p>
*
* @param types types of thread context to capture and propagate.
* @return the same builder instance upon which this method is invoked.
*/
Builder propagated(String... types);
/**
* <p>Establishes an upper bound on the number of async completion stage
* actions and async executor tasks that can be running at any given point
* in time. There is no guarantee that async actions or tasks will start
* running immediately, even when the <code>maxAsync</code> constraint has
* not get been reached. Async actions and tasks remain queued until
* the <code>ManagedExecutor</code> starts executing them.</p>
*
* <p>The default value of <code>-1</code> indicates no upper bound,
* although practically, resource constraints of the system will apply.</p>
*
* @param max upper bound on async completion stage actions and executor tasks.
* @return the same builder instance upon which this method is invoked.
* @throws IllegalArgumentException if max is 0 or less than -1.
*/
Builder maxAsync(int max);
/**
* <p>Establishes an upper bound on the number of async actions and async tasks
* that can be queued up for execution. Async actions and tasks are rejected
* if no space in the queue is available to accept them.</p>
*
* <p>The default value of <code>-1</code> indicates no upper bound,
* although practically, resource constraints of the system will apply.</p>
*
* @param max upper bound on async actions and tasks that can be queued.
* @return the same builder instance upon which this method is invoked.
* @throws IllegalArgumentException if max is 0 or less than -1.
*/
Builder maxQueued(int max);
}
/**
* <p>Returns a new CompletableFuture that is already completed with the specified value.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the completion stage.
* @return the new completion stage.
*/
<U> CompletableFuture<U> completedFuture(U value);
/**
* <p>Returns a new CompletionStage that is already completed with the specified value.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the completion stage.
* @return the new completion stage.
*/
<U> CompletionStage<U> completedStage(U value);
/**
* <p>Returns a new CompletableFuture that is already exceptionally completed with the specified Throwable.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the completion stage.
* @return the new completion stage.
*/
<U> CompletableFuture<U> failedFuture(Throwable ex);
/**
* <p>Returns a new CompletionStage that is already exceptionally completed with the specified Throwable.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the completion stage.
* @return the new completion stage.
*/
<U> CompletionStage<U> failedStage(Throwable ex);
/**
* <p>Returns a new incomplete <code>CompletableFuture</code>.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the completion stage.
* @return the new completion stage.
*/
<U> CompletableFuture<U> newIncompleteFuture();
/**
* <p>Returns a new CompletableFuture that is completed by a task running in this executor
* after it runs the given action.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param runnable the action to run before completing the returned completion stage.
* @return the new completion stage.
*/
CompletableFuture<Void> runAsync(Runnable runnable);
/**
* <p>Returns a new CompletableFuture that is completed by a task running in this executor
* after it runs the given action.</p>
*
* <p>This executor is the default asynchronous execution facility for the new completion stage
* that is returned by this method and all dependent stages that are created from it,
* and all dependent stages that are created from those, and so forth.</p>
*
* @param <U> result type of the supplier and completion stage.
* @param supplier an action returning the value to be used to complete the returned completion stage.
* @return the new completion stage.
*/
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
}