Skip to content

Commit

Permalink
DRILL-6478: enhance debug logs for batch sizing
Browse files Browse the repository at this point in the history
  • Loading branch information
ppadma committed Jun 11, 2018
1 parent 9dceecf commit 2b39afd
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ public void update() {
// i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
}
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());

updateIncomingStats();
}
Expand All @@ -171,6 +169,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);

logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

@Override
Expand Down Expand Up @@ -263,7 +263,7 @@ protected IterOutcome doWork() {
flattenMemoryManager.updateOutgoingStats(outputRecords);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
Expand Down Expand Up @@ -516,14 +516,15 @@ private void updateStats() {
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());

logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());

logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());

logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,8 @@ protected boolean prefetchFirstBatchFromBothSides() {
batchMemoryManager.update(LEFT_INDEX, 0);
batchMemoryManager.update(RIGHT_INDEX, 0, true);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
}
logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));

if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
state = BatchState.STOP;
Expand Down Expand Up @@ -358,7 +356,7 @@ public IterOutcome innerNext() {

batchMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

/* We are here because of one the following
Expand Down Expand Up @@ -890,6 +888,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

/**
Expand Down Expand Up @@ -1004,21 +1003,19 @@ public void close() {

updateMetrics();

logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}

this.cleanup();
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,18 @@ private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
@Override
public void update(int inputIndex) {
status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
}
logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
}
}

protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context, true, left, right);

// Instantiate the batch memory manager
final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right);
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);

logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);

if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
Expand Down Expand Up @@ -271,7 +271,7 @@ private void setRecordCountInContainer() {
}

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

batchMemoryManager.updateOutgoingStats(getRecordCount());
Expand All @@ -281,21 +281,23 @@ private void setRecordCountInContainer() {
public void close() {
updateBatchMemoryManagerStats();

logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}

super.close();
leftIterator.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
Expand Down Expand Up @@ -74,6 +76,7 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, Fragment
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

@Override
Expand Down Expand Up @@ -168,6 +171,10 @@ private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) t
batchStatus.recordsProcessed += recordCount;
batchMemoryManager.updateOutgoingStats(recordCount);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
} else {
Expand Down Expand Up @@ -361,6 +368,8 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
if (topStatus.prefetched) {
topStatus.prefetched = false;
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(topStatus.outcome, topStatus);
} else {

Expand All @@ -378,6 +387,8 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
topStatus.recordsProcessed = 0;
topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY:
case STOP:
Expand Down Expand Up @@ -409,6 +420,20 @@ public void remove() {
public void close() {
super.close();
updateBatchMemoryManagerStats();

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}
}

}

0 comments on commit 2b39afd

Please sign in to comment.