diff --git a/sql/src/main/java/io/crate/operation/projectors/fetch/FetchProjector.java b/sql/src/main/java/io/crate/operation/projectors/fetch/FetchProjector.java index 44905c8e2c09..c3145cb2bcf4 100644 --- a/sql/src/main/java/io/crate/operation/projectors/fetch/FetchProjector.java +++ b/sql/src/main/java/io/crate/operation/projectors/fetch/FetchProjector.java @@ -209,7 +209,7 @@ public void onFailure(Throwable t) { @Override protected void doRun() throws Exception { - sendToDownstream(isLast); + sendToDownstream(isLast, 0); if (isLast) { finishDownstream(); } @@ -228,7 +228,7 @@ public void onFailure(@Nonnull Throwable t) { } } if (!anyRequestSent) { - sendToDownstream(isLast); + sendToDownstream(isLast, 0); if (isLast) { finishDownstream(); } @@ -246,8 +246,8 @@ private IntObjectHashMap generateToFetch(Map.Entry return toFetch; } - private void sendToDownstream(boolean isLast) { - if (nextStage(Stage.FETCH, Stage.EMIT)) { + private void sendToDownstream(final boolean isLast, final int rowStartIdx) { + if (rowStartIdx == 0 && nextStage(Stage.FETCH, Stage.EMIT)) { return; } final ArrayBackedRow inputRow = collectRowContext.inputRow(); @@ -256,24 +256,32 @@ private void sendToDownstream(boolean isLast) { final int[] docIdPositions = collectRowContext.docIdPositions(); loop: - for (Object[] cells : inputValues) { + for (int i = rowStartIdx; i < inputValues.size(); i++) { + Object[] cells = inputValues.get(i); inputRow.cells = cells; - for (int i = 0; i < docIdPositions.length; i++) { - long doc = (long) cells[docIdPositions[i]]; + for (int j = 0; j < docIdPositions.length; j++) { + long doc = (long) cells[docIdPositions[j]]; int readerId = (int) (doc >> 32); int docId = (int) (long) doc; ReaderBucket readerBucket = context.getReaderBucket(readerId); assert readerBucket != null; - setPartitionRow(partitionRows, i, readerBucket); - fetchRows[i].cells = readerBucket.get(docId); - assert !readerBucket.fetchRequired() || fetchRows[i].cells != null; + setPartitionRow(partitionRows, j, readerBucket); + fetchRows[j].cells = readerBucket.get(docId); + assert !readerBucket.fetchRequired() || fetchRows[j].cells != null; } Result result = downstream.setNextRow(outputRow); switch (result) { case CONTINUE: continue; case PAUSE: - throw new UnsupportedOperationException("FetchProjector doesn't support pause"); + final int startIdx = i + 1; + downstream.pauseProcessed(new ResumeHandle() { + @Override + public void resume(boolean async) { + sendToDownstream(isLast, startIdx); + } + }); + return; case STOP: break loop; } diff --git a/sql/src/test/java/io/crate/operation/projectors/FetchProjectorTest.java b/sql/src/test/java/io/crate/operation/projectors/FetchProjectorTest.java index c02843d4ba44..e1cfe550a2ad 100644 --- a/sql/src/test/java/io/crate/operation/projectors/FetchProjectorTest.java +++ b/sql/src/test/java/io/crate/operation/projectors/FetchProjectorTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.crate.analyze.symbol.FetchReference; import io.crate.analyze.symbol.InputColumn; @@ -45,6 +46,7 @@ import io.crate.planner.node.fetch.FetchSource; import io.crate.test.integration.CrateUnitTest; import io.crate.testing.CollectingRowReceiver; +import io.crate.testing.RowSender; import io.crate.testing.TestingHelpers; import io.crate.types.LongType; import org.junit.After; @@ -62,24 +64,13 @@ public class FetchProjectorTest extends CrateUnitTest { private static final TableIdent USER_TABLE_IDENT = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users"); private ExecutorService executorService; + private FetchOperation fetchOperation; @Before public void before() throws Exception { executorService = Executors.newFixedThreadPool(2); - } - - @After - public void after() throws Exception { - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.SECONDS); - } - - @Test - public void testMultipleFetchRequests() throws Throwable { - CollectingRowReceiver rowReceiver = new CollectingRowReceiver(); - // dummy FetchOperation that returns buckets for each reader-id where each row contains a column that is the same as the docId - FetchOperation fetchOperation = new FetchOperation() { + fetchOperation = new FetchOperation() { @Override public ListenableFuture> fetch(String nodeId, IntObjectMap toFetch, boolean closeContext) { IntObjectHashMap readerToBuckets = new IntObjectHashMap<>(); @@ -93,6 +84,38 @@ public ListenableFuture> fetch(String nodeId, Int return Futures.>immediateFuture(readerToBuckets); } }; + } + + @After + public void after() throws Exception { + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.SECONDS); + } + + @Test + public void testPauseSupport() throws Exception { + final CollectingRowReceiver rowReceiver = CollectingRowReceiver.withPauseAfter(2); + int fetchSize = 4; + FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation); + final RowSender rowSender = new RowSender(RowSender.rowRange(0, 10), fetchProjector, MoreExecutors.directExecutor()); + rowSender.run(); + + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(rowSender.numPauses(), is(1)); + assertThat(rowReceiver.rows.size(), is(2)); + } + }); + rowReceiver.resumeUpstream(false); + assertThat(TestingHelpers.printedTable(rowReceiver.result()), + is("0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n")); + } + + @Test + public void testMultipleFetchRequests() throws Throwable { + CollectingRowReceiver rowReceiver = new CollectingRowReceiver(); + int fetchSize = 3; FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation);