Skip to content

Commit

Permalink
fix: pageDownstreamContext must close RamAccountingContext on close o…
Browse files Browse the repository at this point in the history
…r kill
  • Loading branch information
seut committed Jun 3, 2015
1 parent e2e41ce commit 915e642
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Changes for Crate
Unreleased
==========

- Fix: Accounted RAM wasn't freed after distributed grouping queries finished.

- Added missing cluster settings for request circuit breaker
and expose them via information schema

Expand Down
3 changes: 2 additions & 1 deletion sql/src/main/java/io/crate/action/job/ContextPreparer.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public Void visitMergeNode(final MergeNode node, final PreparerContext context)
);
StreamerVisitor.Context streamerContext = streamerVisitor.processPlanNode(node);
PageDownstreamContext pageDownstreamContext = new PageDownstreamContext(
pageDownstream, streamerContext.inputStreamers(), node.numUpstreams());
pageDownstream, streamerContext.inputStreamers(),
context.ramAccountingContext, node.numUpstreams());

statsTables.operationStarted(node.executionNodeId(), context.jobId, node.name());
Futures.addCallback(downstream.result(), new OperationFinishedStatsTablesCallback<Bucket>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private PageDownstreamContext createPageDownstreamContext(RamAccountingContext r
return new PageDownstreamContext(
finalMergePageDownstream,
streamers,
ramAccountingContext,
executionNodes[executionNodes.length - 1].executionNodes().size()
);
}
Expand Down
5 changes: 5 additions & 0 deletions sql/src/main/java/io/crate/jobs/PageDownstreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.util.concurrent.SettableFuture;
import io.crate.Streamer;
import io.crate.breaker.RamAccountingContext;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.BucketPage;
import io.crate.operation.PageConsumeListener;
Expand All @@ -42,6 +43,7 @@ public class PageDownstreamContext implements ExecutionSubContext {
private final Object lock = new Object();
private final PageDownstream pageDownstream;
private final Streamer<?>[] streamer;
private final RamAccountingContext ramAccountingContext;
private final int numBuckets;
private final ArrayList<SettableFuture<Bucket>> bucketFutures;
private final BitSet allFuturesSet;
Expand All @@ -53,9 +55,11 @@ public class PageDownstreamContext implements ExecutionSubContext {

public PageDownstreamContext(PageDownstream pageDownstream,
Streamer<?>[] streamer,
RamAccountingContext ramAccountingContext,
int numBuckets) {
this.pageDownstream = pageDownstream;
this.streamer = streamer;
this.ramAccountingContext = ramAccountingContext;
this.numBuckets = numBuckets;
bucketFutures = new ArrayList<>(numBuckets);
allFuturesSet = new BitSet(numBuckets);
Expand Down Expand Up @@ -162,6 +166,7 @@ public void finish() {
contextCallback.onClose();
}
pageDownstream.finish();
ramAccountingContext.close();
} else {
LOGGER.warn("called finish on an already closed PageDownstreamContext");
}
Expand Down
12 changes: 9 additions & 3 deletions sql/src/test/java/io/crate/jobs/JobContextServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.crate.Streamer;
import io.crate.breaker.RamAccountingContext;
import io.crate.operation.PageDownstream;
import io.crate.test.integration.CrateUnitTest;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -47,6 +50,9 @@

public class JobContextServiceTest extends CrateUnitTest {

private static final RamAccountingContext RAM_ACCOUNTING_CONTEXT =
new RamAccountingContext("dummy", new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA));

private final ThreadPool testThreadPool = new ThreadPool(getClass().getSimpleName());
private final Settings settings = ImmutableSettings.EMPTY;
private final JobContextService jobContextService = new JobContextService(
Expand Down Expand Up @@ -95,7 +101,7 @@ public void testAccessContext() throws Exception {
public void testJobExecutionContextIsSelfClosing() throws Exception {
JobExecutionContext.Builder builder1 = jobContextService.newBuilder(UUID.randomUUID());
PageDownstreamContext pageDownstreamContext =
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], 1);
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], RAM_ACCOUNTING_CONTEXT, 1);
builder1.addSubContext(1, pageDownstreamContext);
JobExecutionContext ctx1 = jobContextService.createOrMergeContext(builder1);

Expand Down Expand Up @@ -124,7 +130,7 @@ public void testCloseContext() throws Exception {
private JobExecutionContext getJobExecutionContextWithOneActiveSubContext(JobContextService jobContextService) {
JobExecutionContext.Builder builder1 = jobContextService.newBuilder(UUID.randomUUID());
PageDownstreamContext pageDownstreamContext =
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], 1);
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], RAM_ACCOUNTING_CONTEXT, 1);
builder1.addSubContext(1, pageDownstreamContext);
return jobContextService.createOrMergeContext(builder1);
}
Expand Down Expand Up @@ -160,7 +166,7 @@ public void testCreateOrMergeThreaded() throws Exception {
for (int i = 0; i < 50; i++) {
final int currentSubContextId = i;
final PageDownstreamContext pageDownstreamContext =
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], 1);
new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], RAM_ACCOUNTING_CONTEXT, 1);
ListenableFuture<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
package io.crate.jobs;

import io.crate.Streamer;
import io.crate.breaker.RamAccountingContext;
import io.crate.core.collections.Row1;
import io.crate.core.collections.SingleRowBucket;
import io.crate.operation.PageDownstream;
import io.crate.operation.PageResultListener;
import io.crate.test.integration.CrateUnitTest;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -41,6 +44,9 @@

public class PageDownstreamContextTest extends CrateUnitTest {

private static final RamAccountingContext RAM_ACCOUNTING_CONTEXT =
new RamAccountingContext("dummy", new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA));

@Test
public void testCantSetSameBucketTwiceWithoutReceivingFullPage() throws Exception {
final AtomicReference<Throwable> ref = new AtomicReference<>();
Expand All @@ -54,7 +60,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}
}).when(pageDownstream).fail((Throwable)notNull());

PageDownstreamContext ctx = new PageDownstreamContext(pageDownstream, new Streamer[0], 3);
PageDownstreamContext ctx = new PageDownstreamContext(pageDownstream, new Streamer[0], RAM_ACCOUNTING_CONTEXT, 3);

PageResultListener pageResultListener = mock(PageResultListener.class);
ctx.setBucket(1, new SingleRowBucket(new Row1("foo")), false, pageResultListener);
Expand Down

0 comments on commit 915e642

Please sign in to comment.