Skip to content

Commit

Permalink
added possibility to kill CountContext
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Bogensberger committed May 7, 2015
1 parent 7af3f38 commit 63781db
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
11 changes: 10 additions & 1 deletion sql/src/main/java/io/crate/jobs/CountContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class CountContext implements RowUpstream, ExecutionSubContext {
private final ArrayList<ContextCallback> callbacks = new ArrayList<>(1);
private final AtomicBoolean closed = new AtomicBoolean(false);

private ListenableFuture<Long> countFuture;

public CountContext(CountOperation countOperation,
RowDownstream rowDownstream,
Map<String, List<Integer>> indexShardMap,
Expand All @@ -60,7 +62,7 @@ public CountContext(CountOperation countOperation,

public void start() {
try {
ListenableFuture<Long> countFuture = countOperation.count(indexShardMap, whereClause);
countFuture = countOperation.count(indexShardMap, whereClause);
Futures.addCallback(countFuture, new FutureCallback<Long>() {
@Override
public void onSuccess(@Nullable Long result) {
Expand Down Expand Up @@ -94,4 +96,11 @@ public void close() {
}
}
}

public void kill() {
if (countFuture != null) {
countFuture.cancel(true);
}
close();
}
}
41 changes: 41 additions & 0 deletions sql/src/test/java/io/crate/jobs/CountContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.jobs;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.analyze.WhereClause;
import io.crate.exceptions.UnknownUpstreamFailure;
Expand All @@ -30,6 +31,9 @@
import io.crate.operation.count.CountOperation;
import io.crate.test.integration.CrateUnitTest;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
Expand Down Expand Up @@ -62,4 +66,41 @@ public void testClose() throws Exception {
future.setException(new UnknownUpstreamFailure());
verify(callback, times(1)).onClose();
}

@Test
public void testKillOperationFuture() throws Exception {
ListenableFuture<Long> future = mock(ListenableFuture.class);
CountOperation countOperation = new FakeCountOperation(future);

RowDownstream rowDownstream = mock(RowDownstream.class);
when(rowDownstream.registerUpstream(any(RowUpstream.class))).thenReturn(mock(RowDownstreamHandle.class));
CountContext countContext = new CountContext(countOperation, rowDownstream, null, WhereClause.MATCH_ALL);

ContextCallback callback = mock(ContextCallback.class);
countContext.addCallback(callback);
countContext.start();
countContext.kill();

verify(future, times(1)).cancel(true);
verify(callback, times(1)).onClose();
}

private static class FakeCountOperation implements CountOperation {

private final ListenableFuture<Long> future;

public FakeCountOperation(ListenableFuture<Long> future) {
this.future = future;
}

@Override
public ListenableFuture<Long> count(Map<String, ? extends Collection<Integer>> indexShardMap, WhereClause whereClause) throws IOException, InterruptedException {
return future;
}

@Override
public long count(String index, int shardId, WhereClause whereClause) throws IOException, InterruptedException {
return 0;
}
}
}

0 comments on commit 63781db

Please sign in to comment.