Skip to content

Commit

Permalink
DRILL-6478: enhance debug logs for batch sizing
Browse files Browse the repository at this point in the history
closes #1310
  • Loading branch information
ppadma authored and ilooner committed Jun 12, 2018
1 parent ded643d commit cbcb59d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 52 deletions.
Expand Up @@ -157,9 +157,7 @@ public void update() {
// i.e. all rows fit within memory budget. // i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));


if (logger.isDebugEnabled()) { logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
}


updateIncomingStats(); updateIncomingStats();
} }
Expand All @@ -171,6 +169,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
// get the output batch size from config. // get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize); flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);

logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
} }


@Override @Override
Expand Down Expand Up @@ -263,7 +263,7 @@ protected IterOutcome doWork() {
flattenMemoryManager.updateOutgoingStats(outputRecords); flattenMemoryManager.updateOutgoingStats(outputRecords);


if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
} }


// Get the final outcome based on hasRemainder since that will determine if all the incoming records were // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
Expand Down Expand Up @@ -516,14 +516,15 @@ private void updateStats() {
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());


logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", if (logger.isDebugEnabled()) {
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),

flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());


logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
}
} }


@Override @Override
Expand Down
Expand Up @@ -242,10 +242,8 @@ protected boolean prefetchFirstBatchFromBothSides() {
batchMemoryManager.update(LEFT_INDEX, 0); batchMemoryManager.update(LEFT_INDEX, 0);
batchMemoryManager.update(RIGHT_INDEX, 0, true); batchMemoryManager.update(RIGHT_INDEX, 0, true);


if (logger.isDebugEnabled()) { logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
}


if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
state = BatchState.STOP; state = BatchState.STOP;
Expand Down Expand Up @@ -358,7 +356,7 @@ public IterOutcome innerNext() {


batchMemoryManager.updateOutgoingStats(outputRecords); batchMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
} }


/* We are here because of one of the following /* We are here because of one of the following
Expand Down Expand Up @@ -890,6 +888,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
// get the output batch size from config. // get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
} }


/** /**
Expand Down Expand Up @@ -1004,21 +1003,19 @@ public void close() {


updateMetrics(); updateMetrics();


logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", if (logger.isDebugEnabled()) {
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),

batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", }
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());


this.cleanup(); this.cleanup();
super.close(); super.close();
Expand Down
Expand Up @@ -122,18 +122,18 @@ private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
@Override @Override
public void update(int inputIndex) { public void update(int inputIndex) {
status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
if (logger.isDebugEnabled()) { logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
}
} }
} }


protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context, true, left, right); super(popConfig, context, true, left, right);


// Instantiate the batch memory manager // Instantiate the batch memory manager
final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right); batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);

logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);


if (popConfig.getConditions().size() == 0) { if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
Expand Down Expand Up @@ -271,7 +271,7 @@ private void setRecordCountInContainer() {
} }


if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
} }


batchMemoryManager.updateOutgoingStats(getRecordCount()); batchMemoryManager.updateOutgoingStats(getRecordCount());
Expand All @@ -281,21 +281,23 @@ private void setRecordCountInContainer() {
public void close() { public void close() {
updateBatchMemoryManagerStats(); updateBatchMemoryManagerStats();


logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", if (logger.isDebugEnabled()) {
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),

batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),

batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}


super.close(); super.close();
leftIterator.close(); leftIterator.close();
Expand Down
Expand Up @@ -39,9 +39,11 @@
import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorAccessibleUtilities;
Expand Down Expand Up @@ -74,6 +76,7 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, Fragment
// get the output batch size from config. // get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize); batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
} }


@Override @Override
Expand Down Expand Up @@ -168,6 +171,10 @@ private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) t
batchStatus.recordsProcessed += recordCount; batchStatus.recordsProcessed += recordCount;
batchMemoryManager.updateOutgoingStats(recordCount); batchMemoryManager.updateOutgoingStats(recordCount);


if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

if (callBack.getSchemaChangedAndReset()) { if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA; return IterOutcome.OK_NEW_SCHEMA;
} else { } else {
Expand Down Expand Up @@ -361,6 +368,8 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
if (topStatus.prefetched) { if (topStatus.prefetched) {
topStatus.prefetched = false; topStatus.prefetched = false;
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(topStatus.outcome, topStatus); return Pair.of(topStatus.outcome, topStatus);
} else { } else {


Expand All @@ -378,6 +387,8 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
topStatus.recordsProcessed = 0; topStatus.recordsProcessed = 0;
topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount(); topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(outcome, topStatus); return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY: case OUT_OF_MEMORY:
case STOP: case STOP:
Expand Down Expand Up @@ -409,6 +420,20 @@ public void remove() {
public void close() { public void close() {
super.close(); super.close();
updateBatchMemoryManagerStats(); updateBatchMemoryManagerStats();

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}
} }


} }

0 comments on commit cbcb59d

Please sign in to comment.