Skip to content

Commit

Permalink
Integrate thread-scaling into OrderedLuceneBatchIteratorFactory
Browse files Browse the repository at this point in the history
Another follow up to e488e99
  • Loading branch information
mfussenegger committed Jun 21, 2018
1 parent 5b8d8e1 commit cc6cc5b
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 48 deletions.
Expand Up @@ -121,6 +121,7 @@ public void measureLoadAndConsumeOrderedLuceneBatchIterator(Blackhole blackhole)
Collections.singletonList(createOrderedCollector(indexSearcher, columnName)),
OrderingByPosition.rowOrdering(new int[]{0}, reverseFlags, nullsFirst),
ROW_ACCOUNTING, MoreExecutors.directExecutor(),
() -> 1,
false
);
while (!it.allLoaded()) {
Expand Down
Expand Up @@ -23,7 +23,7 @@
package io.crate.execution.engine.collect.collectors;

import io.crate.breaker.RowAccounting;
import io.crate.concurrent.CompletableFutures;
import io.crate.collections.Lists2;
import io.crate.data.BatchIterator;
import io.crate.data.Row;
import io.crate.execution.engine.distribution.merge.BatchPagingIterator;
Expand All @@ -32,19 +32,18 @@
import io.crate.execution.engine.distribution.merge.PassThroughPagingIterator;
import io.crate.execution.engine.distribution.merge.RamAccountingPageIterator;
import io.crate.execution.engine.distribution.merge.SortedPagingIterator;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import io.crate.execution.support.ThreadPools;
import org.elasticsearch.index.shard.ShardId;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.function.IntSupplier;

/**
* Factory to create a BatchIterator which is backed by 1 or more {@link OrderedDocCollector}.
Expand All @@ -56,15 +55,17 @@ public static BatchIterator<Row> newInstance(List<OrderedDocCollector> orderedDo
Comparator<Row> rowComparator,
RowAccounting rowAccounting,
Executor executor,
IntSupplier availableThreads,
boolean requiresScroll) {
return new Factory(
orderedDocCollectors, rowComparator, rowAccounting, executor, requiresScroll).create();
orderedDocCollectors, rowComparator, rowAccounting, executor, availableThreads, requiresScroll).create();
}

private static class Factory {

private final List<OrderedDocCollector> orderedDocCollectors;
private final Executor executor;
private final IntSupplier availableThreads;
private final PagingIterator<ShardId, Row> pagingIterator;
private final Map<ShardId, OrderedDocCollector> collectorsByShardId;

Expand All @@ -74,9 +75,11 @@ private static class Factory {
Comparator<Row> rowComparator,
RowAccounting rowAccounting,
Executor executor,
IntSupplier availableThreads,
boolean requiresScroll) {
this.orderedDocCollectors = orderedDocCollectors;
this.executor = executor;
this.availableThreads = availableThreads;
if (orderedDocCollectors.size() == 1) {
pagingIterator = requiresScroll ?
new RamAccountingPageIterator<>(PassThroughPagingIterator.repeatable(), rowAccounting)
Expand Down Expand Up @@ -106,7 +109,11 @@ private boolean tryFetchMore(ShardId shardId) {
return false;
}
if (shardId == null) {
loadFromAllUnExhausted(orderedDocCollectors, executor).whenComplete(this::onNextRows);
ThreadPools.runWithAvailableThreads(
executor,
availableThreads,
Lists2.copyAndReplace(orderedDocCollectors, Function.identity())
).whenComplete(this::onNextRows);
return true;
} else {
return loadFrom(collectorsByShardId.get(shardId));
Expand Down Expand Up @@ -161,19 +168,6 @@ private boolean allExhausted() {
}
}

private static CompletableFuture<List<KeyIterable<ShardId, Row>>> loadFromAllUnExhausted(List<OrderedDocCollector> collectors,
Executor executor) {
List<CompletableFuture<KeyIterable<ShardId, Row>>> futures = new ArrayList<>(collectors.size());
for (OrderedDocCollector collector : collectors.subList(1, collectors.size())) {
try {
futures.add(CompletableFuture.supplyAsync(collector, executor));
} catch (EsRejectedExecutionException | RejectedExecutionException e) {
futures.add(CompletableFuture.completedFuture(collector.get()));
}
}
futures.add(CompletableFuture.completedFuture(collectors.get(0).get()));
return CompletableFutures.allAsList(futures);
}

private static Map<ShardId, OrderedDocCollector> toMapByShardId(List<OrderedDocCollector> collectors) {
Map<ShardId, OrderedDocCollector> collectorsByShardId = new HashMap<>(collectors.size());
Expand Down
Expand Up @@ -100,8 +100,7 @@ public CompletableFuture<Long> count(Map<String, ? extends Collection<Integer>>
CompletableFuture<List<Long>> futurePartialCounts = ThreadPools.runWithAvailableThreads(
executor,
ThreadPools.numIdleThreads(executor, numProcessors),
suppliers,
mergeFunction
suppliers
);
return futurePartialCounts.thenApply(mergeFunction);
}
Expand Down
Expand Up @@ -391,6 +391,7 @@ private CrateCollector createMultiShardScoreDocCollector(RoutedCollectPhase coll
),
new RowAccountingWithEstimators(columnTypes, collectTask.queryPhaseRamAccountingContext()),
executor,
availableThreads,
consumer.requiresScroll()
),
consumer
Expand Down
Expand Up @@ -204,8 +204,7 @@ private void doFetch(FetchTask fetchTask,
ThreadPools.runWithAvailableThreads(
executor,
ThreadPools.numIdleThreads(executor, numProcessors),
collectors,
items -> null
collectors
);
}

Expand Down
43 changes: 21 additions & 22 deletions sql/src/main/java/io/crate/execution/support/ThreadPools.java
Expand Up @@ -23,6 +23,7 @@
package io.crate.execution.support;

import com.google.common.collect.Iterables;
import io.crate.collections.Lists2;
import io.crate.concurrent.CompletableFutures;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

Expand All @@ -34,7 +35,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

Expand All @@ -55,47 +55,46 @@ public static Executor fallbackOnRejection(Executor executor) {
}

/**
* runs each runnable of the runnableCollection in it's own thread unless there aren't enough threads available.
* In that case it will partition the runnableCollection to match the number of available threads.
* Uses up to availableThreads threads to run all suppliers.
* if availableThreads is smaller than the number of suppliers it will run multiple suppliers
* grouped within the available threads.
*
* @param executor executor that is used to execute the callableList
* @param availableThreads A function returning the number of threads which can be utilized
* @param suppliers a collection of callable that should be executed
* @param mergeFunction function that will be applied to merge the results of multiple callable in case that they are
* executed together if the threadPool is exhausted
* @param <T> type of the final result
* @return a future that will return a list of the results of the callableList
* @return a future that will return a list of the results of the suppliers
* @throws RejectedExecutionException in case all threads are busy and overloaded.
*/
public static <T> CompletableFuture<List<T>> runWithAvailableThreads(
Executor executor,
IntSupplier availableThreads,
Collection<Supplier<T>> suppliers,
final Function<List<T>, T> mergeFunction) throws RejectedExecutionException {
Collection<Supplier<T>> suppliers) throws RejectedExecutionException {

ArrayList<CompletableFuture<T>> futures;
int threadsToUse = availableThreads.getAsInt();
if (threadsToUse < suppliers.size()) {
Iterable<List<Supplier<T>>> partition = Iterables.partition(suppliers, suppliers.size() / threadsToUse);
Iterable<List<Supplier<T>>> partitions = Iterables.partition(suppliers, suppliers.size() / threadsToUse);

futures = new ArrayList<>(threadsToUse + 1);
for (final List<Supplier<T>> callableList : partition) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
ArrayList<T> results = new ArrayList<>(callableList.size());
for (Supplier<T> supplier : callableList) {
results.add(supplier.get());
}
return mergeFunction.apply(results);
}, executor);
futures.add(future);
ArrayList<CompletableFuture<List<T>>> futures = new ArrayList<>(threadsToUse + 1);
for (List<Supplier<T>> partition : partitions) {
Supplier<List<T>> executePartition = () -> Lists2.copyAndReplace(partition, Supplier::get);
futures.add(CompletableFuture.supplyAsync(executePartition, executor));
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(aVoid -> {
ArrayList<T> finalResult = new ArrayList<>(suppliers.size());
for (CompletableFuture<List<T>> future: futures) {
finalResult.addAll(future.join());
}
return finalResult;
});
} else {
futures = new ArrayList<>(suppliers.size());
ArrayList<CompletableFuture<T>> futures = new ArrayList<>(suppliers.size());
for (Supplier<T> supplier : suppliers) {
futures.add(CompletableFuture.supplyAsync(supplier, executor));
}
return CompletableFutures.allAsList(futures);
}
return CompletableFutures.allAsList(futures);
}

