Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pause support to the fetchProjector #3775

Merged
merged 1 commit into from
Jul 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public ExecutorResumeHandle(Executor executor, Runnable runnable) {

@Override
public void resume(boolean async) {
resume(executor, runnable, async);
}

public static void resume(Executor executor, Runnable runnable, boolean async) {
if (async) {
try {
executor.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import io.crate.operation.Input;
import io.crate.operation.InputRow;
import io.crate.operation.fetch.FetchRowInputSymbolVisitor;
import io.crate.operation.projectors.AbstractProjector;
import io.crate.operation.projectors.RepeatHandle;
import io.crate.operation.projectors.Requirement;
import io.crate.operation.projectors.ResumeHandle;
import io.crate.operation.projectors.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -209,7 +206,7 @@ public void onFailure(Throwable t) {

@Override
protected void doRun() throws Exception {
sendToDownstream(isLast);
sendToDownstream(isLast, 0);
if (isLast) {
finishDownstream();
}
Expand All @@ -228,7 +225,7 @@ public void onFailure(@Nonnull Throwable t) {
}
}
if (!anyRequestSent) {
sendToDownstream(isLast);
sendToDownstream(isLast, 0);
if (isLast) {
finishDownstream();
}
Expand All @@ -246,8 +243,8 @@ private IntObjectHashMap<IntContainer> generateToFetch(Map.Entry<String, IntSet>
return toFetch;
}

private void sendToDownstream(boolean isLast) {
if (nextStage(Stage.FETCH, Stage.EMIT)) {
private void sendToDownstream(final boolean isLast, final int rowStartIdx) {
if (rowStartIdx == 0 && nextStage(Stage.FETCH, Stage.EMIT)) {
return;
}
final ArrayBackedRow inputRow = collectRowContext.inputRow();
Expand All @@ -256,24 +253,37 @@ private void sendToDownstream(boolean isLast) {
final int[] docIdPositions = collectRowContext.docIdPositions();

loop:
for (Object[] cells : inputValues) {
for (int i = rowStartIdx; i < inputValues.size(); i++) {
Object[] cells = inputValues.get(i);
inputRow.cells = cells;
for (int i = 0; i < docIdPositions.length; i++) {
long doc = (long) cells[docIdPositions[i]];
for (int j = 0; j < docIdPositions.length; j++) {
long doc = (long) cells[docIdPositions[j]];
int readerId = (int) (doc >> 32);
int docId = (int) (long) doc;
ReaderBucket readerBucket = context.getReaderBucket(readerId);
assert readerBucket != null;
setPartitionRow(partitionRows, i, readerBucket);
fetchRows[i].cells = readerBucket.get(docId);
assert !readerBucket.fetchRequired() || fetchRows[i].cells != null;
setPartitionRow(partitionRows, j, readerBucket);
fetchRows[j].cells = readerBucket.get(docId);
assert !readerBucket.fetchRequired() || fetchRows[j].cells != null;
}
Result result = downstream.setNextRow(outputRow);
switch (result) {
case CONTINUE:
continue;
case PAUSE:
throw new UnsupportedOperationException("FetchProjector doesn't support pause");
final int startIdx = i + 1;
downstream.pauseProcessed(new ResumeHandle() {
@Override
public void resume(boolean async) {
ExecutorResumeHandle.resume(resultExecutor, new Runnable() {
@Override
public void run() {
sendToDownstream(isLast, startIdx);
}
}, async);
}
});
return;
case STOP:
break loop;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.analyze.symbol.FetchReference;
import io.crate.analyze.symbol.InputColumn;
Expand All @@ -45,6 +46,7 @@
import io.crate.planner.node.fetch.FetchSource;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.CollectingRowReceiver;
import io.crate.testing.RowSender;
import io.crate.testing.TestingHelpers;
import io.crate.types.LongType;
import org.junit.After;
Expand All @@ -62,24 +64,13 @@ public class FetchProjectorTest extends CrateUnitTest {

private static final TableIdent USER_TABLE_IDENT = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users");
private ExecutorService executorService;
private FetchOperation fetchOperation;

@Before
public void before() throws Exception {
executorService = Executors.newFixedThreadPool(2);
}

@After
public void after() throws Exception {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testMultipleFetchRequests() throws Throwable {
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();

// dummy FetchOperation that returns buckets for each reader-id where each row contains a column that is the same as the docId
FetchOperation fetchOperation = new FetchOperation() {
fetchOperation = new FetchOperation() {
@Override
public ListenableFuture<IntObjectMap<? extends Bucket>> fetch(String nodeId, IntObjectMap<? extends IntContainer> toFetch, boolean closeContext) {
IntObjectHashMap<Bucket> readerToBuckets = new IntObjectHashMap<>();
Expand All @@ -93,6 +84,38 @@ public ListenableFuture<IntObjectMap<? extends Bucket>> fetch(String nodeId, Int
return Futures.<IntObjectMap<? extends Bucket>>immediateFuture(readerToBuckets);
}
};
}

@After
public void after() throws Exception {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testPauseSupport() throws Exception {
final CollectingRowReceiver rowReceiver = CollectingRowReceiver.withPauseAfter(2);
int fetchSize = 4;
FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation);
final RowSender rowSender = new RowSender(RowSender.rowRange(0, 10), fetchProjector, MoreExecutors.directExecutor());
rowSender.run();

assertBusy(new Runnable() {
@Override
public void run() {
assertThat(rowSender.numPauses(), is(1));
assertThat(rowReceiver.rows.size(), is(2));
}
});
rowReceiver.resumeUpstream(false);
assertThat(TestingHelpers.printedTable(rowReceiver.result()),
is("0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n"));
}

@Test
public void testMultipleFetchRequests() throws Throwable {
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();


int fetchSize = 3;
FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation);
Expand Down