Skip to content

Commit

Permalink
DRILL-6478:enhance debug logs for batch sizing
Browse files Browse the repository at this point in the history
  • Loading branch information
ppadma committed Jun 8, 2018
1 parent 9dceecf commit f9fde24
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
Expand Up @@ -158,7 +158,7 @@ public void update() {
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
}

updateIncomingStats();
Expand All @@ -171,6 +171,10 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
}

@Override
Expand Down Expand Up @@ -263,7 +267,7 @@ protected IterOutcome doWork() {
flattenMemoryManager.updateOutgoingStats(outputRecords);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
Expand Down
Expand Up @@ -243,8 +243,8 @@ protected boolean prefetchFirstBatchFromBothSides() {
batchMemoryManager.update(RIGHT_INDEX, 0, true);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
}

if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
Expand Down Expand Up @@ -358,7 +358,7 @@ public IterOutcome innerNext() {

batchMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

/* We are here because of one the following
Expand Down Expand Up @@ -890,6 +890,9 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
}

/**
Expand Down
Expand Up @@ -123,7 +123,7 @@ private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
public void update(int inputIndex) {
status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
}
}
}
Expand All @@ -132,8 +132,12 @@ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, Record
super(popConfig, context, true, left, right);

// Instantiate the batch memory manager
final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right);
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
Expand Down Expand Up @@ -271,7 +275,7 @@ private void setRecordCountInContainer() {
}

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

batchMemoryManager.updateOutgoingStats(getRecordCount());
Expand Down
Expand Up @@ -39,9 +39,11 @@
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
Expand Down Expand Up @@ -74,6 +76,9 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, Fragment
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
}

@Override
Expand Down Expand Up @@ -168,6 +173,10 @@ private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) t
batchStatus.recordsProcessed += recordCount;
batchMemoryManager.updateOutgoingStats(recordCount);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
} else {
Expand Down Expand Up @@ -361,6 +370,10 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
if (topStatus.prefetched) {
topStatus.prefetched = false;
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
}
return Pair.of(topStatus.outcome, topStatus);
} else {

Expand All @@ -378,6 +391,10 @@ public Pair<IterOutcome, BatchStatusWrappper> next() {
topStatus.recordsProcessed = 0;
topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
}
return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY:
case STOP:
Expand Down Expand Up @@ -409,6 +426,22 @@ public void remove() {
public void close() {
super.close();
updateBatchMemoryManagerStats();

logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}

}

0 comments on commit f9fde24

Please sign in to comment.