Skip to content

Commit

Permalink
DRILL-6296: Add operator metrics for batch sizing for merge join
Browse files Browse the repository at this point in the history
closes #1181
  • Loading branch information
ppadma authored and arina-ielchiieva committed Apr 6, 2018
1 parent 77f5e90 commit da24113
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 158 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec; import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
import org.apache.drill.exec.physical.impl.join.HashJoinBatch; import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class OperatorMetricRegistry {
register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class); register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class);
register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class); register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class);
register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class); register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
register(CoreOperatorType.MERGE_JOIN_VALUE, MergeJoinBatch.Metric.class);
} }


private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) { private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
Expand Down
Expand Up @@ -45,7 +45,7 @@
import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TransferPair;
Expand Down Expand Up @@ -104,19 +104,19 @@ public enum Metric implements MetricDef {
INPUT_BATCH_COUNT, INPUT_BATCH_COUNT,
AVG_INPUT_BATCH_BYTES, AVG_INPUT_BATCH_BYTES,
AVG_INPUT_ROW_BYTES, AVG_INPUT_ROW_BYTES,
TOTAL_INPUT_RECORDS, INPUT_RECORD_COUNT,
OUTPUT_BATCH_COUNT, OUTPUT_BATCH_COUNT,
AVG_OUTPUT_BATCH_BYTES, AVG_OUTPUT_BATCH_BYTES,
AVG_OUTPUT_ROW_BYTES, AVG_OUTPUT_ROW_BYTES,
TOTAL_OUTPUT_RECORDS; OUTPUT_RECORD_COUNT;


@Override @Override
public int metricId() { public int metricId() {
return ordinal(); return ordinal();
} }
} }


private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager { private class FlattenMemoryManager extends RecordBatchMemoryManager {


@Override @Override
public void update() { public void update() {
Expand Down Expand Up @@ -152,9 +152,10 @@ public void update() {
// i.e. all rows fit within memory budget. // i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));


logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," + logger.debug("incoming batch size : {}", getRecordBatchSizer());
"avgOutgoingRowWidth : {}, outputRowCount : {}", getRecordBatchSizer(), outputBatchSize,
avgOutgoingRowWidth, getOutputRowCount()); logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}",
outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());


updateIncomingStats(); updateIncomingStats();
} }
Expand Down Expand Up @@ -496,11 +497,20 @@ private void updateStats() {
stats.setLongStat(Metric.INPUT_BATCH_COUNT, flattenMemoryManager.getNumIncomingBatches()); stats.setLongStat(Metric.INPUT_BATCH_COUNT, flattenMemoryManager.getNumIncomingBatches());
stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, flattenMemoryManager.getAvgInputBatchSize()); stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, flattenMemoryManager.getAvgInputBatchSize());
stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, flattenMemoryManager.getAvgInputRowWidth()); stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, flattenMemoryManager.getAvgInputRowWidth());
stats.setLongStat(Metric.TOTAL_INPUT_RECORDS, flattenMemoryManager.getTotalInputRecords()); stats.setLongStat(Metric.INPUT_RECORD_COUNT, flattenMemoryManager.getTotalInputRecords());
stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, flattenMemoryManager.getNumOutgoingBatches()); stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, flattenMemoryManager.getNumOutgoingBatches());
stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, flattenMemoryManager.getAvgOutputBatchSize()); stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, flattenMemoryManager.getAvgOutputBatchSize());
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.TOTAL_OUTPUT_RECORDS, flattenMemoryManager.getTotalOutputRecords()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());

logger.debug("input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());

logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());

} }


@Override @Override
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
Expand All @@ -56,7 +57,7 @@
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector;


Expand Down Expand Up @@ -109,12 +110,37 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private static final String LEFT_INPUT = "LEFT INPUT"; private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT";


