Skip to content

Commit

Permalink
generify runFetchThreaded/runCollectThreaded and clean up interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 10, 2015
1 parent e94d970 commit 5acb3e8
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 57 deletions.
3 changes: 1 addition & 2 deletions sql/src/main/java/io/crate/operation/Input.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@

public interface Input<T> {

public T value();

T value();
}
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/operation/PageConsumeListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public interface PageConsumeListener {

public void needMore();
void needMore();

public void finish();
void finish();
}
6 changes: 3 additions & 3 deletions sql/src/main/java/io/crate/operation/PageDownstream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

public interface PageDownstream {

public void nextPage(BucketPage page, PageConsumeListener listener);
void nextPage(BucketPage page, PageConsumeListener listener);

public void finish();
void finish();

public void fail(Throwable t);
void fail(Throwable t);
}
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/operation/RowDownstream.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public interface RowDownstream {
* @param upstream the upstream to be registered
* @return A downstream handle to push rows to.
*/
public RowDownstreamHandle registerUpstream(RowUpstream upstream);
RowDownstreamHandle registerUpstream(RowUpstream upstream);

}
6 changes: 3 additions & 3 deletions sql/src/main/java/io/crate/operation/RowDownstreamHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ public interface RowDownstreamHandle {
*
* @return false if the downstream does not need any more rows, true otherwise.
*/
public boolean setNextRow(Row row);
boolean setNextRow(Row row);

/**
* Called from the upstream to indicate that all rows are sent.
*/
public void finish();
void finish();

/**
* Is called from the upstream in case of a failure.
*/
public void fail(Throwable throwable);
void fail(Throwable throwable);

}
63 changes: 63 additions & 0 deletions sql/src/main/java/io/crate/operation/ThreadPools.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.operation;

import com.google.common.collect.Iterables;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPools {

/**
* runs each runnable of the runnableCollection in it's own thread unless there aren't enough threads available.
* In that case it will partition the runnableCollection to match the number of available threads.
*
* @throws RejectedExecutionException in case all threads are busy and overloaded.
*/
public static void runWithAvailableThreads(ThreadPoolExecutor executor,
int poolSize,
Collection<Runnable> runnableCollection) throws RejectedExecutionException {
int availableThreads = Math.max(poolSize - executor.getActiveCount(), 2);
if (availableThreads < runnableCollection.size()) {
Iterable<List<Runnable>> partition = Iterables.partition(runnableCollection,
runnableCollection.size() / availableThreads);

for (final List<Runnable> runnableList : partition) {
executor.execute(new Runnable() {
@Override
public void run() {
for (Runnable runnable : runnableList) {
runnable.run();
}
}
});
}
} else {
for (Runnable runnable : runnableCollection) {
executor.execute(runnable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

package io.crate.operation.collect;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -37,6 +38,7 @@
import io.crate.metadata.Functions;
import io.crate.metadata.ReferenceResolver;
import io.crate.operation.ImplementationSymbolVisitor;
import io.crate.operation.ThreadPools;
import io.crate.operation.collect.files.FileCollectInputSymbolVisitor;
import io.crate.operation.collect.files.FileInputFactory;
import io.crate.operation.collect.files.FileReadingCollector;
Expand Down Expand Up @@ -376,30 +378,23 @@ public void run() {
}
});
} else {
int availableThreads = Math.max(poolSize - executor.getActiveCount(), 2);
if (availableThreads < shardCollectors.size()) {
Iterable<List<CrateCollector>> partition = Iterables.partition(
shardCollectors, shardCollectors.size() / availableThreads);
for (final List<CrateCollector> collectors : partition) {
executor.execute(new Runnable() {
@Override
public void run() {
for (CrateCollector collector : collectors) {
doCollect(result, collector, ramAccountingContext);
}
}
});
}
} else {
for (final CrateCollector shardCollector : shardCollectors) {
executor.execute(new Runnable() {
ThreadPools.runWithAvailableThreads(
executor,
poolSize,
Lists.transform(shardCollectors, new Function<CrateCollector, Runnable>() {

@Nullable
@Override
public void run() {
doCollect(result, shardCollector, ramAccountingContext);
public Runnable apply(final CrateCollector input) {
return new Runnable() {
@Override
public void run() {
doCollect(result, input, ramAccountingContext);
}
};
}
});
}
}
})
);
}
}

Expand Down
43 changes: 20 additions & 23 deletions sql/src/main/java/io/crate/operation/fetch/NodeFetchOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.LongCursor;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.crate.breaker.RamAccountingContext;
import io.crate.core.collections.Bucket;
import io.crate.metadata.Functions;
import io.crate.operation.Input;
import io.crate.operation.RowDownstream;
import io.crate.operation.ThreadPools;
import io.crate.operation.collect.*;
import io.crate.operation.projectors.Projector;
import io.crate.operation.reference.DocLevelReferenceResolver;
Expand All @@ -44,6 +46,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -168,30 +171,24 @@ public ListenableFuture<Bucket> fetch() throws Exception {
private void runFetchThreaded(final ShardCollectFuture result,
final List<LuceneDocFetcher> shardFetchers,
final RamAccountingContext ramAccountingContext) throws RejectedExecutionException {
int availableThreads = Math.max(poolSize - executor.getActiveCount(), 2);
if (availableThreads < shardFetchers.size()) {
Iterable<List<LuceneDocFetcher>> partition = Iterables.partition(
shardFetchers, shardFetchers.size() / availableThreads);
for (final List<LuceneDocFetcher> fetchers : partition) {
executor.execute(new Runnable() {
@Override
public void run() {
for (LuceneDocFetcher fetcher : fetchers) {
doFetch(result, fetcher, ramAccountingContext);
}
}
});
}
} else {
for (final LuceneDocFetcher fetcher : shardFetchers) {
executor.execute(new Runnable() {

ThreadPools.runWithAvailableThreads(
executor,
poolSize,
Lists.transform(shardFetchers, new Function<LuceneDocFetcher, Runnable>() {

@Nullable
@Override
public void run() {
doFetch(result, fetcher, ramAccountingContext);
public Runnable apply(final LuceneDocFetcher input) {
return new Runnable() {
@Override
public void run() {
doFetch(result, input, ramAccountingContext);
}
};
}
});
}
}
})
);
}


Expand Down

0 comments on commit 5acb3e8

Please sign in to comment.