Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! use pause / resume in DistributingDownstream
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Aug 18, 2015
1 parent ce7c176 commit 22044a7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ private void pause() {
}
}

private void resume(boolean async) {
private void resume() {
for (RowUpstream upstream : upstreams) {
upstream.resume(async);
upstream.resume(true);
}
}

Expand Down Expand Up @@ -251,13 +251,13 @@ private void onResponse(boolean needMore) {
}
}
if (resume) {
resume(activeUpstreams > 1);
resume();
}
} else {
if (finishedDownstreams.incrementAndGet() == downstreams.length) {
gatherMoreRows = false;
}
resume(activeUpstreams > 1);
resume();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row1;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.TestingHelpers;
import io.crate.types.DataTypes;
import org.junit.Test;

Expand All @@ -46,8 +47,10 @@ public void testRowsAreDistributedByModulo() throws Exception {

final Bucket rowsD1 = buckets[0];
assertThat(rowsD1.size(), is(2));
assertThat(TestingHelpers.printedTable(rowsD1), is("2\n4\n"));

final Bucket rowsD2 = buckets[1];
assertThat(rowsD2.size(), is(2));
assertThat(TestingHelpers.printedTable(rowsD2), is("1\n3\n"));
}
}

0 comments on commit 22044a7

Please sign in to comment.