Skip to content

Commit

Permalink
DRILL-2083: Fix bug in merging receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
StevenMPhillips authored and vkorukanti committed Apr 27, 2015
1 parent 6878bfd commit 57a96d2
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 33 deletions.
Expand Up @@ -112,7 +112,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private int[] batchOffsets; private int[] batchOffsets;
private PriorityQueue <Node> pqueue; private PriorityQueue <Node> pqueue;
private RawFragmentBatch emptyBatch = null; private RawFragmentBatch emptyBatch = null;
private RawFragmentBatch[] tempBatchHolder; // private RawFragmentBatch[] tempBatchHolder;
private long[] inputCounts;
private long[] outputCounts;


public static enum Metric implements MetricDef{ public static enum Metric implements MetricDef{
BYTES_RECEIVED, BYTES_RECEIVED,
Expand All @@ -135,15 +137,19 @@ public MergingRecordBatch(final FragmentContext context,
this.outgoingContainer = new VectorContainer(oContext); this.outgoingContainer = new VectorContainer(oContext);
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
this.config = config; this.config = config;
this.inputCounts = new long[config.getNumSenders()];
this.outputCounts = new long[config.getNumSenders()];
} }


private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{ private RawFragmentBatch getNext(final int providerIndex) throws IOException{
stats.startWait(); stats.startWait();
final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try { try {
final RawFragmentBatch b = provider.getNext(); final RawFragmentBatch b = provider.getNext();
if (b != null) { if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount()); stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
} }
return b; return b;
} finally { } finally {
Expand Down Expand Up @@ -186,9 +192,8 @@ public IterOutcome innerNext() {
rawBatch = tempBatchHolder[p]; rawBatch = tempBatchHolder[p];
tempBatchHolder[p] = null; tempBatchHolder[p] = null;
} else { } else {
rawBatch = getNext(provider); rawBatch = getNext(p);
} }
p++;
if (rawBatch == null && !context.shouldContinue()) { if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP; return IterOutcome.STOP;
} }
Expand All @@ -204,7 +209,7 @@ public IterOutcome innerNext() {
emptyBatch = rawBatch; emptyBatch = rawBatch;
} }
try { try {
while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
; ;
} }
if (rawBatch == null && !context.shouldContinue()) { if (rawBatch == null && !context.shouldContinue()) {
Expand All @@ -220,6 +225,7 @@ public IterOutcome innerNext() {
rawBatches.add(emptyBatch); rawBatches.add(emptyBatch);
} }
} }
p++;
} }


// allocate the incoming record batch loaders // allocate the incoming record batch loaders
Expand Down Expand Up @@ -304,7 +310,7 @@ public int compare(final Node node1, final Node node2) {
for (int b = 0; b < senderCount; ++b) { for (int b = 0; b < senderCount; ++b) {
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) { while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
try { try {
final RawFragmentBatch batch = getNext(fragProviders[b]); final RawFragmentBatch batch = getNext(b);
incomingBatches[b] = batch; incomingBatches[b] = batch;
if (batch != null) { if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody()); batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
Expand Down Expand Up @@ -335,7 +341,6 @@ public int compare(final Node node1, final Node node2) {
if (!copyRecordToOutgoingBatch(node)) { if (!copyRecordToOutgoingBatch(node)) {
logger.debug("Outgoing vectors space is full; breaking"); logger.debug("Outgoing vectors space is full; breaking");
prevBatchWasFull = true; prevBatchWasFull = true;
break;
} }
pqueue.poll(); pqueue.poll();


Expand All @@ -349,11 +354,13 @@ public int compare(final Node node1, final Node node2) {
// reached the end of an incoming record batch // reached the end of an incoming record batch
RawFragmentBatch nextBatch = null; RawFragmentBatch nextBatch = null;
try { try {
nextBatch = getNext(fragProviders[node.batchId]); nextBatch = getNext(node.batchId);


while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) { while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(fragProviders[node.batchId]); nextBatch = getNext(node.batchId);
} }
assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
if (nextBatch == null && !context.shouldContinue()) { if (nextBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP; return IterOutcome.STOP;
} }
Expand Down Expand Up @@ -383,7 +390,11 @@ public int compare(final Node node1, final Node node2) {


// this batch is empty; since the pqueue no longer references this batch, it will be // this batch is empty; since the pqueue no longer references this batch, it will be
// ignored in subsequent iterations. // ignored in subsequent iterations.
continue; if (prevBatchWasFull) {
break;
} else {
continue;
}
} }


final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef(); final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
Expand Down Expand Up @@ -447,7 +458,7 @@ public void buildSchema() {
state = BatchState.DONE; state = BatchState.DONE;
return; return;
} }
final RawFragmentBatch batch = getNext(fragProviders[i]); final RawFragmentBatch batch = getNext(i);
if (batch.getHeader().getDef().getFieldCount() == 0) { if (batch.getHeader().getDef().getFieldCount() == 0) {
i++; i++;
continue; continue;
Expand Down Expand Up @@ -661,6 +672,8 @@ private void generateComparisons(final ClassGenerator g, final VectorAccessible
* @param node Reference to the next record to copy from the incoming batches * @param node Reference to the next record to copy from the incoming batches
*/ */
private boolean copyRecordToOutgoingBatch(final Node node) { private boolean copyRecordToOutgoingBatch(final Node node) {
assert ++outputCounts[node.batchId] <= inputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
final int inIndex = (node.batchId << 16) + node.valueIndex; final int inIndex = (node.batchId << 16) + node.valueIndex;
merger.doCopy(inIndex, outgoingPosition); merger.doCopy(inIndex, outgoingPosition);
outgoingPosition++; outgoingPosition++;
Expand Down
Expand Up @@ -66,21 +66,21 @@ public void twoBitTwoExchange() throws Exception {
} }
for (Object cell : row) { for (Object cell : row) {
if (cell == null) { if (cell == null) {
System.out.print("<null> "); // System.out.print("<null> ");
continue; continue;
} }
int len = cell.toString().length(); int len = cell.toString().length();
System.out.print(cell + " "); // System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) { for (int i = 0; i < (30 - len); ++i) {
System.out.print(" "); // System.out.print(" ");
} }
} }
System.out.println(); // System.out.println();
} }
b.release(); b.release();
batchLoader.clear(); batchLoader.clear();
} }
assertEquals(200, count); assertEquals(200000, count);
} }
} }


