Skip to content

Commit

Permalink
Merge 9db22a5 into 058713e
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Jul 11, 2016
2 parents 058713e + 9db22a5 commit 165005c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public ExecutorResumeHandle(Executor executor, Runnable runnable) {

@Override
public void resume(boolean async) {
resume(executor, runnable, async);
}

public static void resume(Executor executor, Runnable runnable, boolean async) {
if (async) {
try {
executor.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import io.crate.operation.Input;
import io.crate.operation.InputRow;
import io.crate.operation.fetch.FetchRowInputSymbolVisitor;
import io.crate.operation.projectors.AbstractProjector;
import io.crate.operation.projectors.RepeatHandle;
import io.crate.operation.projectors.Requirement;
import io.crate.operation.projectors.ResumeHandle;
import io.crate.operation.projectors.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -209,7 +206,7 @@ public void onFailure(Throwable t) {

@Override
protected void doRun() throws Exception {
sendToDownstream(isLast);
sendToDownstream(isLast, 0);
if (isLast) {
finishDownstream();
}
Expand All @@ -228,7 +225,7 @@ public void onFailure(@Nonnull Throwable t) {
}
}
if (!anyRequestSent) {
sendToDownstream(isLast);
sendToDownstream(isLast, 0);
if (isLast) {
finishDownstream();
}
Expand All @@ -246,8 +243,8 @@ private IntObjectHashMap<IntContainer> generateToFetch(Map.Entry<String, IntSet>
return toFetch;
}

private void sendToDownstream(boolean isLast) {
if (nextStage(Stage.FETCH, Stage.EMIT)) {
private void sendToDownstream(final boolean isLast, final int rowStartIdx) {
if (rowStartIdx == 0 && nextStage(Stage.FETCH, Stage.EMIT)) {
return;
}
final ArrayBackedRow inputRow = collectRowContext.inputRow();
Expand All @@ -256,24 +253,37 @@ private void sendToDownstream(boolean isLast) {
final int[] docIdPositions = collectRowContext.docIdPositions();

loop:
for (Object[] cells : inputValues) {
for (int i = rowStartIdx; i < inputValues.size(); i++) {
Object[] cells = inputValues.get(i);
inputRow.cells = cells;
for (int i = 0; i < docIdPositions.length; i++) {
long doc = (long) cells[docIdPositions[i]];
for (int j = 0; j < docIdPositions.length; j++) {
long doc = (long) cells[docIdPositions[j]];
int readerId = (int) (doc >> 32);
int docId = (int) (long) doc;
ReaderBucket readerBucket = context.getReaderBucket(readerId);
assert readerBucket != null;
setPartitionRow(partitionRows, i, readerBucket);
fetchRows[i].cells = readerBucket.get(docId);
assert !readerBucket.fetchRequired() || fetchRows[i].cells != null;
setPartitionRow(partitionRows, j, readerBucket);
fetchRows[j].cells = readerBucket.get(docId);
assert !readerBucket.fetchRequired() || fetchRows[j].cells != null;
}
Result result = downstream.setNextRow(outputRow);
switch (result) {
case CONTINUE:
continue;
case PAUSE:
throw new UnsupportedOperationException("FetchProjector doesn't support pause");
final int startIdx = i + 1;
downstream.pauseProcessed(new ResumeHandle() {
@Override
public void resume(boolean async) {
ExecutorResumeHandle.resume(resultExecutor, new Runnable() {
@Override
public void run() {
sendToDownstream(isLast, startIdx);
}
}, async);
}
});
return;
case STOP:
break loop;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.analyze.symbol.FetchReference;
import io.crate.analyze.symbol.InputColumn;
Expand All @@ -45,6 +46,7 @@
import io.crate.planner.node.fetch.FetchSource;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.CollectingRowReceiver;
import io.crate.testing.RowSender;
import io.crate.testing.TestingHelpers;
import io.crate.types.LongType;
import org.junit.After;
Expand All @@ -62,24 +64,13 @@ public class FetchProjectorTest extends CrateUnitTest {

private static final TableIdent USER_TABLE_IDENT = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users");
private ExecutorService executorService;
private FetchOperation fetchOperation;

@Before
public void before() throws Exception {
executorService = Executors.newFixedThreadPool(2);
}

@After
public void after() throws Exception {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testMultipleFetchRequests() throws Throwable {
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();

// dummy FetchOperation that returns buckets for each reader-id where each row contains a column that is the same as the docId
FetchOperation fetchOperation = new FetchOperation() {
fetchOperation = new FetchOperation() {
@Override
public ListenableFuture<IntObjectMap<? extends Bucket>> fetch(String nodeId, IntObjectMap<? extends IntContainer> toFetch, boolean closeContext) {
IntObjectHashMap<Bucket> readerToBuckets = new IntObjectHashMap<>();
Expand All @@ -93,6 +84,38 @@ public ListenableFuture<IntObjectMap<? extends Bucket>> fetch(String nodeId, Int
return Futures.<IntObjectMap<? extends Bucket>>immediateFuture(readerToBuckets);
}
};
}

@After
public void after() throws Exception {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testPauseSupport() throws Exception {
final CollectingRowReceiver rowReceiver = CollectingRowReceiver.withPauseAfter(2);
int fetchSize = 4;
FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation);
final RowSender rowSender = new RowSender(RowSender.rowRange(0, 10), fetchProjector, MoreExecutors.directExecutor());
rowSender.run();

assertBusy(new Runnable() {
@Override
public void run() {
assertThat(rowSender.numPauses(), is(1));
assertThat(rowReceiver.rows.size(), is(2));
}
});
rowReceiver.resumeUpstream(false);
assertThat(TestingHelpers.printedTable(rowReceiver.result()),
is("0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n"));
}

@Test
public void testMultipleFetchRequests() throws Throwable {
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();


int fetchSize = 3;
FetchProjector fetchProjector = prepareFetchProjector(fetchSize, rowReceiver, fetchOperation);
Expand Down

0 comments on commit 165005c

Please sign in to comment.