/
ThreadPools.java
121 lines (108 loc) · 5.11 KB
/
ThreadPools.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
* Licensed to Crate under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership. Crate licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial
* agreement.
*/
package io.crate.execution.support;
import com.google.common.collect.Iterables;
import io.crate.collections.Lists2;
import io.crate.concurrent.CompletableFutures;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
public class ThreadPools {
public static IntSupplier numIdleThreads(ThreadPoolExecutor executor, int numProcessors) {
return () -> Math.min(
Math.max(executor.getMaximumPoolSize() - executor.getActiveCount(), 1),
// poolSize can be > number of processors but we don't want to utilize more threads than numProcessors
// per execution. Thread contention would go up and we're running into RejectedExecutions earlier on
// concurrent queries
numProcessors
);
}
public static Executor fallbackOnRejection(Executor executor) {
return new DirectFallbackExecutor(executor);
}
/**
* Uses up to availableThreads threads to run all suppliers.
* if availableThreads is smaller than the number of suppliers it will run multiple suppliers
* grouped within the available threads.
*
* @param executor executor that is used to execute the callableList
* @param availableThreads A function returning the number of threads which can be utilized
* @param suppliers a collection of callable that should be executed
* @param <T> type of the final result
* @return a future that will return a list of the results of the suppliers
* @throws RejectedExecutionException in case all threads are busy and overloaded.
*/
public static <T> CompletableFuture<List<T>> runWithAvailableThreads(
Executor executor,
IntSupplier availableThreads,
Collection<Supplier<T>> suppliers) throws RejectedExecutionException {
int threadsToUse = availableThreads.getAsInt();
if (threadsToUse < suppliers.size()) {
Iterable<List<Supplier<T>>> partitions = Iterables.partition(suppliers, suppliers.size() / threadsToUse);
ArrayList<CompletableFuture<List<T>>> futures = new ArrayList<>(threadsToUse + 1);
for (List<Supplier<T>> partition : partitions) {
Supplier<List<T>> executePartition = () -> Lists2.copyAndReplace(partition, Supplier::get);
futures.add(CompletableFuture.supplyAsync(executePartition, executor));
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(aVoid -> {
ArrayList<T> finalResult = new ArrayList<>(suppliers.size());
for (CompletableFuture<List<T>> future: futures) {
finalResult.addAll(future.join());
}
return finalResult;
});
} else {
ArrayList<CompletableFuture<T>> futures = new ArrayList<>(suppliers.size());
for (Supplier<T> supplier : suppliers) {
futures.add(CompletableFuture.supplyAsync(supplier, executor));
}
return CompletableFutures.allAsList(futures);
}
}
/**
* Executor that delegates to {@link Executor} or
* runs the command synchronous if the provided executor throws {@link EsRejectedExecutionException}
*/
private static class DirectFallbackExecutor implements Executor {
private final Executor delegate;
DirectFallbackExecutor(Executor delegate) {
this.delegate = delegate;
}
@Override
public void execute(@Nonnull Runnable command) {
try {
delegate.execute(command);
} catch (RejectedExecutionException | EsRejectedExecutionException e) {
command.run();
}
}
}
}