/**
Expand Down
Expand Up @@ -128,7 +128,9 @@ public void testOrderedLuceneBatchIterator() throws Exception {
return OrderedLuceneBatchIteratorFactory.newInstance(
Arrays.asList(collector1, collector2),
OrderingByPosition.rowOrdering(new int[]{0}, reverseFlags, nullsFirst),
ROW_ACCOUNTING, MoreExecutors.directExecutor(),
ROW_ACCOUNTING,
MoreExecutors.directExecutor(),
() -> 1,
true
);
}
Expand All @@ -146,7 +148,9 @@ public void testSingleCollectorOrderedLuceneBatchIteratorTripsCircuitBreaker() t
BatchIterator<Row> rowBatchIterator = OrderedLuceneBatchIteratorFactory.newInstance(
Arrays.asList(createOrderedCollector(searcher1, 1)),
OrderingByPosition.rowOrdering(new int[]{0}, reverseFlags, nullsFirst),
rowAccounting, MoreExecutors.directExecutor(),
rowAccounting,
MoreExecutors.directExecutor(),
() -> 2,
true
);

Expand All @@ -166,6 +170,7 @@ public void testOrderedLuceneBatchIteratorWithMultipleCollectorsTripsCircuitBrea
Arrays.asList(collector1, collector2),
OrderingByPosition.rowOrdering(new int[]{0}, reverseFlags, nullsFirst),
rowAccounting, MoreExecutors.directExecutor(),
() -> 1,
true
);

Expand Down

0 comments on commit cc6cc5b

Please sign in to comment.