Skip to content

Commit

Permalink
don't create new threads in OutputS3
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 20, 2015
1 parent 8fe285c commit 4fb2ab0
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public TransportExecutor(Settings settings,
referenceResolver, functions, RowGranularity.CLUSTER);
this.globalProjectionToProjectionVisitor = new ProjectionToProjectorVisitor(
clusterService,
threadPool,
settings,
transportActionProvider,
bulkRetryCoordinatorPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.Executor;

Expand All @@ -49,6 +50,7 @@ public class PageDownstreamFactory {

@Inject
public PageDownstreamFactory(ClusterService clusterService,
ThreadPool threadPool,
Settings settings,
TransportActionProvider transportActionProvider,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
Expand All @@ -61,6 +63,7 @@ public PageDownstreamFactory(ClusterService clusterService,
);
this.projectionToProjectorVisitor = new ProjectionToProjectorVisitor(
clusterService,
threadPool,
settings,
transportActionProvider,
bulkRetryCoordinatorPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Set;
Expand All @@ -54,6 +55,7 @@ public class HandlerSideDataCollectOperation implements CollectOperation {

@Inject
public HandlerSideDataCollectOperation(ClusterService clusterService,
ThreadPool threadPool,
Settings settings,
TransportActionProvider transportActionProvider,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
Expand All @@ -67,6 +69,7 @@ public HandlerSideDataCollectOperation(ClusterService clusterService,
this.implementationVisitor = new ImplementationSymbolVisitor(referenceResolver, functions, RowGranularity.CLUSTER);
this.projectorVisitor = new ProjectionToProjectorVisitor(
clusterService,
threadPool,
settings,
transportActionProvider,
bulkRetryCoordinatorPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public MapSideDataCollectOperation(ClusterService clusterService,
new FileCollectInputSymbolVisitor(functions, FileLineReferenceResolver.INSTANCE);
this.projectorVisitor = new ProjectionToProjectorVisitor(
clusterService,
threadPool,
settings,
transportActionProvider,
bulkRetryCoordinatorPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public ShardCollectService(ThreadPool threadPool,
);
this.projectorVisitor = new ProjectionToProjectorVisitor(
clusterService,
threadPool,
settings,
transportActionProvider,
bulkRetryCoordinatorPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;

public class ProjectionToProjectorVisitor extends ProjectionVisitor<ProjectionToProjectorVisitor.Context, Projector> {

private final ClusterService clusterService;
private ThreadPool threadPool;
private final Settings settings;
private final TransportActionProvider transportActionProvider;
private final BulkRetryCoordinatorPool bulkRetryCoordinatorPool;
Expand All @@ -55,13 +58,15 @@ public class ProjectionToProjectorVisitor extends ProjectionVisitor<ProjectionTo
@Nullable

public ProjectionToProjectorVisitor(ClusterService clusterService,
ThreadPool threadPool,
Settings settings,
TransportActionProvider transportActionProvider,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
ImplementationSymbolVisitor symbolVisitor,
EvaluatingNormalizer normalizer,
@Nullable ShardId shardId) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.settings = settings;
this.transportActionProvider = transportActionProvider;
this.bulkRetryCoordinatorPool = bulkRetryCoordinatorPool;
Expand All @@ -71,20 +76,22 @@ public ProjectionToProjectorVisitor(ClusterService clusterService,
}

public ProjectionToProjectorVisitor(ClusterService clusterService,
ThreadPool threadPool,
Settings settings,
TransportActionProvider transportActionProvider,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
ImplementationSymbolVisitor symbolVisitor,
EvaluatingNormalizer normalizer) {
this(clusterService, settings, transportActionProvider, bulkRetryCoordinatorPool, symbolVisitor, normalizer, null);
this(clusterService, threadPool, settings, transportActionProvider, bulkRetryCoordinatorPool, symbolVisitor, normalizer, null);
}

public ProjectionToProjectorVisitor(ClusterService clusterService,
ThreadPool threadPool,
Settings settings,
TransportActionProvider transportActionProvider,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
ImplementationSymbolVisitor symbolVisitor) {
this(clusterService, settings, transportActionProvider, bulkRetryCoordinatorPool, symbolVisitor,
this(clusterService, threadPool, settings, transportActionProvider, bulkRetryCoordinatorPool, symbolVisitor,
new EvaluatingNormalizer(
symbolVisitor.functions(),
symbolVisitor.rowGranularity(),
Expand Down Expand Up @@ -224,6 +231,7 @@ public Projector visitWriterProjection(WriterProjection projection, Context cont
uri = sb.toString();
}
return new WriterProjector(
((ThreadPoolExecutor) threadPool.generic()),
uri,
projection.settings(),
inputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -76,7 +78,8 @@ public class WriterProjector implements Projector, RowDownstreamHandle {
*
* If inputs is not null the inputs are consumed to write a JSON array to the output.
*/
public WriterProjector(String uri,
public WriterProjector(ExecutorService executorService,
String uri,
Settings settings,
@Nullable List<Input<?>> inputs,
Set<CollectExpression<?>> collectExpressions,
Expand All @@ -92,7 +95,7 @@ public WriterProjector(String uri,
if (this.uri.getScheme() == null || this.uri.getScheme().equals("file")) {
this.output = new OutputFile(this.uri, settings);
} else if (this.uri.getScheme().equalsIgnoreCase("s3")) {
this.output = new OutputS3(this.uri, settings);
this.output = new OutputS3(executorService, this.uri, settings);
} else {
throw new UnsupportedFeatureException(String.format("Unknown scheme '%s'", this.uri.getScheme()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,28 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.zip.GZIPOutputStream;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

@NotThreadSafe
public class OutputS3 extends Output {

private final ExecutorService executorService;
private final URI uri;
private final boolean compression;
private OutputStream outputStream;

public OutputS3(URI uri, Settings settings) {
public OutputS3(ExecutorService executorService, URI uri, Settings settings) {
this.executorService = executorService;
this.uri = uri;
compression = parseCompression(settings);
}

@Override
public void open() throws IOException {
outputStream = new S3OutputStream(uri, new S3ClientHelper());
outputStream = new S3OutputStream(executorService, uri, new S3ClientHelper());
if (compression) {
outputStream = new GZIPOutputStream(outputStream);
}
Expand Down Expand Up @@ -96,13 +97,12 @@ private static class S3OutputStream extends OutputStream {
long bytesWritten = 0;
int partNumber = 1;

private S3OutputStream(URI uri, S3ClientHelper s3ClientHelper) throws IOException {
private S3OutputStream(ExecutorService executor, URI uri, S3ClientHelper s3ClientHelper) throws IOException {
bucketName = uri.getHost();
key = uri.getPath().substring(1);
outputStream = new ByteArrayOutputStream();
client = s3ClientHelper.client(uri);
executorService = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(daemonThreadFactory("OutputS3")));
executorService = MoreExecutors.listeningDecorator(executor);
multipartUpload = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(bucketName, key));
}
Expand Down
13 changes: 11 additions & 2 deletions sql/src/test/java/io/crate/executor/task/LocalMergeTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.*;
import java.util.concurrent.TimeUnit;

import static io.crate.testing.TestingHelpers.isRow;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -111,6 +114,7 @@ protected void configure() {
groupProjection.values(Arrays.asList(
new Aggregation(minAggFunction.info(), Arrays.<Symbol>asList(new InputColumn(1)), Aggregation.Step.PARTIAL, Aggregation.Step.FINAL)
));
threadPool = new ThreadPool("testing");
pageDownstreamFactory = mock(PageDownstreamFactory.class);
when(pageDownstreamFactory.createMergeNodePageDownstream(any(MergeNode.class), any(ResultProvider.class), any(RamAccountingContext.class), any(Optional.class))).thenAnswer(new Answer<PageDownstream>() {
@Override
Expand All @@ -119,6 +123,7 @@ public PageDownstream answer(InvocationOnMock invocation) throws Throwable {
MergeNode mergeNode = (MergeNode) invocation.getArguments()[0];
ProjectionToProjectorVisitor projectionToProjectorVisitor = new ProjectionToProjectorVisitor(
mock(ClusterService.class),
threadPool,
ImmutableSettings.EMPTY,
mock(TransportActionProvider.class, Answers.RETURNS_DEEP_STUBS.get()),
mock(BulkRetryCoordinatorPool.class),
Expand All @@ -132,8 +137,12 @@ public PageDownstream answer(InvocationOnMock invocation) throws Throwable {
return nonSortingBucketMerger;
}
});
threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.sameThreadExecutor());
}

@After
public void after() throws Exception {
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.SECONDS);
}

private ListenableFuture<TaskResult> getUpstreamResult(int numRows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
Expand All @@ -63,6 +65,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static io.crate.testing.TestingHelpers.isRow;
import static org.hamcrest.Matchers.contains;
Expand All @@ -76,6 +79,7 @@ public class PageDownstreamFactoryTest extends CrateUnitTest {
private GroupProjection groupProjection;
private Functions functions;
private ReferenceResolver referenceResolver;
private ThreadPool threadPool;

@Before
@SuppressWarnings("unchecked")
Expand All @@ -89,6 +93,7 @@ protected void configure() {
}
})
.createInjector();
threadPool = new ThreadPool("testing");
functions = injector.getInstance(Functions.class);
referenceResolver = new GlobalReferenceResolver(
Collections.<ReferenceIdent, ReferenceImplementation>emptyMap());
Expand All @@ -104,6 +109,14 @@ protected void configure() {
));
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testMergeSingleResult() throws Exception {
TopNProjection topNProjection = new TopNProjection(3, TopN.NO_OFFSET,
Expand All @@ -124,6 +137,7 @@ public void testMergeSingleResult() throws Exception {
BucketPage page = new BucketPage(Futures.immediateFuture(rows));
final PageDownstreamFactory pageDownstreamFactory = new PageDownstreamFactory(
mock(ClusterService.class),
threadPool,
ImmutableSettings.EMPTY,
mock(TransportActionProvider.class, Answers.RETURNS_DEEP_STUBS.get()),
mock(BulkRetryCoordinatorPool.class),
Expand Down Expand Up @@ -162,6 +176,7 @@ public void testMergeMultipleResults() throws Exception {
));
final PageDownstreamFactory pageDownstreamFactory = new PageDownstreamFactory(
mock(ClusterService.class),
threadPool,
ImmutableSettings.EMPTY,
mock(TransportActionProvider.class, Answers.RETURNS_DEEP_STUBS.get()),
mock(BulkRetryCoordinatorPool.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.highlight.HighlightModule;
import org.elasticsearch.test.cluster.NoopClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
Expand All @@ -74,6 +77,7 @@ public class ShardProjectorChainTest extends CrateUnitTest {
new RamAccountingContext("dummy", new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA));

private ProjectionToProjectorVisitor projectionToProjectorVisitor;
private ThreadPool threadPool;

private class TestModule extends AbstractModule {

Expand Down Expand Up @@ -102,13 +106,15 @@ public void prepare() {
new TestModule(),
new MetaDataModule());
Injector injector = builder.createInjector();
threadPool = new ThreadPool("testing");

ImplementationSymbolVisitor implementationSymbolVisitor = new ImplementationSymbolVisitor(
injector.getInstance(ReferenceResolver.class),
injector.getInstance(Functions.class),
RowGranularity.CLUSTER);
projectionToProjectorVisitor = new ProjectionToProjectorVisitor(
new NoopClusterService(),
threadPool,
ImmutableSettings.EMPTY,
mock(TransportActionProvider.class),
mock(BulkRetryCoordinatorPool.class),
Expand All @@ -117,6 +123,12 @@ public void prepare() {
);
}

@After
public void after() throws Exception {
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.SECONDS);
}

private Aggregation countAggregation() {
return new Aggregation(
CountAggregation.COUNT_STAR_FUNCTION,
Expand Down
Loading

0 comments on commit 4fb2ab0

Please sign in to comment.