private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager { private static final int numInputs = 2;
private static final int LEFT_INDEX = 0;
private static final int RIGHT_INDEX = 1;

public enum Metric implements MetricDef {
LEFT_INPUT_BATCH_COUNT,
LEFT_AVG_INPUT_BATCH_BYTES,
LEFT_AVG_INPUT_ROW_BYTES,
LEFT_INPUT_RECORD_COUNT,
RIGHT_INPUT_BATCH_COUNT,
RIGHT_AVG_INPUT_BATCH_BYTES,
RIGHT_AVG_INPUT_ROW_BYTES,
RIGHT_INPUT_RECORD_COUNT,
OUTPUT_BATCH_COUNT,
AVG_OUTPUT_BATCH_BYTES,
AVG_OUTPUT_ROW_BYTES,
OUTPUT_RECORD_COUNT;

@Override
public int metricId() {
return ordinal();
}
}

private class MergeJoinMemoryManager extends RecordBatchMemoryManager {
private int leftRowWidth; private int leftRowWidth;
private int rightRowWidth; private int rightRowWidth;


private RecordBatchSizer leftSizer; public MergeJoinMemoryManager() {
private RecordBatchSizer rightSizer; super(numInputs);
}


/** /**
* mergejoin operates on one record at a time from the left and right batches * mergejoin operates on one record at a time from the left and right batches
Expand All @@ -127,17 +153,20 @@ private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager {
@Override @Override
public void update(int inputIndex) { public void update(int inputIndex) {
switch(inputIndex) { switch(inputIndex) {
case 0: case LEFT_INDEX:
leftSizer = new RecordBatchSizer(left); setRecordBatchSizer(inputIndex, new RecordBatchSizer(left));
leftRowWidth = leftSizer.netRowWidth(); leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex));
break; break;
case 1: case RIGHT_INDEX:
rightSizer = new RecordBatchSizer(right); setRecordBatchSizer(inputIndex, new RecordBatchSizer(right));
rightRowWidth = rightSizer.netRowWidth(); rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex));
default: default:
break; break;
} }


updateIncomingStats(inputIndex);
final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;


// If outgoing row width is 0, just return. This is possible for empty batches or // If outgoing row width is 0, just return. This is possible for empty batches or
Expand All @@ -153,16 +182,22 @@ public void update(int inputIndex) {
// calculate memory used so far based on previous outgoing row width and how many rows we already processed. // calculate memory used so far based on previous outgoing row width and how many rows we already processed.
final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth(); final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
// This is the remaining memory. // This is the remaining memory.
final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0); final long remainingMemory = Math.max(outputBatchSize - memoryUsed, 0);
// These are number of rows we can fit in remaining memory based on new outgoing row width. // These are number of rows we can fit in remaining memory based on new outgoing row width.
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);


status.setTargetOutputRowCount(status.getOutPosition() + numOutputRowsRemaining); status.setTargetOutputRowCount(adjustOutputRowCount(status.getOutPosition() + numOutputRowsRemaining));
setOutgoingRowWidth(newOutgoingRowWidth); setOutgoingRowWidth(newOutgoingRowWidth);

logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}",
outputBatchSize, getOutgoingRowWidth(), getOutputRowCount());
} }


@Override @Override
public RecordBatchSizer.ColumnSize getColumnSize(String name) { public RecordBatchSizer.ColumnSize getColumnSize(String name) {
RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);

if (leftSizer != null && leftSizer.getColumn(name) != null) { if (leftSizer != null && leftSizer.getColumn(name) != null) {
return leftSizer.getColumn(name); return leftSizer.getColumn(name);
} }
Expand Down Expand Up @@ -324,10 +359,12 @@ private void setRecordCountInContainer() {
Preconditions.checkArgument(!vw.isHyper()); Preconditions.checkArgument(!vw.isHyper());
vw.getValueVector().getMutator().setValueCount(getRecordCount()); vw.getValueVector().getMutator().setValueCount(getRecordCount());
} }
mergeJoinMemoryManager.updateOutgoingStats(getRecordCount());
} }


@Override @Override
public void close() { public void close() {
updateStats();
super.close(); super.close();
leftIterator.close(); leftIterator.close();
rightIterator.close(); rightIterator.close();
Expand Down Expand Up @@ -573,4 +610,33 @@ private LogicalExpression materializeExpression(LogicalExpression expression, It
} }
return materializedExpr; return materializedExpr;
} }

private void updateStats() {
stats.setLongStat(MergeJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX));
stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));

stats.setLongStat(MergeJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));

stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumOutgoingBatches());
stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgOutputBatchSize());
stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalOutputRecords());

logger.debug("left input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX),
mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));

logger.debug("right input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX),
mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));

logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
mergeJoinMemoryManager.getNumOutgoingBatches(), mergeJoinMemoryManager.getAvgOutputBatchSize(),
mergeJoinMemoryManager.getAvgOutputRowWidth(), mergeJoinMemoryManager.getTotalOutputRecords());
}
} }

This file was deleted.

0 comments on commit da24113

Please sign in to comment.