Skip to content

Commit

Permalink
fixup! Add pause support to the fetchProjector
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Jul 11, 2016
1 parent 0c4ab7c commit 9db22a5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public ExecutorResumeHandle(Executor executor, Runnable runnable) {

@Override
public void resume(boolean async) {
resume(executor, runnable, async);
}

public static void resume(Executor executor, Runnable runnable, boolean async) {
if (async) {
try {
executor.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import io.crate.operation.Input;
import io.crate.operation.InputRow;
import io.crate.operation.fetch.FetchRowInputSymbolVisitor;
import io.crate.operation.projectors.AbstractProjector;
import io.crate.operation.projectors.RepeatHandle;
import io.crate.operation.projectors.Requirement;
import io.crate.operation.projectors.ResumeHandle;
import io.crate.operation.projectors.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -278,7 +275,12 @@ private void sendToDownstream(final boolean isLast, final int rowStartIdx) {
downstream.pauseProcessed(new ResumeHandle() {
@Override
public void resume(boolean async) {
sendToDownstream(isLast, startIdx);
ExecutorResumeHandle.resume(resultExecutor, new Runnable() {
@Override
public void run() {
sendToDownstream(isLast, startIdx);
}
}, async);
}
});
return;
Expand Down

0 comments on commit 9db22a5

Please sign in to comment.