Expand Down Expand Up @@ -122,17 +122,17 @@ public void testMultipleProvidersMixedSizes() throws Exception {
} }
for (Object cell : row) { for (Object cell : row) {
int len = cell.toString().length(); int len = cell.toString().length();
System.out.print(cell + " "); // System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) { for (int i = 0; i < (30 - len); ++i) {
System.out.print(" "); // System.out.print(" ");
} }
} }
System.out.println(); // System.out.println();
} }
b.release(); b.release();
batchLoader.clear(); batchLoader.clear();
} }
assertEquals(400, count); assertEquals(400000, count);
} }
} }


Expand Down Expand Up @@ -163,21 +163,21 @@ public void handleEmptyBatch() throws Exception {
} }
for (Object cell : row) { for (Object cell : row) {
if (cell == null) { if (cell == null) {
System.out.print("<null> "); // System.out.print("<null> ");
continue; continue;
} }
int len = cell.toString().length(); int len = cell.toString().length();
System.out.print(cell + " "); // System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) { for (int i = 0; i < (30 - len); ++i) {
System.out.print(" "); // System.out.print(" ");
} }
} }
System.out.println(); // System.out.println();
} }
b.release(); b.release();
batchLoader.clear(); batchLoader.clear();
} }
assertEquals(100, count); assertEquals(100000, count);
} }
} }


Expand Down
Expand Up @@ -12,7 +12,7 @@
pop:"mock-scan", pop:"mock-scan",
url: "http://apache.org", url: "http://apache.org",
entries:[ entries:[
{records: 100, types: [ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"}, {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"}, {name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"} {name: "green", type: "BIGINT", mode: "OPTIONAL"}
Expand Down
Expand Up @@ -12,12 +12,12 @@
pop:"mock-scan", pop:"mock-scan",
url: "http://apache.org", url: "http://apache.org",
entries:[ entries:[
{records: 100, types: [ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"}, {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"}, {name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"} {name: "green", type: "BIGINT", mode: "OPTIONAL"}
]}, ]},
{records: 100, types: [ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"}, {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"}, {name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"} {name: "green", type: "BIGINT", mode: "OPTIONAL"}
Expand Down
Expand Up @@ -12,27 +12,27 @@
pop:"mock-scan", pop:"mock-scan",
url: "http://apache.org", url: "http://apache.org",
entries:[ entries:[
{records: 100, types: [ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"}, {name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"} {name: "green", type: "BIGINT", mode: "REQUIRED"}
]}, ]},
{records: 90, types: [ {records: 90000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"}, {name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"} {name: "green", type: "BIGINT", mode: "REQUIRED"}
]}, ]},
{records: 80, types: [ {records: 80000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"}, {name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"} {name: "green", type: "BIGINT", mode: "REQUIRED"}
]}, ]},
{records: 70, types: [ {records: 70000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"}, {name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"} {name: "green", type: "BIGINT", mode: "REQUIRED"}
]}, ]},
{records: 60, types: [ {records: 60000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"}, {name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"} {name: "green", type: "BIGINT", mode: "REQUIRED"}
Expand Down

0 comments on commit 57a96d2

Please sign in to comment.